tajo-commits mailing list archives

From hyun...@apache.org
Subject [22/51] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)
Date Fri, 18 Apr 2014 10:31:34 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/026368be/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
new file mode 100644
index 0000000..63b50ac
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -0,0 +1,1125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.*;
+import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType;
+
+
+/**
+ * A SubQuery controls the execution of a single ExecutionBlock and is
+ * implemented as a finite state machine.
+ */
+public class SubQuery implements EventHandler<SubQueryEvent> {
+
+  private static final Log LOG = LogFactory.getLog(SubQuery.class);
+
+  private MasterPlan masterPlan;
+  private ExecutionBlock block;
+  private int priority;
+  private Schema schema;
+  private TableMeta meta;
+  private TableStats resultStatistics;
+  private TableStats inputStatistics;
+  private EventHandler<Event> eventHandler;
+  private final AbstractStorageManager sm;
+  private AbstractTaskScheduler taskScheduler;
+  private QueryMasterTask.QueryMasterTaskContext context;
+  private final List<String> diagnostics = new ArrayList<String>();
+
+  private long startTime;
+  private long finishTime;
+
+  volatile Map<QueryUnitId, QueryUnit> tasks = new ConcurrentHashMap<QueryUnitId, QueryUnit>();
+  volatile Map<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
+
+  private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
+  private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+  private static final ContainerLaunchTransition CONTAINER_LAUNCH_TRANSITION = new ContainerLaunchTransition();
+  private static final TaskCompletedTransition TASK_COMPLETED_TRANSITION = new TaskCompletedTransition();
+  private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION =
+      new AllocatedContainersCancelTransition();
+  private static final SubQueryCompleteTransition SUBQUERY_COMPLETED_TRANSITION =
+      new SubQueryCompleteTransition();
+  private StateMachine<SubQueryState, SubQueryEventType, SubQueryEvent> stateMachine;
+
+  protected static final StateMachineFactory<SubQuery, SubQueryState,
+      SubQueryEventType, SubQueryEvent> stateMachineFactory =
+      new StateMachineFactory <SubQuery, SubQueryState,
+          SubQueryEventType, SubQueryEvent> (SubQueryState.NEW)
+
+          // Transitions from NEW state
+          .addTransition(SubQueryState.NEW,
+              EnumSet.of(SubQueryState.INITED, SubQueryState.ERROR, SubQueryState.SUCCEEDED),
+              SubQueryEventType.SQ_INIT,
+              new InitAndRequestContainer())
+          .addTransition(SubQueryState.NEW, SubQueryState.NEW,
+              SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(SubQueryState.NEW, SubQueryState.KILLED,
+              SubQueryEventType.SQ_KILL)
+          .addTransition(SubQueryState.NEW, SubQueryState.ERROR,
+              SubQueryEventType.SQ_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from INITED state
+          .addTransition(SubQueryState.INITED, SubQueryState.RUNNING,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINER_LAUNCH_TRANSITION)
+          .addTransition(SubQueryState.INITED, SubQueryState.INITED,
+              SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(SubQueryState.INITED, SubQueryState.KILL_WAIT,
+              SubQueryEventType.SQ_KILL)
+          .addTransition(SubQueryState.INITED, SubQueryState.ERROR,
+              SubQueryEventType.SQ_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from RUNNING state
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINER_LAUNCH_TRANSITION)
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+              SubQueryEventType.SQ_TASK_COMPLETED,
+              TASK_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.RUNNING,
+              EnumSet.of(SubQueryState.SUCCEEDED, SubQueryState.FAILED),
+              SubQueryEventType.SQ_SUBQUERY_COMPLETED,
+              SUBQUERY_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+              SubQueryEventType.SQ_FAILED,
+              TASK_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+              SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(SubQueryState.RUNNING, SubQueryState.KILL_WAIT,
+              SubQueryEventType.SQ_KILL,
+              new KillTasksTransition())
+          .addTransition(SubQueryState.RUNNING, SubQueryState.ERROR,
+              SubQueryEventType.SQ_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able Transition
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+              SubQueryEventType.SQ_START)
+
+          // Transitions from KILL_WAIT state
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINERS_CANCEL_TRANSITION)
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
+              EnumSet.of(SubQueryEventType.SQ_KILL))
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
+              SubQueryEventType.SQ_TASK_COMPLETED,
+              TASK_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.KILL_WAIT,
+              EnumSet.of(SubQueryState.SUCCEEDED, SubQueryState.FAILED, SubQueryState.KILLED),
+              SubQueryEventType.SQ_SUBQUERY_COMPLETED,
+              SUBQUERY_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
+              SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
+              SubQueryEventType.SQ_FAILED,
+              TASK_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.ERROR,
+              SubQueryEventType.SQ_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from SUCCEEDED state
+          .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINERS_CANCEL_TRANSITION)
+          .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
+              SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(SubQueryState.SUCCEEDED, SubQueryState.ERROR,
+              SubQueryEventType.SQ_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able events
+          .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
+              EnumSet.of(
+                  SubQueryEventType.SQ_START,
+                  SubQueryEventType.SQ_KILL,
+                  SubQueryEventType.SQ_CONTAINER_ALLOCATED))
+
+          // Transitions from FAILED state
+          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINERS_CANCEL_TRANSITION)
+          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+              SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(SubQueryState.FAILED, SubQueryState.ERROR,
+              SubQueryEventType.SQ_INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+              EnumSet.of(
+                  SubQueryEventType.SQ_START,
+                  SubQueryEventType.SQ_KILL,
+                  SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+                  SubQueryEventType.SQ_FAILED))
+
+          // Transitions from ERROR state
+          .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINERS_CANCEL_TRANSITION)
+          .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
+              SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
+              EnumSet.of(
+                  SubQueryEventType.SQ_START,
+                  SubQueryEventType.SQ_KILL,
+                  SubQueryEventType.SQ_FAILED,
+                  SubQueryEventType.SQ_INTERNAL_ERROR))
+
+          .installTopology();
+
+  private final Lock readLock;
+  private final Lock writeLock;
+
+  private int totalScheduledObjectsCount;
+  private int succeededObjectCount = 0;
+  private int completedTaskCount = 0;
+  private int succeededTaskCount = 0;
+  private int killedObjectCount = 0;
+  private int failedObjectCount = 0;
+  private TaskSchedulerContext schedulerContext;
+
+  public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block, AbstractStorageManager sm) {
+    this.context = context;
+    this.masterPlan = masterPlan;
+    this.block = block;
+    this.sm = sm;
+    this.eventHandler = context.getEventHandler();
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  public static boolean isRunningState(SubQueryState state) {
+    return state == SubQueryState.INITED || state == SubQueryState.NEW || state == SubQueryState.RUNNING;
+  }
+
+  public QueryMasterTask.QueryMasterTaskContext getContext() {
+    return context;
+  }
+
+  public MasterPlan getMasterPlan() {
+    return masterPlan;
+  }
+
+  public DataChannel getDataChannel() {
+    return masterPlan.getOutgoingChannels(getId()).iterator().next();
+  }
+
+  public EventHandler<Event> getEventHandler() {
+    return eventHandler;
+  }
+
+  public AbstractTaskScheduler getTaskScheduler() {
+    return taskScheduler;
+  }
+
+  public void setStartTime() {
+    startTime = context.getClock().getTime();
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  public void setFinishTime() {
+    finishTime = context.getClock().getTime();
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
+  public long getFinishTime() {
+    return this.finishTime;
+  }
+
+  public float getTaskProgress() {
+    readLock.lock();
+    try {
+      if (getState() == SubQueryState.NEW) {
+        return 0;
+      } else {
+        return (float)(succeededObjectCount) / (float)totalScheduledObjectsCount;
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public float getProgress() {
+    List<QueryUnit> tempTasks = null;
+    readLock.lock();
+    try {
+      if (getState() == SubQueryState.NEW) {
+        return 0;
+      } else {
+        tempTasks = new ArrayList<QueryUnit>(tasks.values());
+      }
+    } finally {
+      readLock.unlock();
+    }
+
+    float totalProgress = 0.0f;
+    for (QueryUnit eachQueryUnit: tempTasks) {
+      if (eachQueryUnit.getLastAttempt() != null) {
+        totalProgress += eachQueryUnit.getLastAttempt().getProgress();
+      }
+    }
+
+    return totalProgress/(float)tempTasks.size();
+  }
+
+  public int getSucceededObjectCount() {
+    return succeededObjectCount;
+  }
+
+  public int getTotalScheduledObjectsCount() {
+    return totalScheduledObjectsCount;
+  }
+
+  public ExecutionBlock getBlock() {
+    return block;
+  }
+
+  public void addTask(QueryUnit task) {
+    tasks.put(task.getId(), task);
+  }
+
+  /**
+   * Finalizes this subquery. It is only invoked when the subquery has succeeded.
+   */
+  public void complete() {
+    cleanup();
+    finalizeStats();
+    setFinishTime();
+    eventHandler.handle(new SubQueryCompletedEvent(getId(), SubQueryState.SUCCEEDED));
+  }
+
+  /**
+   * Finalizes this subquery. Unlike {@link SubQuery#complete()},
+   * it is invoked when a subquery finishes abnormally.
+   *
+   * @param finalState The final subquery state
+   */
+  public void abort(SubQueryState finalState) {
+    // TODO -
+    // - committer.abortSubQuery(...)
+    // - record SubQuery Finish Time
+    // - CleanUp Tasks
+    // - Record History
+    cleanup();
+    setFinishTime();
+    eventHandler.handle(new SubQueryCompletedEvent(getId(), finalState));
+  }
+
+  public StateMachine<SubQueryState, SubQueryEventType, SubQueryEvent> getStateMachine() {
+    return this.stateMachine;
+  }
+
+  public void setPriority(int priority) {
+    this.priority = priority;
+  }
+
+
+  public int getPriority() {
+    return this.priority;
+  }
+
+  public AbstractStorageManager getStorageManager() {
+    return sm;
+  }
+  
+  public ExecutionBlockId getId() {
+    return block.getId();
+  }
+  
+  public QueryUnit[] getQueryUnits() {
+    return tasks.values().toArray(new QueryUnit[tasks.size()]);
+  }
+  
+  public QueryUnit getQueryUnit(QueryUnitId qid) {
+    return tasks.get(qid);
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public TableMeta getTableMeta() {
+    return meta;
+  }
+
+  public TableStats getResultStats() {
+    return resultStatistics;
+  }
+
+  public TableStats getInputStats() {
+    return inputStatistics;
+  }
+
+  public List<String> getDiagnostics() {
+    readLock.lock();
+    try {
+      return diagnostics;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  protected void addDiagnostic(String diag) {
+    diagnostics.add(diag);
+  }
+
+  @Override
+  public String toString() {
+    return String.valueOf(getId());
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof SubQuery) {
+      SubQuery other = (SubQuery)o;
+      return getId().equals(other.getId());
+    }
+    return false;
+  }
+  
+  @Override
+  public int hashCode() {
+    return getId().hashCode();
+  }
+  
+  public int compareTo(SubQuery other) {
+    return getId().compareTo(other.getId());
+  }
+
+  public SubQueryState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public static TableStats[] computeStatFromUnionBlock(SubQuery subQuery) {
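+    // stat[0] aggregates the children's input statistics; stat[1] their result statistics.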
+    TableStats[] stat = new TableStats[]{new TableStats(), new TableStats()};
+    long[] avgRows = new long[]{0, 0};
+    long[] numBytes = new long[]{0, 0};
+    long[] readBytes = new long[]{0, 0};
+    long[] numRows = new long[]{0, 0};
+    int[] numBlocks = new int[]{0, 0};
+    int[] numOutputs = new int[]{0, 0};
+
+    List<ColumnStats> columnStatses = Lists.newArrayList();
+
+    MasterPlan masterPlan = subQuery.getMasterPlan();
+    Iterator<ExecutionBlock> it = masterPlan.getChilds(subQuery.getBlock()).iterator();
+    while (it.hasNext()) {
+      ExecutionBlock block = it.next();
+      SubQuery childSubQuery = subQuery.context.getSubQuery(block.getId());
+      TableStats[] childStatArray = new TableStats[]{
+          childSubQuery.getInputStats(), childSubQuery.getResultStats()
+      };
+      for (int i = 0; i < 2; i++) {
+        if (childStatArray[i] == null) {
+          continue;
+        }
+        avgRows[i] += childStatArray[i].getAvgRows();
+        numBlocks[i] += childStatArray[i].getNumBlocks();
+        numBytes[i] += childStatArray[i].getNumBytes();
+        readBytes[i] += childStatArray[i].getReadBytes();
+        numOutputs[i] += childStatArray[i].getNumShuffleOutputs();
+        numRows[i] += childStatArray[i].getNumRows();
+      }
+      if (childStatArray[1] != null) {
+        columnStatses.addAll(childStatArray[1].getColumnStats());
+      }
+    }
+
+    for (int i = 0; i < 2; i++) {
+      stat[i].setNumBlocks(numBlocks[i]);
+      stat[i].setNumBytes(numBytes[i]);
+      stat[i].setReadBytes(readBytes[i]);
+      stat[i].setNumShuffleOutputs(numOutputs[i]);
+      stat[i].setNumRows(numRows[i]);
+      stat[i].setAvgRows(avgRows[i]);
+    }
+    stat[1].setColumnStats(columnStatses);
+
+    return stat;
+  }
+
+  private TableStats[] computeStatFromTasks() {
+    List<TableStats> inputStatsList = Lists.newArrayList();
+    List<TableStats> resultStatsList = Lists.newArrayList();
+    for (QueryUnit unit : getQueryUnits()) {
+      resultStatsList.add(unit.getStats());
+      if (unit.getLastAttempt().getInputStats() != null) {
+        inputStatsList.add(unit.getLastAttempt().getInputStats());
+      }
+    }
+    TableStats inputStats = StatisticsUtil.aggregateTableStat(inputStatsList);
+    TableStats resultStats = StatisticsUtil.aggregateTableStat(resultStatsList);
+    return new TableStats[]{inputStats, resultStats};
+  }
+
+  private void stopScheduler() {
+    // If there are launched TaskRunners, send the 'shouldDie' message to all of
+    // them via the received task requests.
+    if (taskScheduler != null) {
+      taskScheduler.stop();
+    }
+  }
+
+  private void releaseContainers() {
+    // If there are still live TaskRunners, try to kill the containers.
+    eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values()));
+  }
+
+  public void releaseContainer(ContainerId containerId) {
+    // try to kill the container.
+    ArrayList<Container> list = new ArrayList<Container>();
+    list.add(containers.get(containerId));
+    eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), list));
+  }
+
+  /**
+   * Computes all statistics and sets them as the intermediate result.
+   */
+  private void finalizeStats() {
+    TableStats[] statsArray;
+    if (block.hasUnion()) {
+      statsArray = computeStatFromUnionBlock(this);
+    } else {
+      statsArray = computeStatFromTasks();
+    }
+
+    DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0);
+    // get default or store type
+    CatalogProtos.StoreType storeType = CatalogProtos.StoreType.CSV; // default setting
+
+    // if store plan (i.e., CREATE or INSERT OVERWRITE)
+    StoreTableNode storeTableNode = PlannerUtil.findTopNode(getBlock().getPlan(), NodeType.STORE);
+    if (storeTableNode != null) {
+      storeType = storeTableNode.getStorageType();
+    }
+    schema = channel.getSchema();
+    meta = CatalogUtil.newTableMeta(storeType, new Options());
+    inputStatistics = statsArray[0];
+    resultStatistics = statsArray[1];
+  }
+
+  @Override
+  public void handle(SubQueryEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getSubQueryId() + " of type " + event.getType() + ", preState=" + getState());
+    }
+
+    try {
+      writeLock.lock();
+      SubQueryState oldState = getState();
+      try {
+        getStateMachine().doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        eventHandler.handle(new SubQueryEvent(getId(),
+            SubQueryEventType.SQ_INTERNAL_ERROR));
+      }
+
+      // log the state change, if any occurred
+      if (LOG.isDebugEnabled()) {
+        if (oldState != getState()) {
+          LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to "
+              + getState());
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void handleTaskRequestEvent(TaskRequestEvent event) {
+    taskScheduler.handleTaskRequestEvent(event);
+  }
+
+  private static class InitAndRequestContainer implements MultipleArcTransition<SubQuery,
+      SubQueryEvent, SubQueryState> {
+
+    @Override
+    public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
+      subQuery.setStartTime();
+      ExecutionBlock execBlock = subQuery.getBlock();
+      SubQueryState state;
+
+      try {
+        // Union operator does not require actual query processing. It is performed logically.
+        if (execBlock.hasUnion()) {
+          subQuery.finalizeStats();
+          state = SubQueryState.SUCCEEDED;
+        } else {
+          ExecutionBlock parent = subQuery.getMasterPlan().getParent(subQuery.getBlock());
+          DataChannel channel = subQuery.getMasterPlan().getChannel(subQuery.getId(), parent.getId());
+          setShuffleIfNecessary(subQuery, channel);
+          initTaskScheduler(subQuery);
+          schedule(subQuery);
+          subQuery.totalScheduledObjectsCount = subQuery.getTaskScheduler().remainingScheduledObjectNum();
+          LOG.info(subQuery.totalScheduledObjectsCount + " objects are scheduled");
+
+          if (subQuery.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there are no tasks
+            subQuery.stopScheduler();
+            subQuery.finalizeStats();
+            subQuery.eventHandler.handle(new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.SUCCEEDED));
+            return SubQueryState.SUCCEEDED;
+          } else {
+            subQuery.taskScheduler.start();
+            allocateContainers(subQuery);
+            return SubQueryState.INITED;
+          }
+        }
+      } catch (Exception e) {
+        LOG.error("SubQuery (" + subQuery.getId() + ") ERROR: ", e);
+        subQuery.setFinishTime();
+        subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(), e.getMessage()));
+        subQuery.eventHandler.handle(new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.ERROR));
+        return SubQueryState.ERROR;
+      }
+
+      return state;
+    }
+
+    private void initTaskScheduler(SubQuery subQuery) throws IOException {
+      TajoConf conf = subQuery.context.getConf();
+      subQuery.schedulerContext = new TaskSchedulerContext(subQuery.context,
+          subQuery.getMasterPlan().isLeaf(subQuery.getId()), subQuery.getId());
+      subQuery.taskScheduler = TaskSchedulerFactory.get(conf, subQuery.schedulerContext, subQuery);
+      subQuery.taskScheduler.init(conf);
+      LOG.info(subQuery.taskScheduler.getName() + " is chosen for the task scheduling for " + subQuery.getId());
+    }
+
+    /**
+     * If the parent block requires a repartition operation, this method sets the
+     * proper repartition method and the number of partitions for the given subquery.
+     */
+    private static void setShuffleIfNecessary(SubQuery subQuery, DataChannel channel) {
+      if (channel.getShuffleType() != ShuffleType.NONE_SHUFFLE) {
+        int numTasks = calculateShuffleOutputNum(subQuery, channel);
+        Repartitioner.setShuffleOutputNumForTwoPhase(subQuery, numTasks, channel);
+      }
+    }
+
+    /**
+     * Gets the total memory of the cluster.
+     *
+     * @param subQuery the subquery whose cluster is inspected
+     * @return the total memory in megabytes
+     */
+    private static int getClusterTotalMemory(SubQuery subQuery) {
+      List<TajoMasterProtocol.WorkerResourceProto> workers =
+          subQuery.context.getQueryMasterContext().getQueryMaster().getAllWorker();
+
+      int totalMem = 0;
+      for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+        totalMem += worker.getMemoryMB();
+      }
+      return totalMem;
+    }
+
+    /**
+     * Gets the desired number of partitions according to the volume of the input data.
+     * This method is only used to determine the number of partition keys for
+     * hash joins and aggregations.
+     *
+     * @param subQuery the subquery to plan
+     * @return the number of shuffle output partitions
+     */
+    public static int calculateShuffleOutputNum(SubQuery subQuery, DataChannel channel) {
+      TajoConf conf = subQuery.context.getConf();
+      MasterPlan masterPlan = subQuery.getMasterPlan();
+      ExecutionBlock parent = masterPlan.getParent(subQuery.getBlock());
+
+      GroupbyNode grpNode = null;
+      if (parent != null) {
+        grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY);
+      }
+
+      // Is this subquery the first phase of a join?
+      if (parent != null && parent.getScanNodes().length == 2) {
+        List<ExecutionBlock> childs = masterPlan.getChilds(parent);
+
+        // for outer
+        ExecutionBlock outer = childs.get(0);
+        long outerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, outer);
+
+        // for inner
+        ExecutionBlock inner = childs.get(1);
+        long innerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, inner);
+        LOG.info(subQuery.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, "
+            + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB");
+
+        long bigger = Math.max(outerVolume, innerVolume);
+
+        int mb = (int) Math.ceil((double) bigger / 1048576);
+        LOG.info(subQuery.getId() + ", Bigger Table's volume is approximately " + mb + " MB");
+
+        int taskNum = (int) Math.ceil((double) mb /
+            conf.getIntVar(ConfVars.DIST_QUERY_JOIN_PARTITION_VOLUME));
+
+        int totalMem = getClusterTotalMemory(subQuery);
+        LOG.info(subQuery.getId() + ", Total memory of cluster is " + totalMem + " MB");
+        int slots = Math.max(totalMem / conf.getIntVar(ConfVars.TASK_DEFAULT_MEMORY), 1);
+
+        // determine the number of tasks
+        taskNum = Math.min(taskNum, slots);
+        LOG.info(subQuery.getId() + ", The determined number of join partitions is " + taskNum);
+
+        return taskNum;
+
+        // Is this subquery the first phase of a group-by?
+      } else if (grpNode != null) {
+
+        if (grpNode.getGroupingColumns().length == 0) {
+          return 1;
+        } else {
+          long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);
+
+          int mb = (int) Math.ceil((double) volume / 1048576);
+          LOG.info(subQuery.getId() + ", Table's volume is approximately " + mb + " MB");
+          // determine the number of tasks
+          int taskNumBySize = (int) Math.ceil((double) mb /
+              conf.getIntVar(ConfVars.DIST_QUERY_GROUPBY_PARTITION_VOLUME));
+
+          int totalMem = getClusterTotalMemory(subQuery);
+
+          LOG.info(subQuery.getId() + ", Total memory of cluster is " + totalMem + " MB");
+          int slots = Math.max(totalMem / conf.getIntVar(ConfVars.TASK_DEFAULT_MEMORY), 1);
+          int taskNum = Math.min(taskNumBySize, slots); //Maximum partitions
+          LOG.info(subQuery.getId() + ", The determined number of aggregation partitions is " + taskNum);
+          return taskNum;
+        }
+      } else {
+        LOG.info("============>>>>> Unexpected Case! <<<<<================");
+        long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);
+
+        int mb = (int) Math.ceil((double)volume / 1048576);
+        LOG.info(subQuery.getId() + ", Table's volume is approximately " + mb + " MB");
+        // determine the number of tasks, one per 128 MB
+        int taskNum = (int) Math.ceil((double)mb / 128);
+        LOG.info(subQuery.getId() + ", The determined number of partitions is " + taskNum);
+        return taskNum;
+      }
+    }
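+    // Worked example (hypothetical figures): if the bigger join input is 2,048 MB and
+    // DIST_QUERY_JOIN_PARTITION_VOLUME is 128 MB, taskNum = ceil(2048 / 128) = 16.
+    // With 32,768 MB of total cluster memory and a TASK_DEFAULT_MEMORY of 512 MB,
+    // slots = 64, so the final number of join partitions is min(16, 64) = 16.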
+
+    private static void schedule(SubQuery subQuery) throws IOException {
+      MasterPlan masterPlan = subQuery.getMasterPlan();
+      ExecutionBlock execBlock = subQuery.getBlock();
+      if (subQuery.getMasterPlan().isLeaf(execBlock.getId()) && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan
+        scheduleFragmentsForLeafQuery(subQuery);
+      } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join
+        Repartitioner.scheduleFragmentsForJoinQuery(subQuery.schedulerContext, subQuery);
+      } else { // Case 3: Others (Sort or Aggregation)
+        int numTasks = getNonLeafTaskNum(subQuery);
+        Repartitioner.scheduleFragmentsForNonLeafTasks(subQuery.schedulerContext, masterPlan, subQuery, numTasks);
+      }
+    }
+
+    /**
+     * Gets the desired number of tasks according to the volume of the input data.
+     *
+     * @param subQuery the subquery to plan
+     * @return the number of non-leaf tasks
+     */
+    public static int getNonLeafTaskNum(SubQuery subQuery) {
+      // Getting intermediate data size
+      long volume = getInputVolume(subQuery.getMasterPlan(), subQuery.context, subQuery.getBlock());
+
+      int mb = (int) Math.ceil((double)volume / 1048576);
+      LOG.info("Table's volume is approximately " + mb + " MB");
+      // determine the number of tasks, one per 64 MB
+      int maxTaskNum = Math.max(1, (int) Math.ceil((double)mb / 64));
+      LOG.info("The determined number of non-leaf tasks is " + maxTaskNum);
+      return maxTaskNum;
+    }
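+    // Worked example: a 1 GB intermediate volume gives mb = 1024, so
+    // maxTaskNum = max(1, ceil(1024 / 64)) = 16 non-leaf tasks.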
+
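+    // For a leaf block, the input volume is the size of the largest scanned table.
+    // For a non-leaf block, it is the sum over all child blocks, preferring a
+    // child's actual result statistics once that child has SUCCEEDED.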
+    public static long getInputVolume(MasterPlan masterPlan, QueryMasterTask.QueryMasterTaskContext context,
+                                      ExecutionBlock execBlock) {
+      Map<String, TableDesc> tableMap = context.getTableDescMap();
+      if (masterPlan.isLeaf(execBlock)) {
+        ScanNode[] outerScans = execBlock.getScanNodes();
+        long maxVolume = 0;
+        for (ScanNode eachScanNode: outerScans) {
+          TableStats stat = tableMap.get(eachScanNode.getCanonicalName()).getStats();
+          if (stat.getNumBytes() > maxVolume) {
+            maxVolume = stat.getNumBytes();
+          }
+        }
+        return maxVolume;
+      } else {
+        long aggregatedVolume = 0;
+        for (ExecutionBlock childBlock : masterPlan.getChilds(execBlock)) {
+          SubQuery subquery = context.getSubQuery(childBlock.getId());
+          if (subquery == null || subquery.getState() != SubQueryState.SUCCEEDED) {
+            aggregatedVolume += getInputVolume(masterPlan, context, childBlock);
+          } else {
+            aggregatedVolume += subquery.getResultStats().getNumBytes();
+          }
+        }
+
+        return aggregatedVolume;
+      }
+    }
+
+    public static void allocateContainers(SubQuery subQuery) {
+      ExecutionBlock execBlock = subQuery.getBlock();
+
+      //TODO consider disk slot
+      int requiredMemoryMBPerTask = 512;
+
+      int numRequest = subQuery.getContext().getResourceAllocator().calculateNumRequestContainers(
+          subQuery.getContext().getQueryMasterContext().getWorkerContext(),
+          subQuery.schedulerContext.getEstimatedTaskNum(),
+          requiredMemoryMBPerTask
+      );
+
+      final Resource resource = Records.newRecord(Resource.class);
+
+      resource.setMemory(requiredMemoryMBPerTask);
+
+      LOG.info("Request Container for " + subQuery.getId() + " containers=" + numRequest);
+
+      Priority priority = Records.newRecord(Priority.class);
+      priority.setPriority(subQuery.getPriority());
+      ContainerAllocationEvent event =
+          new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
+              subQuery.getId(), priority, resource, numRequest,
+              subQuery.masterPlan.isLeaf(execBlock), 0.0f);
+      subQuery.eventHandler.handle(event);
+    }
+
+    private static void scheduleFragmentsForLeafQuery(SubQuery subQuery) throws IOException {
+      ExecutionBlock execBlock = subQuery.getBlock();
+      ScanNode[] scans = execBlock.getScanNodes();
+      Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
+      ScanNode scan = scans[0];
+      TableDesc table = subQuery.context.getTableDescMap().get(scan.getCanonicalName());
+
+      Collection<FileFragment> fragments;
+      TableMeta meta = table.getMeta();
+
+      // Depending on the scan node's type, fragments are created differently. If the
+      // scan is on a partitioned table, many fragments are created, one set per
+      // partition. Otherwise, at least one fragment is created for the table, which
+      // may span a number of blocks or consist of a number of files.
+      if (scan.getType() == NodeType.PARTITIONS_SCAN) {
+        fragments = Repartitioner.getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, table);
+      } else {
+        Path inputPath = table.getPath();
+        fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, table.getSchema(), inputPath);
+      }
+
+      SubQuery.scheduleFragments(subQuery, fragments);
+      if (subQuery.getTaskScheduler() instanceof DefaultTaskScheduler) {
+        // For DefaultTaskScheduler, the leaf task size should be the fragment count,
+        // and estimatedTaskNum determines the number of initial containers.
+        subQuery.schedulerContext.setTaskSize(fragments.size());
+        subQuery.schedulerContext.setEstimatedTaskNum(fragments.size());
+      } else {
+        TajoConf conf = subQuery.context.getConf();
+        subQuery.schedulerContext.setTaskSize(conf.getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024);
+        int estimatedTaskNum = (int) Math.ceil((double) table.getStats().getNumBytes() /
+            (double) subQuery.schedulerContext.getTaskSize());
+        subQuery.schedulerContext.setEstimatedTaskNum(estimatedTaskNum);
+      }
+    }
+  }
+
+  public static void scheduleFragment(SubQuery subQuery, FileFragment fragment) {
+    subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
+        subQuery.getId(), fragment));
+  }
+
+
+  public static void scheduleFragments(SubQuery subQuery, Collection<FileFragment> fragments) {
+    for (FileFragment eachFragment : fragments) {
+      scheduleFragment(subQuery, eachFragment);
+    }
+  }
+
+  public static void scheduleFragments(SubQuery subQuery, Collection<FileFragment> leftFragments,
+                                       Collection<FileFragment> broadcastFragments) {
+    for (FileFragment eachLeafFragment : leftFragments) {
+      scheduleFragment(subQuery, eachLeafFragment, broadcastFragments);
+    }
+  }
+
+  public static void scheduleFragment(SubQuery subQuery,
+                                      FileFragment leftFragment, Collection<FileFragment> rightFragments) {
+    subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
+        subQuery.getId(), leftFragment, rightFragments));
+  }
+
+  public static void scheduleFetches(SubQuery subQuery, Map<String, List<URI>> fetches) {
+    subQuery.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
+        subQuery.getId(), fetches));
+  }
+
+  public static QueryUnit newEmptyQueryUnit(TaskSchedulerContext schedulerContext,
+                                            QueryUnitAttemptScheduleContext queryUnitContext,
+                                            SubQuery subQuery, int taskId) {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    QueryUnit unit = new QueryUnit(schedulerContext.getMasterContext().getConf(),
+        queryUnitContext,
+        QueryIdFactory.newQueryUnitId(schedulerContext.getBlockId(), taskId),
+        schedulerContext.isLeafQuery(), subQuery.eventHandler);
+    unit.setLogicalPlan(execBlock.getPlan());
+    subQuery.addTask(unit);
+    return unit;
+  }
+
+  private static class ContainerLaunchTransition
+      implements SingleArcTransition<SubQuery, SubQueryEvent> {
+
+    @Override
+    public void transition(SubQuery subQuery, SubQueryEvent event) {
+      try {
+        SubQueryContainerAllocationEvent allocationEvent =
+            (SubQueryContainerAllocationEvent) event;
+        for (Container container : allocationEvent.getAllocatedContainer()) {
+          ContainerId cId = container.getId();
+          if (subQuery.containers.containsKey(cId)) {
+            subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
+                "Duplicated containers are allocated: " + cId.toString()));
+            subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
+          }
+          subQuery.containers.put(cId, container);
+        }
+        LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.containers.size() + " containers!");
+        subQuery.eventHandler.handle(
+            new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH,
+                subQuery.getId(), allocationEvent.getAllocatedContainer()));
+
+        subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_START));
+      } catch (Throwable t) {
+        subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
+            ExceptionUtils.getStackTrace(t)));
+        subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
+      }
+    }
+  }
+
+  /**
+   * It is used in the KILL_WAIT state against a Container Allocated event.
+   * It just returns the allocated containers to the resource manager.
+   */
+  private static class AllocatedContainersCancelTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
+    @Override
+    public void transition(SubQuery subQuery, SubQueryEvent event) {
+      try {
+        SubQueryContainerAllocationEvent allocationEvent =
+            (SubQueryContainerAllocationEvent) event;
+        subQuery.eventHandler.handle(
+            new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP,
+                subQuery.getId(), allocationEvent.getAllocatedContainer()));
+        LOG.info(String.format("[%s] %d allocated containers are canceled",
+            subQuery.getId().toString(),
+            allocationEvent.getAllocatedContainer().size()));
+      } catch (Throwable t) {
+        subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
+            ExceptionUtils.getStackTrace(t)));
+        subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
+      }
+    }
+  }
+
+  private static class TaskCompletedTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
+
+    @Override
+    public void transition(SubQuery subQuery,
+                           SubQueryEvent event) {
+      SubQueryTaskEvent taskEvent = (SubQueryTaskEvent) event;
+      QueryUnit task = subQuery.getQueryUnit(taskEvent.getTaskId());
+
+      if (task == null) { // the task is unknown to this subquery
+        LOG.error(String.format("Task %s is absent", taskEvent.getTaskId()));
+        subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_FAILED));
+      } else {
+        subQuery.completedTaskCount++;
+
+        if (taskEvent.getState() == TaskState.SUCCEEDED) {
+//          if (task.isLeafTask()) {
+//            subQuery.succeededObjectCount += task.getTotalFragmentNum();
+//          } else {
+//            subQuery.succeededObjectCount++;
+//          }
+          subQuery.succeededObjectCount++;
+        } else if (task.getState() == TaskState.KILLED) {
+//          if (task.isLeafTask()) {
+//            subQuery.killedObjectCount += task.getTotalFragmentNum();
+//          } else {
+//            subQuery.killedObjectCount++;
+//          }
+          subQuery.killedObjectCount++;
+        } else if (task.getState() == TaskState.FAILED) {
+//          if (task.isLeafTask()) {
+//            subQuery.failedObjectCount+= task.getTotalFragmentNum();
+//          } else {
+//            subQuery.failedObjectCount++;
+//          }
+          subQuery.failedObjectCount++;
+          // if at least one task is failed, try to kill all tasks.
+          subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_KILL));
+        }
+
+        LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d",
+            subQuery.getId(),
+            subQuery.getTotalScheduledObjectsCount(),
+            subQuery.succeededObjectCount,
+            subQuery.killedObjectCount,
+            subQuery.failedObjectCount));
+
+        if (subQuery.totalScheduledObjectsCount ==
+            subQuery.succeededObjectCount + subQuery.killedObjectCount + subQuery.failedObjectCount) {
+          subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_SUBQUERY_COMPLETED));
+        }
+      }
+    }
+  }
+
+  private static class KillTasksTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
+
+    @Override
+    public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
+      subQuery.getTaskScheduler().stop();
+      for (QueryUnit queryUnit : subQuery.getQueryUnits()) {
+        subQuery.eventHandler.handle(new TaskEvent(queryUnit.getId(), TaskEventType.T_KILL));
+      }
+    }
+  }
+
+  private void cleanup() {
+    stopScheduler();
+    releaseContainers();
+  }
+
+  private static class SubQueryCompleteTransition
+      implements MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> {
+
+    @Override
+    public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
+      // TODO - Commit subQuery & do cleanup
+      // TODO - records succeeded, failed, killed completed task
+      // TODO - records metrics
+      try {
+        LOG.info(String.format("subQuery completed - %s (total=%d, success=%d, killed=%d)",
+            subQuery.getId().toString(),
+            subQuery.getTotalScheduledObjectsCount(),
+            subQuery.getSucceededObjectCount(),
+            subQuery.killedObjectCount));
+
+        if (subQuery.killedObjectCount > 0 || subQuery.failedObjectCount > 0) {
+          if (subQuery.failedObjectCount > 0) {
+            subQuery.abort(SubQueryState.FAILED);
+            return SubQueryState.FAILED;
+          } else if (subQuery.killedObjectCount > 0) {
+            subQuery.abort(SubQueryState.KILLED);
+            return SubQueryState.KILLED;
+          } else {
+            LOG.error("Invalid State " + subQuery.getState() + " State");
+            subQuery.abort(SubQueryState.ERROR);
+            return SubQueryState.ERROR;
+          }
+        } else {
+          subQuery.complete();
+          return SubQueryState.SUCCEEDED;
+        }
+      } catch (Throwable t) {
+        LOG.error(t);
+        subQuery.abort(SubQueryState.ERROR);
+        return SubQueryState.ERROR;
+      }
+    }
+  }
+
+  private static class DiagnosticsUpdateTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
+    @Override
+    public void transition(SubQuery subQuery, SubQueryEvent event) {
+      subQuery.addDiagnostic(((SubQueryDiagnosticsUpdateEvent) event).getDiagnosticUpdate());
+    }
+  }
+
+  private static class InternalErrorTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
+    @Override
+    public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
+      subQuery.abort(SubQueryState.ERROR);
+    }
+  }
+}
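
For readers unfamiliar with the pattern above: SubQuery builds on Hadoop Yarn's
StateMachineFactory, which declares every legal (state, event type) transition up
front, validates the table with installTopology(), and then stamps out one state
machine per instance via make(this). A minimal, self-contained sketch of the idiom
(hypothetical Turnstile example, not part of this commit; assumes hadoop-yarn-common
on the classpath):

    import org.apache.hadoop.yarn.state.SingleArcTransition;
    import org.apache.hadoop.yarn.state.StateMachine;
    import org.apache.hadoop.yarn.state.StateMachineFactory;

    public class Turnstile {
      enum State { LOCKED, UNLOCKED }
      enum EventType { COIN, PUSH }

      // Declare all legal transitions once; installTopology() freezes the table.
      private static final StateMachineFactory<Turnstile, State, EventType, Object>
          FACTORY =
          new StateMachineFactory<Turnstile, State, EventType, Object>(State.LOCKED)
              .addTransition(State.LOCKED, State.UNLOCKED, EventType.COIN,
                  new SingleArcTransition<Turnstile, Object>() {
                    public void transition(Turnstile t, Object event) {
                      // side effects run here, before the state is updated
                    }
                  })
              .addTransition(State.UNLOCKED, State.LOCKED, EventType.PUSH)
              .installTopology();

      // One state machine per operand instance, as SubQuery does in its
      // constructor via stateMachineFactory.make(this).
      private final StateMachine<State, EventType, Object> sm = FACTORY.make(this);

      public void handle(EventType type) {
        // doTransition() throws InvalidStateTransitonException for an event
        // that is illegal in the current state, which is exactly what
        // SubQuery.handle() catches and maps to SQ_INTERNAL_ERROR.
        sm.doTransition(type, null);
      }

      public State state() { return sm.getCurrentState(); }
    }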

http://git-wip-us.apache.org/repos/asf/tajo/blob/026368be/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
new file mode 100644
index 0000000..effcfde
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+public enum SubQueryState {
+  NEW,
+  INITED,
+  RUNNING,
+  SUCCEEDED,
+  FAILED,
+  KILL_WAIT,
+  KILLED,
+  ERROR
+}
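
A small usage sketch combining the lifecycle enum above with SubQuery.isRunningState
from the previous file (hypothetical helper, not part of this commit; a real caller
would listen for SubQueryCompletedEvent rather than poll):

    // Block the calling thread until a SubQuery leaves its live states.
    static SubQueryState awaitNotRunning(SubQuery subQuery) throws InterruptedException {
      while (SubQuery.isRunningState(subQuery.getState())) {  // NEW, INITED, RUNNING
        Thread.sleep(100);
      }
      return subQuery.getState();  // first state outside NEW/INITED/RUNNING
    }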

http://git-wip-us.apache.org/repos/asf/tajo/blob/026368be/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java
new file mode 100644
index 0000000..5ac6e39
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import org.apache.hadoop.yarn.api.records.*;
+
+public class TajoWorkerContainer extends Container {
+  ContainerId id;
+  NodeId nodeId;
+  WorkerResource workerResource;
+
+  public WorkerResource getWorkerResource() {
+    return workerResource;
+  }
+
+  public void setWorkerResource(WorkerResource workerResource) {
+    this.workerResource = workerResource;
+  }
+
+  @Override
+  public ContainerId getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(ContainerId id) {
+    this.id = id;
+  }
+
+  @Override
+  public NodeId getNodeId() {
+    return nodeId;
+  }
+
+  @Override
+  public void setNodeId(NodeId nodeId) {
+    this.nodeId = nodeId;
+  }
+
+  @Override
+  public String getNodeHttpAddress() {
+    return null;  // not tracked; Tajo keeps worker details in WorkerResource
+  }
+
+  @Override
+  public void setNodeHttpAddress(String nodeHttpAddress) {
+    // not tracked; Tajo keeps worker details in WorkerResource
+  }
+
+  @Override
+  public Resource getResource() {
+    return null;  // resource accounting is done through WorkerResource
+  }
+
+  @Override
+  public void setResource(Resource resource) {
+    // resource accounting is done through WorkerResource
+  }
+
+  @Override
+  public Priority getPriority() {
+    return null;  // priorities are not used for worker containers
+  }
+
+  @Override
+  public void setPriority(Priority priority) {
+    // priorities are not used for worker containers
+  }
+
+  @Override
+  public Token getContainerToken() {
+    return null;  // container tokens are not used by Tajo
+  }
+
+  @Override
+  public void setContainerToken(Token containerToken) {
+    // container tokens are not used by Tajo
+  }
+
+  @Override
+  public int compareTo(Container container) {
+    return getId().compareTo(container.getId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/026368be/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
new file mode 100644
index 0000000..634ad2b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+
+public class TajoWorkerContainerId extends ContainerId {
+  ApplicationAttemptId applicationAttemptId;
+  int id;
+
+  @Override
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
+  @Override
+  public void setApplicationAttemptId(ApplicationAttemptId atId) {
+    this.applicationAttemptId = atId;
+  }
+
+  @Override
+  public int getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
+  public YarnProtos.ContainerIdProto getProto() {
+    YarnProtos.ApplicationIdProto appIdProto = YarnProtos.ApplicationIdProto.newBuilder()
+        .setClusterTimestamp(applicationAttemptId.getApplicationId().getClusterTimestamp())
+        .setId(applicationAttemptId.getApplicationId().getId())
+        .build();
+
+    YarnProtos.ApplicationAttemptIdProto attemptIdProto = YarnProtos.ApplicationAttemptIdProto.newBuilder()
+        .setAttemptId(applicationAttemptId.getAttemptId())
+        .setApplicationId(appIdProto)
+        .build();
+
+    return YarnProtos.ContainerIdProto.newBuilder()
+        .setAppAttemptId(attemptIdProto)
+        .setAppId(appIdProto)
+        .setId(id)
+        .build();
+  }
+
+  public static YarnProtos.ContainerIdProto getContainerIdProto(ContainerId containerId) {
+    if(containerId instanceof TajoWorkerContainerId) {
+      return ((TajoWorkerContainerId)containerId).getProto();
+    } else {
+      YarnProtos.ApplicationIdProto appIdProto = YarnProtos.ApplicationIdProto.newBuilder()
+          .setClusterTimestamp(containerId.getApplicationAttemptId().getApplicationId().getClusterTimestamp())
+          .setId(containerId.getApplicationAttemptId().getApplicationId().getId())
+          .build();
+
+      YarnProtos.ApplicationAttemptIdProto attemptIdProto = YarnProtos.ApplicationAttemptIdProto.newBuilder()
+          .setAttemptId(containerId.getApplicationAttemptId().getAttemptId())
+          .setApplicationId(appIdProto)
+          .build();
+
+      return YarnProtos.ContainerIdProto.newBuilder()
+          .setAppAttemptId(attemptIdProto)
+          .setAppId(appIdProto)
+          .setId(containerId.getId())
+          .build();
+    }
+  }
+
+  @Override
+  protected void build() {
+    // nothing to build; the id and attempt id are set directly via setters
+  }
+}
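
A brief usage sketch for the protobuf conversion above (hypothetical values; assumes
the Hadoop 2.x Yarn records API):

    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.proto.YarnProtos;
    import org.apache.tajo.master.rm.TajoWorkerContainerId;

    public class ContainerIdProtoDemo {
      public static void main(String[] args) {
        ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
        TajoWorkerContainerId cid = new TajoWorkerContainerId();
        cid.setApplicationAttemptId(ApplicationAttemptId.newInstance(appId, 1));
        cid.setId(7);
        // Serializes the cluster timestamp, application id, attempt id,
        // and container id into the Yarn wire format.
        YarnProtos.ContainerIdProto proto = TajoWorkerContainerId.getContainerIdProto(cid);
        System.out.println(proto.getId());  // prints 7
      }
    }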

http://git-wip-us.apache.org/repos/asf/tajo/blob/026368be/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
new file mode 100644
index 0000000..2c3572c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -0,0 +1,709 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.protobuf.RpcCallback;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.master.querymaster.QueryJobEvent;
+import org.apache.tajo.util.ApplicationIdUtils;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
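+/**
+ * A Tajo-managed implementation of WorkerResourceManager that tracks cluster
+ * resources without delegating to YARN's scheduler. Workers in TaskRunner and
+ * QueryMaster mode register and stay live through periodic heartbeats;
+ * allocation requests are queued and served by a background allocator thread,
+ * while a monitor thread retires workers whose heartbeats time out.
+ */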
+public class TajoWorkerResourceManager implements WorkerResourceManager {
+  private static final Log LOG = LogFactory.getLog(TajoWorkerResourceManager.class);
+
+  static AtomicInteger containerIdSeq = new AtomicInteger(0);
+
+  private TajoMaster.MasterContext masterContext;
+
+  //all workers (including QueryMasters)
+  private Map<String, WorkerResource> allWorkerResourceMap = new HashMap<String, WorkerResource>();
+
+  //dead workers (including QueryMasters)
+  private Set<String> deadWorkerResources = new HashSet<String>();
+
+  //task runner workers only
+  private Set<String> liveWorkerResources = new HashSet<String>();
+
+  //QueryMaster workers only
+  private Set<String> liveQueryMasterWorkerResources = new HashSet<String>();
+
+  private Map<QueryId, WorkerResource> queryMasterMap = new HashMap<QueryId, WorkerResource>();
+
+  private final Object workerResourceLock = new Object();
+
+  private String queryIdSeed;
+
+  private WorkerResourceAllocationThread workerResourceAllocator;
+
+  private WorkerMonitorThread workerMonitor;
+
+  private BlockingQueue<WorkerResourceRequest> requestQueue;
+
+  private List<WorkerResourceRequest> reAllocationList;
+
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private float queryMasterDefaultDiskSlot;
+
+  private int queryMasterDefaultMemoryMB;
+
+  private TajoConf tajoConf;
+
+  private Map<YarnProtos.ContainerIdProto, AllocatedWorkerResource> allocatedResourceMap =
+      new HashMap<YarnProtos.ContainerIdProto, AllocatedWorkerResource>();
+
+  public TajoWorkerResourceManager(TajoMaster.MasterContext masterContext) {
+    this.masterContext = masterContext;
+    init(masterContext.getConf());
+  }
+
+  public TajoWorkerResourceManager(TajoConf tajoConf) {
+    init(tajoConf);
+  }
+
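+  /**
+   * Loads the default QueryMaster disk-slot and memory settings and starts
+   * the two background threads: the allocator that drains the request queue
+   * and the monitor that sweeps out timed-out workers.
+   */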
+  private void init(TajoConf tajoConf) {
+    this.tajoConf = tajoConf;
+    this.queryIdSeed = String.valueOf(System.currentTimeMillis());
+
+    this.queryMasterDefaultDiskSlot =
+        tajoConf.getFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT);
+
+    this.queryMasterDefaultMemoryMB =
+        tajoConf.getIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB);
+
+    requestQueue = new LinkedBlockingDeque<WorkerResourceRequest>();
+    reAllocationList = new ArrayList<WorkerResourceRequest>();
+
+    workerResourceAllocator = new WorkerResourceAllocationThread();
+    workerResourceAllocator.start();
+
+    workerMonitor = new WorkerMonitorThread();
+    workerMonitor.start();
+  }
+
+  public Map<String, WorkerResource> getWorkers() {
+    return Collections.unmodifiableMap(allWorkerResourceMap);
+  }
+
+  public Collection<String> getQueryMasters() {
+    return Collections.unmodifiableSet(liveQueryMasterWorkerResources);
+  }
+
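+  /**
+   * Sums total and available memory, disk slots, and CPU core slots over all
+   * live workers. Note that numWorkers reads liveWorkerResources.size()
+   * outside the workerResourceLock, so it can be momentarily inconsistent
+   * with the totals.
+   */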
+  @Override
+  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+    int totalDiskSlots = 0;
+    int totalCpuCoreSlots = 0;
+    int totalMemoryMB = 0;
+
+    int totalAvailableDiskSlots = 0;
+    int totalAvailableCpuCoreSlots = 0;
+    int totalAvailableMemoryMB = 0;
+
+    synchronized(workerResourceLock) {
+      for(String eachWorker: liveWorkerResources) {
+        WorkerResource worker = allWorkerResourceMap.get(eachWorker);
+        if(worker != null) {
+          totalMemoryMB += worker.getMemoryMB();
+          totalAvailableMemoryMB += worker.getAvailableMemoryMB();
+
+          totalDiskSlots += worker.getDiskSlots();
+          totalAvailableDiskSlots += worker.getAvailableDiskSlots();
+
+          totalCpuCoreSlots += worker.getCpuCoreSlots();
+          totalAvailableCpuCoreSlots += worker.getAvailableCpuCoreSlots();
+        }
+      }
+    }
+
+    return TajoMasterProtocol.ClusterResourceSummary.newBuilder()
+            .setNumWorkers(liveWorkerResources.size())
+            .setTotalCpuCoreSlots(totalCpuCoreSlots)
+            .setTotalDiskSlots(totalDiskSlots)
+            .setTotalMemoryMB(totalMemoryMB)
+            .setTotalAvailableCpuCoreSlots(totalAvailableCpuCoreSlots)
+            .setTotalAvailableDiskSlots(totalAvailableDiskSlots)
+            .setTotalAvailableMemoryMB(totalAvailableMemoryMB)
+            .build();
+  }
+
+  @Override
+  public void stop() {
+    if(stopped.get()) {
+      return;
+    }
+    stopped.set(true);
+    if(workerResourceAllocator != null) {
+      workerResourceAllocator.interrupt();
+    }
+    if(workerMonitor != null) {
+      workerMonitor.interrupt();
+    }
+  }
+
+  @Override
+  public WorkerResource allocateQueryMaster(QueryInProgress queryInProgress) {
+    return allocateQueryMaster(queryInProgress.getQueryId());
+  }
+
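+  /**
+   * Picks the live QueryMaster worker currently running the fewest
+   * QueryMaster tasks, charges the default QueryMaster memory and disk slots
+   * against it, and records the queryId-to-worker mapping. Returns null when
+   * no QueryMaster worker is alive.
+   */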
+  public WorkerResource allocateQueryMaster(QueryId queryId) {
+    synchronized(workerResourceLock) {
+      if(liveQueryMasterWorkerResources.size() == 0) {
+        LOG.warn("No available resource for querymaster:" + queryId);
+        return null;
+      }
+      WorkerResource queryMasterWorker = null;
+      int minTasks = Integer.MAX_VALUE;
+      for(String eachQueryMaster: liveQueryMasterWorkerResources) {
+        WorkerResource queryMaster = allWorkerResourceMap.get(eachQueryMaster);
+        if(queryMaster != null && queryMaster.getNumQueryMasterTasks() < minTasks) {
+          queryMasterWorker = queryMaster;
+          minTasks = queryMaster.getNumQueryMasterTasks();
+        }
+      }
+      if(queryMasterWorker == null) {
+        return null;
+      }
+      queryMasterWorker.addNumQueryMasterTask(queryMasterDefaultDiskSlot, queryMasterDefaultMemoryMB);
+      queryMasterMap.put(queryId, queryMasterWorker);
+      LOG.info(queryId + "'s QueryMaster is " + queryMasterWorker);
+      return queryMasterWorker;
+    }
+  }
+
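+  /**
+   * Starts a QueryMaster for the given query. If a worker has already been
+   * reserved via allocateQueryMaster(), the query is started immediately;
+   * otherwise a single-container request carrying the default QueryMaster
+   * resources is enqueued and handled asynchronously by the allocator thread.
+   */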
+  @Override
+  public void startQueryMaster(QueryInProgress queryInProgress) {
+    WorkerResource queryMasterWorkerResource = null;
+    synchronized(workerResourceLock) {
+      queryMasterWorkerResource = queryMasterMap.get(queryInProgress.getQueryId());
+    }
+
+    if(queryMasterWorkerResource != null) {
+      AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
+      allocatedWorkerResource.workerResource = queryMasterWorkerResource;
+      allocatedWorkerResource.allocatedMemoryMB = queryMasterDefaultMemoryMB;
+      allocatedWorkerResource.allocatedDiskSlots = queryMasterDefaultDiskSlot;
+
+      startQueryMaster(queryInProgress.getQueryId(), allocatedWorkerResource);
+    } else {
+      //no reserved QueryMaster worker yet; enqueue an allocation request
+      TajoMasterProtocol.WorkerResourceAllocationRequest request =
+          TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
+            .setExecutionBlockId(QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0).getProto())
+            .setNumContainers(1)
+            .setMinMemoryMBPerContainer(queryMasterDefaultMemoryMB)
+            .setMaxMemoryMBPerContainer(queryMasterDefaultMemoryMB)
+            .setMinDiskSlotPerContainer(queryMasterDefaultDiskSlot)
+            .setMaxDiskSlotPerContainer(queryMasterDefaultDiskSlot)
+            .setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY)
+            .build();
+      try {
+        requestQueue.put(new WorkerResourceRequest(queryInProgress.getQueryId(), true, request, null));
+      } catch (InterruptedException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+  }
+
+  private void startQueryMaster(QueryId queryId, AllocatedWorkerResource workResource) {
+    QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
+    if(queryInProgress == null) {
+      LOG.warn("No QueryInProgress while starting  QueryMaster:" + queryId);
+      return;
+    }
+    queryInProgress.getQueryInfo().setQueryMasterResource(workResource.workerResource);
+
+    //fire QueryJobStart event
+    queryInProgress.getEventHandler().handle(
+        new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo()));
+  }
+
+  @Override
+  public String getSeedQueryId() throws IOException {
+    return queryIdSeed;
+  }
+
+  @Override
+  public void allocateWorkerResources(
+      TajoMasterProtocol.WorkerResourceAllocationRequest request,
+      RpcCallback<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack) {
+    try {
+      //TODO checking queue size
+      requestQueue.put(new WorkerResourceRequest(
+          new QueryId(request.getExecutionBlockId().getQueryId()), false, request, callBack));
+    } catch (InterruptedException e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
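+  /**
+   * Background thread that wakes every 10 seconds and moves any worker or
+   * QueryMaster whose last heartbeat is older than WORKER_HEARTBEAT_TIMEOUT
+   * from the live set to the dead set.
+   */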
+  class WorkerMonitorThread extends Thread {
+    int heartbeatTimeout;
+
+    @Override
+    public void run() {
+      heartbeatTimeout = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_TIMEOUT);
+      LOG.info("WorkerMonitor start");
+      while(!stopped.get()) {
+        try {
+          Thread.sleep(10 * 1000);
+        } catch (InterruptedException e) {
+          if(stopped.get()) {
+            break;
+          }
+        }
+        synchronized(workerResourceLock) {
+          Set<String> workerHolders = new HashSet<String>();
+          workerHolders.addAll(liveWorkerResources);
+          for(String eachLiveWorker: workerHolders) {
+            WorkerResource worker = allWorkerResourceMap.get(eachLiveWorker);
+            if(worker == null) {
+              LOG.warn(eachLiveWorker + " not in WorkerResourceMap");
+              continue;
+            }
+
+            if(System.currentTimeMillis() - worker.getLastHeartbeat() >= heartbeatTimeout) {
+              liveWorkerResources.remove(eachLiveWorker);
+              deadWorkerResources.add(eachLiveWorker);
+              worker.setWorkerStatus(WorkerStatus.DEAD);
+              LOG.warn("Worker [" + eachLiveWorker + "] is dead.");
+            }
+          }
+
+          //repeat the sweep for QueryMaster workers
+          workerHolders.clear();
+
+          workerHolders.addAll(liveQueryMasterWorkerResources);
+          for(String eachLiveWorker: workerHolders) {
+            WorkerResource worker = allWorkerResourceMap.get(eachLiveWorker);
+            if(worker == null) {
+              LOG.warn(eachLiveWorker + " not in WorkerResourceMap");
+              continue;
+            }
+
+            if(System.currentTimeMillis() - worker.getLastHeartbeat() >= heartbeatTimeout) {
+              liveQueryMasterWorkerResources.remove(eachLiveWorker);
+              deadWorkerResources.add(eachLiveWorker);
+              worker.setWorkerStatus(WorkerStatus.DEAD);
+              LOG.warn("QueryMaster [" + eachLiveWorker + "] is dead.");
+            }
+          }
+        }
+      }
+    }
+  }
+
+  static class WorkerResourceRequest {
+    boolean queryMasterRequest;
+    QueryId queryId;
+    TajoMasterProtocol.WorkerResourceAllocationRequest request;
+    RpcCallback<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack;
+    WorkerResourceRequest(
+        QueryId queryId,
+        boolean queryMasterRequest, TajoMasterProtocol.WorkerResourceAllocationRequest request,
+        RpcCallback<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack) {
+      this.queryId = queryId;
+      this.queryMasterRequest = queryMasterRequest;
+      this.request = request;
+      this.callBack = callBack;
+    }
+  }
+
+  static class AllocatedWorkerResource {
+    WorkerResource workerResource;
+    int allocatedMemoryMB;
+    float allocatedDiskSlots;
+  }
+
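+  /**
+   * Background thread that takes requests off the queue, asks chooseWorkers()
+   * for matching resources, and then either starts a QueryMaster or answers
+   * the RPC callback with the allocated containers. A request that cannot be
+   * satisfied yet is re-queued after a 100ms pause.
+   */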
+  class WorkerResourceAllocationThread extends Thread {
+    @Override
+    public void run() {
+      LOG.info("WorkerResourceAllocationThread start");
+      while(!stopped.get()) {
+        try {
+          WorkerResourceRequest resourceRequest = requestQueue.take();
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("allocateWorkerResources:" +
+                (new ExecutionBlockId(resourceRequest.request.getExecutionBlockId())) +
+                ", requiredMemory:" + resourceRequest.request.getMinMemoryMBPerContainer() +
+                "~" + resourceRequest.request.getMaxMemoryMBPerContainer() +
+                ", requiredContainers:" + resourceRequest.request.getNumContainers() +
+                ", requiredDiskSlots:" + resourceRequest.request.getMinDiskSlotPerContainer() +
+                "~" + resourceRequest.request.getMaxDiskSlotPerContainer() +
+                ", queryMasterRequest=" + resourceRequest.queryMasterRequest +
+                ", liveWorkers=" + liveWorkerResources.size());
+          }
+
+          List<AllocatedWorkerResource> allocatedWorkerResources = chooseWorkers(resourceRequest);
+
+          if(allocatedWorkerResources.size() > 0) {
+            if(resourceRequest.queryMasterRequest) {
+              startQueryMaster(resourceRequest.queryId, allocatedWorkerResources.get(0));
+            } else {
+              List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources =
+                  new ArrayList<TajoMasterProtocol.WorkerAllocatedResource>();
+
+              for(AllocatedWorkerResource eachWorker: allocatedWorkerResources) {
+                NodeId nodeId = NodeId.newInstance(eachWorker.workerResource.getAllocatedHost(),
+                    eachWorker.workerResource.getPeerRpcPort());
+
+                TajoWorkerContainerId containerId = new TajoWorkerContainerId();
+
+                containerId.setApplicationAttemptId(
+                    ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId));
+                containerId.setId(containerIdSeq.incrementAndGet());
+
+                YarnProtos.ContainerIdProto containerIdProto = containerId.getProto();
+                allocatedResources.add(TajoMasterProtocol.WorkerAllocatedResource.newBuilder()
+                    .setContainerId(containerIdProto)
+                    .setNodeId(nodeId.toString())
+                    .setWorkerHost(eachWorker.workerResource.getAllocatedHost())
+                    .setQueryMasterPort(eachWorker.workerResource.getQueryMasterPort())
+                    .setPeerRpcPort(eachWorker.workerResource.getPeerRpcPort())
+                    .setWorkerPullServerPort(eachWorker.workerResource.getPullServerPort())
+                    .setAllocatedMemoryMB(eachWorker.allocatedMemoryMB)
+                    .setAllocatedDiskSlots(eachWorker.allocatedDiskSlots)
+                    .build());
+
+                synchronized(workerResourceLock) {
+                  allocatedResourceMap.put(containerIdProto, eachWorker);
+                }
+              }
+
+              resourceRequest.callBack.run(TajoMasterProtocol.WorkerResourceAllocationResponse.newBuilder()
+                  .setExecutionBlockId(resourceRequest.request.getExecutionBlockId())
+                  .addAllWorkerAllocatedResource(allocatedResources)
+                  .build()
+              );
+            }
+          } else {
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("=========================================");
+              LOG.debug("Available Workers");
+              for(String liveWorker: liveWorkerResources) {
+                LOG.debug(allWorkerResourceMap.get(liveWorker).toString());
+              }
+              LOG.debug("=========================================");
+            }
+            requestQueue.put(resourceRequest);
+            Thread.sleep(100);
+          }
+        } catch(InterruptedException ie) {
+          LOG.error(ie);
+        }
+      }
+    }
+  }
+
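+  /**
+   * Selects workers for a request, prioritizing either memory or disk slots.
+   * Allocation is two-phase: the first pass accepts only workers that can
+   * satisfy the per-container maximum; once every live worker has been found
+   * insufficient, the pass restarts against the per-container minimum. For
+   * example, a request for three containers at 512~1024MB each first tries
+   * to place 1024MB containers, then falls back to whatever each worker can
+   * spare down to 512MB.
+   */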
+  private List<AllocatedWorkerResource> chooseWorkers(WorkerResourceRequest resourceRequest) {
+    List<AllocatedWorkerResource> selectedWorkers = new ArrayList<AllocatedWorkerResource>();
+
+    int allocatedResources = 0;
+
+    if(resourceRequest.queryMasterRequest) {
+      WorkerResource worker = allocateQueryMaster(resourceRequest.queryId);
+      if(worker != null) {
+        AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
+        allocatedWorkerResource.workerResource = worker;
+        allocatedWorkerResource.allocatedDiskSlots = queryMasterDefaultDiskSlot;
+        allocatedWorkerResource.allocatedMemoryMB = queryMasterDefaultMemoryMB;
+        selectedWorkers.add(allocatedWorkerResource);
+
+        return selectedWorkers;
+      }
+    }
+
+    TajoMasterProtocol.ResourceRequestPriority resourceRequestPriority
+        = resourceRequest.request.getResourceRequestPriority();
+
+    if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) {
+      synchronized(workerResourceLock) {
+        List<String> randomWorkers = new ArrayList<String>(liveWorkerResources);
+        Collections.shuffle(randomWorkers);
+
+        int numContainers = resourceRequest.request.getNumContainers();
+        int minMemoryMB = resourceRequest.request.getMinMemoryMBPerContainer();
+        int maxMemoryMB = resourceRequest.request.getMaxMemoryMBPerContainer();
+        float diskSlot = Math.max(resourceRequest.request.getMaxDiskSlotPerContainer(),
+            resourceRequest.request.getMinDiskSlotPerContainer());
+
+        int liveWorkerSize = randomWorkers.size();
+        Set<String> insufficientWorkers = new HashSet<String>();
+        boolean stop = false;
+        boolean checkMax = true;
+        while(!stop) {
+          if(allocatedResources >= numContainers) {
+            break;
+          }
+
+          if(insufficientWorkers.size() >= liveWorkerSize) {
+            if(!checkMax) {
+              break;
+            }
+            insufficientWorkers.clear();
+            checkMax = false;
+          }
+          int compareAvailableMemory = checkMax ? maxMemoryMB : minMemoryMB;
+
+          for(String eachWorker: randomWorkers) {
+            if(allocatedResources >= numContainers) {
+              stop = true;
+              break;
+            }
+
+            if(insufficientWorkers.size() >= liveWorkerSize) {
+              break;
+            }
+
+            WorkerResource workerResource = allWorkerResourceMap.get(eachWorker);
+            if(workerResource.getAvailableMemoryMB() >= compareAvailableMemory) {
+              int workerMemory;
+              if(workerResource.getAvailableMemoryMB() >= maxMemoryMB) {
+                workerMemory = maxMemoryMB;
+              } else {
+                workerMemory = workerResource.getAvailableMemoryMB();
+              }
+              AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
+              allocatedWorkerResource.workerResource = workerResource;
+              allocatedWorkerResource.allocatedMemoryMB = workerMemory;
+              if(workerResource.getAvailableDiskSlots() >= diskSlot) {
+                allocatedWorkerResource.allocatedDiskSlots = diskSlot;
+              } else {
+                allocatedWorkerResource.allocatedDiskSlots = workerResource.getAvailableDiskSlots();
+              }
+
+              workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
+                  allocatedWorkerResource.allocatedMemoryMB);
+
+              selectedWorkers.add(allocatedWorkerResource);
+
+              allocatedResources++;
+            } else {
+              insufficientWorkers.add(eachWorker);
+            }
+          }
+        }
+      }
+    } else {
+      synchronized(workerResourceLock) {
+        List<String> randomWorkers = new ArrayList<String>(liveWorkerResources);
+        Collections.shuffle(randomWorkers);
+
+        int numContainers = resourceRequest.request.getNumContainers();
+        float minDiskSlots = resourceRequest.request.getMinDiskSlotPerContainer();
+        float maxDiskSlots = resourceRequest.request.getMaxDiskSlotPerContainer();
+        int memoryMB = Math.max(resourceRequest.request.getMaxMemoryMBPerContainer(),
+            resourceRequest.request.getMinMemoryMBPerContainer());
+
+        int liveWorkerSize = randomWorkers.size();
+        Set<String> insufficientWorkers = new HashSet<String>();
+        boolean stop = false;
+        boolean checkMax = true;
+        while(!stop) {
+          if(allocatedResources >= numContainers) {
+            break;
+          }
+
+          if(insufficientWorkers.size() >= liveWorkerSize) {
+            if(!checkMax) {
+              break;
+            }
+            insufficientWorkers.clear();
+            checkMax = false;
+          }
+          float compareAvailableDisk = checkMax ? maxDiskSlots : minDiskSlots;
+
+          for(String eachWorker: randomWorkers) {
+            if(allocatedResources >= numContainers) {
+              stop = true;
+              break;
+            }
+
+            if(insufficientWorkers.size() >= liveWorkerSize) {
+              break;
+            }
+
+            WorkerResource workerResource = allWorkerResourceMap.get(eachWorker);
+            if(workerResource.getAvailableDiskSlots() >= compareAvailableDisk) {
+              float workerDiskSlots;
+              if(workerResource.getAvailableDiskSlots() >= maxDiskSlots) {
+                workerDiskSlots = maxDiskSlots;
+              } else {
+                workerDiskSlots = workerResource.getAvailableDiskSlots();
+              }
+              AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
+              allocatedWorkerResource.workerResource = workerResource;
+              allocatedWorkerResource.allocatedDiskSlots = workerDiskSlots;
+
+              if(workerResource.getAvailableMemoryMB() >= memoryMB) {
+                allocatedWorkerResource.allocatedMemoryMB = memoryMB;
+              } else {
+                allocatedWorkerResource.allocatedMemoryMB = workerResource.getAvailableMemoryMB();
+              }
+              workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
+                  allocatedWorkerResource.allocatedMemoryMB);
+
+              selectedWorkers.add(allocatedWorkerResource);
+
+              allocatedResources++;
+            } else {
+              insufficientWorkers.add(eachWorker);
+            }
+          }
+        }
+      }
+    }
+    return selectedWorkers;
+  }
+
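+  /**
+   * Returns a released container's disk slots and memory to its worker, then
+   * notifies any thread waiting on reAllocationList so pending requests can
+   * be retried.
+   */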
+  @Override
+  public void releaseWorkerResource(ExecutionBlockId ebId, YarnProtos.ContainerIdProto containerId) {
+    synchronized(workerResourceLock) {
+      AllocatedWorkerResource allocatedWorkerResource = allocatedResourceMap.get(containerId);
+      if(allocatedWorkerResource != null) {
+        LOG.info("Release Resource:" + ebId + "," +
+            allocatedWorkerResource.allocatedDiskSlots + "," + allocatedWorkerResource.allocatedMemoryMB);
+        allocatedWorkerResource.workerResource.releaseResource(
+            allocatedWorkerResource.allocatedDiskSlots, allocatedWorkerResource.allocatedMemoryMB);
+      } else {
+        LOG.warn("No AllocatedWorkerResource data for [" + ebId + "," + containerId + "]");
+        return;
+      }
+    }
+
+    synchronized(reAllocationList) {
+      reAllocationList.notifyAll();
+    }
+  }
+
+  @Override
+  public boolean isQueryMasterStopped(QueryId queryId) {
+    synchronized(workerResourceLock) {
+      return !queryMasterMap.containsKey(queryId);
+    }
+  }
+
+  @Override
+  public void init(Configuration conf) {
+  }
+
+  @Override
+  public void stopQueryMaster(QueryId queryId) {
+    WorkerResource queryMasterWorkerResource = null;
+    synchronized(workerResourceLock) {
+      if(!queryMasterMap.containsKey(queryId)) {
+        LOG.warn("No QueryMaster resource info for " + queryId);
+        return;
+      } else {
+        queryMasterWorkerResource = queryMasterMap.remove(queryId);
+        queryMasterWorkerResource.releaseQueryMasterTask(queryMasterDefaultDiskSlot, queryMasterDefaultMemoryMB);
+      }
+    }
+
+    LOG.info("release QueryMaster resource:" + queryId + "," + queryMasterWorkerResource);
+  }
+
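+  /**
+   * Handles a heartbeat, keyed by "host:queryMasterPort:peerRpcPort". A known
+   * worker has its liveness, running-task count, and heap statistics
+   * refreshed (and is revived if previously marked dead); an unknown worker
+   * is registered with the resources it reports, falling back to 4096MB
+   * memory, 4 disk slots, and 4 CPU core slots when no server status is
+   * attached.
+   */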
+  public void workerHeartbeat(TajoMasterProtocol.TajoHeartbeat request) {
+    synchronized(workerResourceLock) {
+      String workerKey = request.getTajoWorkerHost() + ":" + request.getTajoQueryMasterPort() + ":"
+          + request.getPeerRpcPort();
+      boolean queryMasterMode = request.getServerStatus().getQueryMasterMode().getValue();
+      boolean taskRunnerMode = request.getServerStatus().getTaskRunnerMode().getValue();
+
+      if(allWorkerResourceMap.containsKey(workerKey)) {
+        WorkerResource workerResource = allWorkerResourceMap.get(workerKey);
+
+        if(deadWorkerResources.contains(workerKey)) {
+          deadWorkerResources.remove(workerKey);
+          if(queryMasterMode) {
+            liveQueryMasterWorkerResources.add(workerKey);
+            workerResource.setNumRunningTasks(0);
+            LOG.info("Heartbeat received from QueryMaster [" + workerKey + "] again.");
+          }
+          if(taskRunnerMode) {
+            liveWorkerResources.add(workerKey);
+            LOG.info("Heartbeat received from Worker [" + workerKey + "] again.");
+          }
+        }
+        workerResource.setLastHeartbeat(System.currentTimeMillis());
+        workerResource.setWorkerStatus(WorkerStatus.LIVE);
+        workerResource.setNumRunningTasks(request.getServerStatus().getRunningTaskNum());
+        workerResource.setMaxHeap(request.getServerStatus().getJvmHeap().getMaxHeap());
+        workerResource.setFreeHeap(request.getServerStatus().getJvmHeap().getFreeHeap());
+        workerResource.setTotalHeap(request.getServerStatus().getJvmHeap().getTotalHeap());
+      } else {
+        //first heartbeat from this worker: register it
+        WorkerResource workerResource = new WorkerResource();
+        workerResource.setAllocatedHost(request.getTajoWorkerHost());
+        workerResource.setQueryMasterMode(queryMasterMode);
+        workerResource.setTaskRunnerMode(taskRunnerMode);
+
+        workerResource.setQueryMasterPort(request.getTajoQueryMasterPort());
+        workerResource.setPeerRpcPort(request.getPeerRpcPort());
+        workerResource.setClientPort(request.getTajoWorkerClientPort());
+        workerResource.setPullServerPort(request.getTajoWorkerPullServerPort());
+        workerResource.setHttpPort(request.getTajoWorkerHttpPort());
+
+        workerResource.setLastHeartbeat(System.currentTimeMillis());
+        workerResource.setWorkerStatus(WorkerStatus.LIVE);
+        if(request.getServerStatus() != null) {
+          workerResource.setMemoryMB(request.getServerStatus().getMemoryResourceMB());
+          workerResource.setCpuCoreSlots(request.getServerStatus().getSystem().getAvailableProcessors());
+          workerResource.setDiskSlots(request.getServerStatus().getDiskSlots());
+          workerResource.setNumRunningTasks(request.getServerStatus().getRunningTaskNum());
+          workerResource.setMaxHeap(request.getServerStatus().getJvmHeap().getMaxHeap());
+          workerResource.setFreeHeap(request.getServerStatus().getJvmHeap().getFreeHeap());
+          workerResource.setTotalHeap(request.getServerStatus().getJvmHeap().getTotalHeap());
+        } else {
+          workerResource.setMemoryMB(4096);
+          workerResource.setDiskSlots(4);
+          workerResource.setCpuCoreSlots(4);
+        }
+
+        //register under workerKey, the same key used by the containsKey() check above
+        allWorkerResourceMap.put(workerKey, workerResource);
+        if(queryMasterMode) {
+          liveQueryMasterWorkerResources.add(workerKey);
+        }
+
+        if(taskRunnerMode) {
+          liveWorkerResources.add(workerKey);
+        }
+
+        LOG.info("TajoWorker:" + workerResource + " added in live TajoWorker list");
+
+        workerResourceLock.notifyAll();
+      }
+    }
+  }
+}

