tajo-commits mailing list archives

From hyun...@apache.org
Subject [25/51] [partial] TAJO-22: The package prefix should be org.apache.tajo. (DaeMyung Kang via hyunsik)
Date Tue, 02 Jul 2013 14:16:19 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
new file mode 100644
index 0000000..d447d07
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
@@ -0,0 +1,721 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.SubQueryId;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.parser.QueryBlock.FromTable;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.master.ExecutionBlock.PartitionType;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.util.TajoIdUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
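+/**
+ * Builds a global execution plan (a {@link MasterPlan} composed of
+ * {@link ExecutionBlock}s) from a logical plan.
+ */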
+public class GlobalPlanner {
+  private static Log LOG = LogFactory.getLog(GlobalPlanner.class);
+
+  private TajoConf conf;
+  private StorageManager sm;
+  private CatalogService catalog;
+  private QueryId queryId;
+  private EventHandler eventHandler;
+
+  public GlobalPlanner(final TajoConf conf, final CatalogService catalog,
+                       final StorageManager sm,
+                       final EventHandler eventHandler)
+      throws IOException {
+    this.conf = conf;
+    this.sm = sm;
+    this.catalog = catalog;
+    this.eventHandler = eventHandler;
+  }
+
+  /**
+   * Builds a master plan from the given logical plan.
+   * @param queryId the id of the query to be planned
+   * @param rootNode the root node of the logical plan
+   * @return the built master plan
+   * @throws IOException
+   */
+  public MasterPlan build(QueryId queryId, LogicalRootNode rootNode)
+      throws IOException {
+    this.queryId = queryId;
+
+    String outputTableName = null;
+    if (rootNode.getSubNode().getType() == ExprType.STORE) {
+      // create table queries are executed by the master
+      StoreTableNode storeTableNode = (StoreTableNode) rootNode.getSubNode();
+      outputTableName = storeTableNode.getTableName();
+    }
+
+    // insert store at the subnode of the root
+    UnaryNode root = rootNode;
+    IndexWriteNode indexNode = null;
+    // TODO: check whether the type of the subnode is CREATE_INDEX
+    if (root.getSubNode().getType() == ExprType.CREATE_INDEX) {
+      indexNode = (IndexWriteNode) root.getSubNode();
+      root = (UnaryNode)root.getSubNode();
+      
+      StoreIndexNode store = new StoreIndexNode(
+          QueryIdFactory.newSubQueryId(this.queryId).toString());
+      store.setLocal(false);
+      PlannerUtil.insertNode(root, store);
+      
+    } else if (root.getSubNode().getType() != ExprType.STORE) {
+      SubQueryId subQueryId = QueryIdFactory.newSubQueryId(this.queryId);
+      outputTableName = subQueryId.toString();
+      insertStore(subQueryId.toString(),root).setLocal(false);
+    }
+    
+    // convert 2-phase plan
+    LogicalNode tp = convertTo2Phase(rootNode);
+
+    // make query graph
+    MasterPlan globalPlan = convertToGlobalPlan(indexNode, tp);
+    globalPlan.setOutputTableName(outputTableName);
+
+    return globalPlan;
+  }
+  
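+  /**
+   * Inserts a local store node between the given node and its child,
+   * materializing the intermediate result under the given table id.
+   */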
+  private StoreTableNode insertStore(String tableId, LogicalNode parent) {
+    StoreTableNode store = new StoreTableNode(tableId);
+    store.setLocal(true);
+    PlannerUtil.insertNode(parent, store);
+    return store;
+  }
+  
+  /**
+   * Transforms a logical plan into a two-phase plan.
+   * Store nodes are inserted for every logical node except store and scan nodes.
+   */
+  private class GlobalPlanBuilder implements LogicalNodeVisitor {
+    @Override
+    public void visit(LogicalNode node) {
+      String tableId;
+      StoreTableNode store;
+      if (node.getType() == ExprType.GROUP_BY) {
+        // transform group by to two-phase plan 
+        GroupbyNode groupby = (GroupbyNode) node;
+        // insert a store for the child of first group by
+        if (groupby.getSubNode().getType() != ExprType.UNION &&
+            groupby.getSubNode().getType() != ExprType.STORE &&
+            groupby.getSubNode().getType() != ExprType.SCAN) {
+          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+          insertStore(tableId, groupby);
+        }
+        tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+        // insert (a store for the first group by) and (a second group by)
+        PlannerUtil.transformGroupbyTo2PWithStore((GroupbyNode)node, tableId);
+      } else if (node.getType() == ExprType.SORT) {
+        // transform sort to two-phase plan 
+        SortNode sort = (SortNode) node;
+        // insert a store for the child of first sort
+        if (sort.getSubNode().getType() != ExprType.UNION &&
+            sort.getSubNode().getType() != ExprType.STORE &&
+            sort.getSubNode().getType() != ExprType.SCAN) {
+          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+          insertStore(tableId, sort);
+        }
+        tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+        // insert (a store for the first sort) and (a second sort)
+        PlannerUtil.transformSortTo2PWithStore((SortNode)node, tableId);
+      } else if (node.getType() == ExprType.JOIN) {
+        // transform join to two-phase plan 
+        // the first phase of two-phase join can be any logical nodes
+        JoinNode join = (JoinNode) node;
+
+        /*
+        if (join.getOuterNode().getType() == ExprType.SCAN &&
+            join.getInnerNode().getType() == ExprType.SCAN) {
+          ScanNode outerScan = (ScanNode) join.getOuterNode();
+          ScanNode innerScan = (ScanNode) join.getInnerNode();
+
+
+          TableMeta outerMeta =
+              catalog.getTableDesc(outerScan.getTableId()).getMeta();
+          TableMeta innerMeta =
+              catalog.getTableDesc(innerScan.getTableId()).getMeta();
+          long threshold = conf.getLongVar(ConfVars.BROADCAST_JOIN_THRESHOLD);
+
+
+          // if the broadcast join is available
+          boolean outerSmall = false;
+          boolean innerSmall = false;
+          if (!outerScan.isLocal() && outerMeta.getStat() != null &&
+              outerMeta.getStat().getNumBytes() <= threshold) {
+            outerSmall = true;
+            LOG.info("The relation (" + outerScan.getTableId() +
+                ") is less than " + threshold);
+          }
+          if (!innerScan.isLocal() && innerMeta.getStat() != null &&
+              innerMeta.getStat().getNumBytes() <= threshold) {
+            innerSmall = true;
+            LOG.info("The relation (" + innerScan.getTableId() +
+                ") is less than " + threshold);
+          }
+
+          if (outerSmall && innerSmall) {
+            if (outerMeta.getStat().getNumBytes() <=
+                innerMeta.getStat().getNumBytes()) {
+              outerScan.setBroadcast();
+              LOG.info("The relation " + outerScan.getTableId()
+                  + " is broadcasted");
+            } else {
+              innerScan.setBroadcast();
+              LOG.info("The relation " + innerScan.getTableId()
+                  + " is broadcasted");
+            }
+          } else {
+            if (outerSmall) {
+              outerScan.setBroadcast();
+              LOG.info("The relation (" + outerScan.getTableId()
+                  + ") is broadcasted");
+            } else if (innerSmall) {
+              innerScan.setBroadcast();
+              LOG.info("The relation (" + innerScan.getTableId()
+                  + ") is broadcasted");
+            }
+          }
+
+          if (outerScan.isBroadcast() || innerScan.isBroadcast()) {
+            return;
+          }
+        } */
+
+        // insert stores for the first phase
+        if (join.getOuterNode().getType() != ExprType.UNION &&
+            join.getOuterNode().getType() != ExprType.STORE) {
+          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+          store = new StoreTableNode(tableId);
+          store.setLocal(true);
+          PlannerUtil.insertOuterNode(node, store);
+        }
+        if (join.getInnerNode().getType() != ExprType.UNION &&
+            join.getInnerNode().getType() != ExprType.STORE) {
+          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+          store = new StoreTableNode(tableId);
+          store.setLocal(true);
+          PlannerUtil.insertInnerNode(node, store);
+        }
+      } else if (node.getType() == ExprType.UNION) {
+        // not two-phase transform
+        UnionNode union = (UnionNode) node;
+        // insert stores
+        if (union.getOuterNode().getType() != ExprType.UNION &&
+            union.getOuterNode().getType() != ExprType.STORE) {
+          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+          store = new StoreTableNode(tableId);
+          if(union.getOuterNode().getType() == ExprType.GROUP_BY) {
+            /* This case is for the cube by operator.
+             * TODO: needs a more complex condition. */
+            store.setLocal(true);
+          } else {
+            /* This case is for union query*/
+            store.setLocal(false);
+          }
+          PlannerUtil.insertOuterNode(node, store);
+        }
+        if (union.getInnerNode().getType() != ExprType.UNION &&
+            union.getInnerNode().getType() != ExprType.STORE) {
+          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+          store = new StoreTableNode(tableId);
+          if(union.getInnerNode().getType() == ExprType.GROUP_BY) {
+            /* This case is for the cube by operator.
+             * TODO: needs a more complex condition. */
+            store.setLocal(true);
+          } else {
+            /* This case is for union query*/
+            store.setLocal(false);
+          }
+          PlannerUtil.insertInnerNode(node, store);
+        }
+      } else if (node instanceof UnaryNode) {
+        UnaryNode unary = (UnaryNode)node;
+        if (unary.getType() != ExprType.STORE &&
+            unary.getSubNode().getType() != ExprType.STORE) {
+          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+          insertStore(tableId, unary);
+        }
+      }
+    }
+  }
+
+  /**
+   * Converts the logical plan to a two-phase plan via a post-order traversal.
+   *
+   * @param logicalPlan the logical plan to convert
+   * @return the converted two-phase plan
+   */
+  private LogicalNode convertTo2Phase(LogicalNode logicalPlan) {
+    LogicalRootNode root = (LogicalRootNode) logicalPlan;
+    root.postOrder(new GlobalPlanBuilder());
+    return logicalPlan;
+  }
+  
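+  /** Maps each store node to the execution block (SubQuery) built for it. */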
+  private Map<StoreTableNode, ExecutionBlock> convertMap =
+      new HashMap<StoreTableNode, ExecutionBlock>();
+  
+  /**
+   * Creates SubQueries (execution blocks) while traversing the logical plan
+   * in post order.
+   *
+   * @param node the node currently being visited
+   * @throws IOException
+   */
+  private void recursiveBuildSubQuery(LogicalNode node)
+      throws IOException {
+    ExecutionBlock subQuery;
+    StoreTableNode store;
+    if (node instanceof UnaryNode) {
+      recursiveBuildSubQuery(((UnaryNode) node).getSubNode());
+      
+      if (node.getType() == ExprType.STORE) {
+        store = (StoreTableNode) node;
+        SubQueryId id;
+        if (store.getTableName().startsWith(QueryId.PREFIX)) {
+          id = TajoIdUtils.newSubQueryId(store.getTableName());
+        } else {
+          id = QueryIdFactory.newSubQueryId(queryId);
+        }
+        subQuery = new ExecutionBlock(id);
+
+        switch (store.getSubNode().getType()) {
+        case BST_INDEX_SCAN:
+        case SCAN:  // store - scan
+          subQuery = makeScanSubQuery(subQuery);
+          subQuery.setPlan(node);
+          break;
+        case SELECTION:
+        case PROJECTION:
+        case LIMIT:
+          subQuery = makeUnarySubQuery(store, node, subQuery);
+          subQuery.setPlan(node);
+          break;
+        case GROUP_BY:
+          subQuery = makeGroupbySubQuery(store, node, subQuery);
+          subQuery.setPlan(node);
+          break;
+        case SORT:
+          subQuery = makeSortSubQuery(store, node, subQuery);
+          subQuery.setPlan(node);
+          break;
+        case JOIN:  // store - join
+          subQuery = makeJoinSubQuery(store, node, subQuery);
+          subQuery.setPlan(node);
+          break;
+        case UNION:
+          subQuery = makeUnionSubQuery(store, node, subQuery);
+          subQuery.setPlan(node);
+          break;
+        default:
+          subQuery = null;
+          break;
+        }
+
+        convertMap.put(store, subQuery);
+      }
+    } else if (node instanceof BinaryNode) {
+      recursiveBuildSubQuery(((BinaryNode) node).getOuterNode());
+      recursiveBuildSubQuery(((BinaryNode) node).getInnerNode());
+    } else if (node instanceof ScanNode) {
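+      // leaf scan: no execution block is created here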
+
+    } else {
+
+    }
+  }
+  
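+  /** A scan-only block emits its output without repartitioning (LIST). */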
+  private ExecutionBlock makeScanSubQuery(ExecutionBlock block) {
+    block.setPartitionType(PartitionType.LIST);
+    return block;
+  }
+  
+  /**
+   * Places a unifiable node (selection, projection) in the same SubQuery
+   * as its child plan.
+   *
+   * @param rootStore the store of the SubQuery to be created
+   * @param plan the logical plan
+   * @param unit the SubQuery to be created
+   * @return the created SubQuery
+   * @throws IOException
+   */
+  private ExecutionBlock makeUnarySubQuery(StoreTableNode rootStore,
+                                     LogicalNode plan, ExecutionBlock unit) throws IOException {
+    ScanNode newScan;
+    ExecutionBlock prev;
+    UnaryNode unary = (UnaryNode) plan;
+    UnaryNode child = (UnaryNode) unary.getSubNode();
+    StoreTableNode prevStore = (StoreTableNode)child.getSubNode();
+
+    // add scan
+    newScan = GlobalPlannerUtils.newScanPlan(prevStore.getOutSchema(),
+        prevStore.getTableName(), sm.getTablePath(prevStore.getTableName()));
+    newScan.setLocal(true);
+    child.setSubNode(newScan);
+    prev = convertMap.get(prevStore);
+
+    if (prev != null) {
+      prev.setParentBlock(unit);
+      unit.addChildBlock(newScan, prev);
+      prev.setPartitionType(PartitionType.LIST);
+    }
+
+    unit.setPartitionType(PartitionType.LIST);
+
+    return unit;
+  }
+  
+  /**
+   * Creates a two-phase SubQuery for a group-by.
+   *
+   * @param rootStore the store of the SubQuery to be created
+   * @param plan the logical plan
+   * @param unit the SubQuery to be created
+   * @return the created SubQuery
+   * @throws IOException
+   */
+  private ExecutionBlock makeGroupbySubQuery(StoreTableNode rootStore,
+                                       LogicalNode plan, ExecutionBlock unit) throws IOException {
+    UnaryNode unary = (UnaryNode) plan;
+    UnaryNode unaryChild;
+    StoreTableNode prevStore;
+    ScanNode newScan;
+    ExecutionBlock prev;
+    unaryChild = (UnaryNode) unary.getSubNode();  // groupby
+    ExprType curType = unaryChild.getType();
+    if (unaryChild.getSubNode().getType() == ExprType.STORE) {
+      // store - groupby - store
+      unaryChild = (UnaryNode) unaryChild.getSubNode(); // store
+      prevStore = (StoreTableNode) unaryChild;
+      newScan = GlobalPlannerUtils.newScanPlan(prevStore.getOutSchema(),
+          prevStore.getTableName(),
+          sm.getTablePath(prevStore.getTableName()));
+      newScan.setLocal(true);
+      ((UnaryNode) unary.getSubNode()).setSubNode(newScan);
+      prev = convertMap.get(prevStore);
+      if (prev != null) {
+        prev.setParentBlock(unit);
+        unit.addChildBlock(newScan, prev);
+      }
+
+      if (unaryChild.getSubNode().getType() == curType) {
+        // the second phase
+        unit.setPartitionType(PartitionType.LIST);
+        if (prev != null) {
+          prev.setPartitionType(PartitionType.HASH);
+        }
+      } else {
+        // the first phase
+        unit.setPartitionType(PartitionType.HASH);
+        if (prev != null) {
+          prev.setPartitionType(PartitionType.LIST);
+        }
+      }
+    } else if (unaryChild.getSubNode().getType() == ExprType.SCAN) {
+      // the first phase
+      // store - groupby - scan
+      unit.setPartitionType(PartitionType.HASH);
+    } else if (unaryChild.getSubNode().getType() == ExprType.UNION) {
+      _handleUnionNode(rootStore, (UnionNode)unaryChild.getSubNode(), unit, 
+          null, PartitionType.LIST);
+    } else {
+      // error
+    }
+    return unit;
+  }
+  
+  /**
+   * Creates a SubQuery for a union.
+   *
+   * @param rootStore the store of the SubQuery to be created
+   * @param plan the logical plan
+   * @param unit the SubQuery to be created
+   * @return the created SubQuery
+   * @throws IOException
+   */
+  private ExecutionBlock makeUnionSubQuery(StoreTableNode rootStore,
+                                     LogicalNode plan, ExecutionBlock unit) throws IOException {
+    UnaryNode unary = (UnaryNode) plan;
+    StoreTableNode outerStore, innerStore;
+    ExecutionBlock prev;
+    UnionNode union = (UnionNode) unary.getSubNode();
+    unit.setPartitionType(PartitionType.LIST);
+    
+    if (union.getOuterNode().getType() == ExprType.STORE) {
+      outerStore = (StoreTableNode) union.getOuterNode();
+      TableMeta outerMeta = CatalogUtil.newTableMeta(outerStore.getOutSchema(),
+          StoreType.CSV);
+      insertOuterScan(union, outerStore.getTableName(), outerMeta);
+      prev = convertMap.get(outerStore);
+      if (prev != null) {
+        prev.getStoreTableNode().setTableName(rootStore.getTableName());
+        prev.setPartitionType(PartitionType.LIST);
+        prev.setParentBlock(unit);
+        unit.addChildBlock((ScanNode) union.getOuterNode(), prev);
+      }
+    } else if (union.getOuterNode().getType() == ExprType.UNION) {
+      _handleUnionNode(rootStore, union, unit, null, PartitionType.LIST);
+    }
+    
+    if (union.getInnerNode().getType() == ExprType.STORE) {
+      innerStore = (StoreTableNode) union.getInnerNode();
+      TableMeta innerMeta = CatalogUtil.newTableMeta(innerStore.getOutSchema(),
+          StoreType.CSV);
+      insertInnerScan(union, innerStore.getTableName(), innerMeta);
+      prev = convertMap.get(innerStore);
+      if (prev != null) {
+        prev.getStoreTableNode().setTableName(rootStore.getTableName());
+        prev.setPartitionType(PartitionType.LIST);
+        prev.setParentBlock(unit);
+        unit.addChildBlock((ScanNode) union.getInnerNode(), prev);
+      }
+    } else if (union.getInnerNode().getType() == ExprType.UNION) {
+      _handleUnionNode(rootStore, union, unit, null, PartitionType.LIST);
+    }
+
+    return unit;
+  }
+
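+  /**
+   * Creates a two-phase SubQuery for a sort; the first phase is
+   * range-partitioned and the second phase collects the ranges as a list.
+   */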
+  private ExecutionBlock makeSortSubQuery(StoreTableNode rootStore,
+                                    LogicalNode plan, ExecutionBlock unit) throws IOException {
+
+    UnaryNode unary = (UnaryNode) plan;
+    UnaryNode unaryChild;
+    StoreTableNode prevStore;
+    ScanNode newScan;
+    ExecutionBlock prev;
+    unaryChild = (UnaryNode) unary.getSubNode();  // sort
+    ExprType curType = unaryChild.getType();
+    if (unaryChild.getSubNode().getType() == ExprType.STORE) {
+      // store - sort - store
+      unaryChild = (UnaryNode) unaryChild.getSubNode(); // store
+      prevStore = (StoreTableNode) unaryChild;
+      newScan = GlobalPlannerUtils.newScanPlan(prevStore.getOutSchema(),
+          prevStore.getTableName(), sm.getTablePath(prevStore.getTableName()));
+      newScan.setLocal(true);
+      ((UnaryNode) unary.getSubNode()).setSubNode(newScan);
+      prev = convertMap.get(prevStore);
+      if (prev != null) {
+        prev.setParentBlock(unit);
+        unit.addChildBlock(newScan, prev);
+        if (unaryChild.getSubNode().getType() == curType) {
+          // TODO - this is duplicated code
+          prev.setPartitionType(PartitionType.RANGE);
+        } else {
+          prev.setPartitionType(PartitionType.LIST);
+        }
+      }
+      if (unaryChild.getSubNode().getType() == curType) {
+        // the second phase
+        unit.setPartitionType(PartitionType.LIST);
+      } else {
+        // the first phase
+        unit.setPartitionType(PartitionType.HASH);
+      }
+    } else if (unaryChild.getSubNode().getType() == ExprType.SCAN) {
+      // the first phase
+      // store - sort - scan
+      unit.setPartitionType(PartitionType.RANGE);
+    } else if (unaryChild.getSubNode().getType() == ExprType.UNION) {
+      _handleUnionNode(rootStore, (UnionNode)unaryChild.getSubNode(), unit,
+          null, PartitionType.LIST);
+    } else {
+      // error
+    }
+    return unit;
+  }
+  
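+  /**
+   * Creates a SubQuery for a join. Both children are hash-partitioned on the
+   * join keys (32 partitions) so that matching rows meet in the same partition.
+   */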
+  private ExecutionBlock makeJoinSubQuery(StoreTableNode rootStore,
+                                    LogicalNode plan, ExecutionBlock unit) throws IOException {
+    UnaryNode unary = (UnaryNode)plan;
+    StoreTableNode outerStore, innerStore;
+    ExecutionBlock prev;
+    JoinNode join = (JoinNode) unary.getSubNode();
+    Schema outerSchema = join.getOuterNode().getOutSchema();
+    Schema innerSchema = join.getInnerNode().getOutSchema();
+    unit.setPartitionType(PartitionType.LIST);
+
+    List<Column> outerCollist = new ArrayList<Column>();
+    List<Column> innerCollist = new ArrayList<Column>();
+    
+    // TODO: set partition for store nodes
+    if (join.hasJoinQual()) {
+      // getting repartition keys
+      List<Column[]> cols = PlannerUtil.getJoinKeyPairs(join.getJoinQual(), outerSchema, innerSchema);
+      for (Column [] pair : cols) {
+        outerCollist.add(pair[0]);
+        innerCollist.add(pair[1]);
+      }
+    } else {
+      // broadcast
+    }
+    
+    Column[] outerCols = new Column[outerCollist.size()];
+    Column[] innerCols = new Column[innerCollist.size()];
+    outerCols = outerCollist.toArray(outerCols);
+    innerCols = innerCollist.toArray(innerCols);
+    
+    // outer
+    if (join.getOuterNode().getType() == ExprType.STORE) {
+      outerStore = (StoreTableNode) join.getOuterNode();
+      TableMeta outerMeta = CatalogUtil.newTableMeta(outerStore.getOutSchema(),
+          StoreType.CSV);
+      insertOuterScan(join, outerStore.getTableName(), outerMeta);
+      prev = convertMap.get(outerStore);
+      if (prev != null) {
+        prev.setPartitionType(PartitionType.HASH);
+        prev.setParentBlock(unit);
+        unit.addChildBlock((ScanNode) join.getOuterNode(), prev);
+      }
+      outerStore.setPartitions(PartitionType.HASH, outerCols, 32);
+    } else if (join.getOuterNode().getType() == ExprType.UNION) {
+      _handleUnionNode(rootStore, (UnionNode)join.getOuterNode(), unit, 
+          outerCols, PartitionType.HASH);
+    } else {
+
+    }
+    
+    // inner
+    if (join.getInnerNode().getType() == ExprType.STORE) {
+      innerStore = (StoreTableNode) join.getInnerNode();
+      TableMeta innerMeta = CatalogUtil.newTableMeta(innerStore.getOutSchema(),
+          StoreType.CSV);
+      insertInnerScan(join, innerStore.getTableName(), innerMeta);
+      prev = convertMap.get(innerStore);
+      if (prev != null) {
+        prev.setPartitionType(PartitionType.HASH);
+        prev.setParentBlock(unit);
+        unit.addChildBlock((ScanNode) join.getInnerNode(), prev);
+      }
+      innerStore.setPartitions(PartitionType.HASH, innerCols, 32);
+    } else if (join.getInnerNode().getType() == ExprType.UNION) {
+      _handleUnionNode(rootStore, (UnionNode)join.getInnerNode(), unit,
+          innerCols, PartitionType.HASH);
+    }
+    
+    return unit;
+  }
+  
+  /**
+   * Recursively configures the child plans of a union.
+   *
+   * @param rootStore the store of the SubQuery to be created
+   * @param union the logical plan rooted at the union
+   * @param cur the SubQuery to be created
+   * @param cols the column array used to set the partition information
+   * @param prevOutputType the partition type of the child SubQueries
+   * @throws IOException
+   */
+  private void _handleUnionNode(StoreTableNode rootStore, UnionNode union, 
+      ExecutionBlock cur, Column[] cols, PartitionType prevOutputType)
+          throws IOException {
+    StoreTableNode store;
+    TableMeta meta;
+    ExecutionBlock prev;
+    
+    if (union.getOuterNode().getType() == ExprType.STORE) {
+      store = (StoreTableNode) union.getOuterNode();
+      meta = CatalogUtil.newTableMeta(store.getOutSchema(), StoreType.CSV);
+      insertOuterScan(union, store.getTableName(), meta);
+      prev = convertMap.get(store);
+      if (prev != null) {
+        prev.getStoreTableNode().setTableName(rootStore.getTableName());
+        prev.setPartitionType(prevOutputType);
+        prev.setParentBlock(cur);
+        cur.addChildBlock((ScanNode) union.getOuterNode(), prev);
+      }
+      if (cols != null) {
+        store.setPartitions(PartitionType.LIST, cols, 32);
+      }
+    } else if (union.getOuterNode().getType() == ExprType.UNION) {
+      _handleUnionNode(rootStore, (UnionNode)union.getOuterNode(), cur, cols, 
+          prevOutputType);
+    }
+    
+    if (union.getInnerNode().getType() == ExprType.STORE) {
+      store = (StoreTableNode) union.getInnerNode();
+      meta = CatalogUtil.newTableMeta(store.getOutSchema(), StoreType.CSV);
+      insertInnerScan(union, store.getTableName(), meta);
+      prev = convertMap.get(store);
+      if (prev != null) {
+        prev.getStoreTableNode().setTableName(rootStore.getTableName());
+        prev.setPartitionType(prevOutputType);
+        prev.setParentBlock(cur);
+        cur.addChildBlock((ScanNode) union.getInnerNode(), prev);
+      }
+      if (cols != null) {
+        store.setPartitions(PartitionType.LIST, cols, 32);
+      }
+    } else if (union.getInnerNode().getType() == ExprType.UNION) {
+      _handleUnionNode(rootStore, (UnionNode)union.getInnerNode(), cur, cols, 
+          prevOutputType);
+    }
+  }
+  
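+  /**
+   * Replaces the outer (left) child of the given binary node with a local
+   * scan over the intermediate table; insertInnerScan below does the same
+   * for the inner child.
+   */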
+  private LogicalNode insertOuterScan(BinaryNode parent, String tableId,
+      TableMeta meta) throws IOException {
+    TableDesc desc = CatalogUtil.newTableDesc(tableId, meta, sm.getTablePath(tableId));
+    ScanNode scan = new ScanNode(new FromTable(desc));
+    scan.setLocal(true);
+    scan.setInSchema(meta.getSchema());
+    scan.setOutSchema(meta.getSchema());
+    parent.setOuter(scan);
+    return parent;
+  }
+  
+  private LogicalNode insertInnerScan(BinaryNode parent, String tableId, 
+      TableMeta meta) throws IOException {
+    TableDesc desc = CatalogUtil.newTableDesc(tableId, meta, sm.getTablePath(tableId));
+    ScanNode scan = new ScanNode(new FromTable(desc));
+    scan.setLocal(true);
+    scan.setInSchema(meta.getSchema());
+    scan.setOutSchema(meta.getSchema());
+    parent.setInner(scan);
+    return parent;
+  }
+  
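+  /**
+   * Assembles the final MasterPlan: builds all execution blocks bottom-up and
+   * selects the root block (or wraps an index-write plan in a scan block).
+   */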
+  private MasterPlan convertToGlobalPlan(IndexWriteNode index,
+                                         LogicalNode logicalPlan) throws IOException {
+    recursiveBuildSubQuery(logicalPlan);
+    ExecutionBlock root;
+    
+    if (index != null) {
+      SubQueryId id = QueryIdFactory.newSubQueryId(queryId);
+      ExecutionBlock unit = new ExecutionBlock(id);
+      root = makeScanSubQuery(unit);
+      root.setPlan(index);
+    } else {
+      root = convertMap.get(((LogicalRootNode)logicalPlan).getSubNode());
+      root.getStoreTableNode().setLocal(false);
+    }
+    return new MasterPlan(root);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlannerUtils.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlannerUtils.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlannerUtils.java
new file mode 100644
index 0000000..76c5afd
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlannerUtils.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.parser.QueryBlock;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.logical.StoreTableNode;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map;
+
+public class GlobalPlannerUtils {
+  private static Log LOG = LogFactory.getLog(GlobalPlannerUtils.class);
+
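+  /**
+   * Orders workers by the number of query units assigned to them; used by
+   * the (currently commented-out) query distribution planner below.
+   */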
+  static class WorkerComparatorByNumOfQueryUnits implements Comparator {
+    Map base;
+    public WorkerComparatorByNumOfQueryUnits(Map base) {
+      this.base = base;
+    }
+    public int compare(Object a, Object b) {
+      Collection<QueryUnit> l1 = (Collection<QueryUnit>)base.get(a);
+      Collection<QueryUnit> l2 = (Collection<QueryUnit>)base.get(b);
+      if (l1.size() > l2.size()) {
+        return 1;
+      } else {
+        return -1;
+      }
+    }
+  }
+
+ /* public static QueryUnit[] buildQueryDistributionPlan(
+      Map<Fragment, FragmentServingInfo> servingMap,
+      Map<String, List<String>> DNSNameToHostsMap,
+//      Set<String> failedHost,
+      QueryUnit[] queryUnits
+  ) throws UnknownWorkerException {
+    Map<String, Collection<QueryUnit>> map = Maps.newHashMap();
+    ListMultimap<String, QueryUnit> distStatus =
+        Multimaps.newListMultimap(map,
+            new Supplier<List<QueryUnit>>() {
+              @Override
+              public List<QueryUnit> get() {
+                return Lists.newArrayList();
+              }
+            });
+
+    String host;
+    Fragment fragment;
+    FragmentServingInfo servingInfo = null;
+    // build the initial query distribution status
+    for (QueryUnit unit : queryUnits) {
+      Preconditions.checkState(unit.getScanNodes().length == 1);
+      fragment = unit.getFragment(unit.getScanNodes()[0].getTableId());
+      if (servingMap.containsKey(fragment)) {
+        servingInfo = servingMap.get(fragment);
+      } else {
+        servingInfo = null;
+        // error
+      }
+      host = servingInfo.getPrimaryHost();
+      distStatus.put(host, unit);
+    }
+
+    *//*LOG.info("===== before re-balancing =====");
+    for (Map.Entry<String, Collection<QueryUnit>> e : map.entrySet()) {
+      LOG.info(e.getKey() + " : " + e.getValue().size());
+    }
+    LOG.info("\n");*//*
+
+    // re-balancing the query distribution
+    Preconditions.checkState(queryUnits.length >= servingMap.size());
+    int threshold = 0;
+    int mean = queryUnits.length / map.size();
+    int maxQueryUnitNum = mean + threshold;
+    WorkerComparatorByNumOfQueryUnits comp =
+        new WorkerComparatorByNumOfQueryUnits(map);
+    TreeMap<String, Collection<QueryUnit>> sortedMap =
+        Maps.newTreeMap(comp);
+    sortedMap.putAll(map);
+
+    Collection<QueryUnit> fromUnits;
+    Collection<QueryUnit> toUnits;
+    QueryUnit moved;
+    int moveNum = 0;
+    List<Map.Entry<String, Collection<QueryUnit>>> list =
+        Lists.newArrayList(sortedMap.entrySet());
+    int fromIdx = list.size()-1, toIdx = 0;
+    while (fromIdx > toIdx) {
+      toUnits = list.get(toIdx).getValue();
+      fromUnits = list.get(fromIdx).getValue();
+
+      do{
+        moved = fromUnits.iterator().next();
+        toUnits.add(moved);
+        fromUnits.remove(moved);
+        moveNum++;
+      } while (toUnits.size() < maxQueryUnitNum &&
+          fromUnits.size() > maxQueryUnitNum);
+      if (fromUnits.size() <= maxQueryUnitNum) {
+        fromIdx--;
+      }
+      if (toUnits.size() >= maxQueryUnitNum) {
+        toIdx++;
+      }
+    }
+
+    *//*LOG.info("===== after re-balancing " + maxQueryUnitNum + " =====");
+    for (Map.Entry<String, Collection<QueryUnit>> e : list) {
+      LOG.info(e.getKey() + " : " + e.getValue().size());
+    }
+    LOG.info("\n");*//*
+
+    LOG.info(moveNum + " query units among " +
+        queryUnits.length + " are moved!");
+
+    List<String> hosts;
+    int rrIdx = 0;
+    for (Map.Entry<String, Collection<QueryUnit>> e : list) {
+      hosts = DNSNameToHostsMap.get(e.getKey());
+      if (hosts == null) {
+        throw new UnknownWorkerException(e.getKey() + "");
+      }
+      for (QueryUnit unit : e.getValue()) {
+*//*
+        while (failedHost.contains(hosts.get(rrIdx))) {
+          if (++rrIdx == hosts.size()) {
+            rrIdx = 0;
+          }
+        }
+*//*
+        unit.setHost(hosts.get(rrIdx++));
+        if (rrIdx == hosts.size()) {
+          rrIdx = 0;
+        }
+      }
+    }
+
+    return queryUnits;
+  }*/
+
+  public static StoreTableNode newStorePlan(Schema outputSchema,
+                                            String outputTableId) {
+    StoreTableNode store = new StoreTableNode(outputTableId);
+    store.setInSchema(outputSchema);
+    store.setOutSchema(outputSchema);
+    return store;
+  }
+
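+  /**
+   * Builds a scan node over a CSV-stored table at the given path; both the
+   * input and output schemas are set to the table schema.
+   */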
+  public static ScanNode newScanPlan(Schema inputSchema,
+                                     String inputTableId,
+                                     Path inputPath) {
+    TableMeta meta = CatalogUtil.newTableMeta(inputSchema, StoreType.CSV);
+    TableDesc desc = CatalogUtil.newTableDesc(inputTableId, meta, inputPath);
+    ScanNode newScan = new ScanNode(new QueryBlock.FromTable(desc));
+    newScan.setInSchema(desc.getMeta().getSchema());
+    newScan.setOutSchema(desc.getMeta().getSchema());
+    return newScan;
+  }
+
+  public static GroupbyNode newGroupbyPlan(Schema inputSchema,
+                                           Schema outputSchema,
+                                           Column[] keys,
+                                           EvalNode havingCondition,
+                                           QueryBlock.Target[] targets) {
+    GroupbyNode groupby = new GroupbyNode(keys);
+    groupby.setInSchema(inputSchema);
+    groupby.setOutSchema(outputSchema);
+    groupby.setHavingCondition(havingCondition);
+    groupby.setTargetList(targets);
+
+    return groupby;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/Query.java
new file mode 100644
index 0000000..197ff11
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/Query.java
@@ -0,0 +1,432 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.QueryConf;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.SubQueryId;
+import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.ExprType;
+import org.apache.tajo.engine.planner.logical.IndexWriteNode;
+import org.apache.tajo.master.QueryMaster.QueryContext;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.util.IndexUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class Query implements EventHandler<QueryEvent> {
+  private static final Log LOG = LogFactory.getLog(Query.class);
+
+
+  // Facilities for Query
+  private final QueryConf conf;
+  private final Clock clock;
+  private String queryStr;
+  private Map<SubQueryId, SubQuery> subqueries;
+  private final EventHandler eventHandler;
+  private final MasterPlan plan;
+  private final StorageManager sm;
+  private QueryContext context;
+  private ExecutionBlockCursor cursor;
+
+  // Query Status
+  private final QueryId id;
+  private long appSubmitTime;
+  private long startTime;
+  private long initializationTime;
+  private long finishTime;
+  private TableDesc resultDesc;
+  private int completedSubQueryCount = 0;
+  private final List<String> diagnostics = new ArrayList<String>();
+
+  // Internal Variables
+  private final Lock readLock;
+  private final Lock writeLock;
+  private int priority = 100;
+
+  // State Machine
+  private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine;
+
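+  // Query lifecycle: NEW -> INIT -> RUNNING; RUNNING ends in SUCCEEDED or
+  // FAILED as subqueries complete, or in ERROR on an internal error.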
+  private static final StateMachineFactory
+      <Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory =
+      new StateMachineFactory<Query, QueryState, QueryEventType, QueryEvent>
+          (QueryState.QUERY_NEW)
+
+      .addTransition(QueryState.QUERY_NEW,
+          EnumSet.of(QueryState.QUERY_INIT, QueryState.QUERY_FAILED),
+          QueryEventType.INIT, new InitTransition())
+
+      .addTransition(QueryState.QUERY_INIT, QueryState.QUERY_RUNNING,
+          QueryEventType.START, new StartTransition())
+
+      .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+          QueryEventType.INIT_COMPLETED, new InitCompleteTransition())
+      .addTransition(QueryState.QUERY_RUNNING,
+          EnumSet.of(QueryState.QUERY_RUNNING, QueryState.QUERY_SUCCEEDED,
+              QueryState.QUERY_FAILED),
+          QueryEventType.SUBQUERY_COMPLETED,
+          new SubQueryCompletedTransition())
+      .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR,
+          QueryEventType.INTERNAL_ERROR, new InternalErrorTransition())
+      .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+          QueryEventType.INTERNAL_ERROR)
+
+      .installTopology();
+
+  public Query(final QueryContext context, final QueryId id, Clock clock,
+               final long appSubmitTime,
+               final String queryStr,
+               final EventHandler eventHandler,
+               final MasterPlan plan, final StorageManager sm) {
+    this.context = context;
+    this.conf = context.getConf();
+    this.id = id;
+    this.clock = clock;
+    this.appSubmitTime = appSubmitTime;
+    this.queryStr = queryStr;
+    subqueries = Maps.newHashMap();
+    this.eventHandler = eventHandler;
+    this.plan = plan;
+    this.sm = sm;
+    cursor = new ExecutionBlockCursor(plan);
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  public boolean isCreateTableStmt() {
+    return context.isCreateTableQuery();
+  }
+
+  protected FileSystem getFileSystem(Configuration conf) throws IOException {
+    return FileSystem.get(conf);
+  }
+
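+  /**
+   * Reports overall progress as the mean of all subquery progresses;
+   * returns 1.0 once every subquery has succeeded.
+   */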
+  public float getProgress() {
+    QueryState state = getStateMachine().getCurrentState();
+    if (state == QueryState.QUERY_SUCCEEDED) {
+      return 1.0f;
+    } else {
+      int idx = 0;
+      float [] subProgresses = new float[subqueries.size()];
+      boolean finished = true;
+      for (SubQuery subquery: subqueries.values()) {
+        if (subquery.getState() != SubQueryState.NEW) {
+          subProgresses[idx] = subquery.getProgress();
+          if (finished && subquery.getState() != SubQueryState.SUCCEEDED) {
+            finished = false;
+          }
+        } else {
+          subProgresses[idx] = 0.0f;
+        }
+        idx++;
+      }
+
+      if (finished) {
+        return 1.0f;
+      }
+
+      float totalProgress = 0;
+      float proportion = 1.0f / (float)subqueries.size();
+
+      for (int i = 0; i < subProgresses.length; i++) {
+        totalProgress += subProgresses[i] * proportion;
+      }
+
+      return totalProgress;
+    }
+  }
+
+  public long getAppSubmitTime() {
+    return this.appSubmitTime;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime() {
+    startTime = clock.getTime();
+  }
+
+  public long getInitializationTime() {
+    return initializationTime;
+  }
+
+  public void setInitializationTime() {
+    initializationTime = clock.getTime();
+  }
+
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public void setFinishTime() {
+    finishTime = clock.getTime();
+  }
+
+  public List<String> getDiagnostics() {
+    readLock.lock();
+    try {
+      return diagnostics;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  protected void addDiagnostic(String diag) {
+    diagnostics.add(diag);
+  }
+
+  public TableDesc getResultDesc() {
+    return resultDesc;
+  }
+
+  public void setResultDesc(TableDesc desc) {
+    resultDesc = desc;
+  }
+
+  public MasterPlan getPlan() {
+    return plan;
+  }
+
+  public StateMachine<QueryState, QueryEventType, QueryEvent> getStateMachine() {
+    return stateMachine;
+  }
+  
+  public void addSubQuery(SubQuery subquery) {
+    subqueries.put(subquery.getId(), subquery);
+  }
+  
+  public QueryId getId() {
+    return this.id;
+  }
+  
+  public SubQuery getSubQuery(SubQueryId id) {
+    return this.subqueries.get(id);
+  }
+
+  public QueryState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public ExecutionBlockCursor getExecutionBlockCursor() {
+    return cursor;
+  }
+
+  static class InitTransition
+      implements MultipleArcTransition<Query, QueryEvent, QueryState> {
+
+    @Override
+    public QueryState transition(Query query, QueryEvent queryEvent) {
+      query.setStartTime();
+      return QueryState.QUERY_INIT;
+    }
+  }
+
+  public static class StartTransition
+      implements SingleArcTransition<Query, QueryEvent> {
+
+    @Override
+    public void transition(Query query, QueryEvent queryEvent) {
+      SubQuery subQuery = new SubQuery(query.context, query.getExecutionBlockCursor().nextBlock(),
+          query.sm);
+      subQuery.setPriority(query.priority--);
+      query.addSubQuery(subQuery);
+      LOG.info("Schedule unit plan: \n" + subQuery.getBlock().getPlan());
+      subQuery.handle(new SubQueryEvent(subQuery.getId(),
+          SubQueryEventType.SQ_INIT));
+    }
+  }
+
+  public static class SubQueryCompletedTransition implements
+      MultipleArcTransition<Query, QueryEvent, QueryState> {
+
+    @Override
+    public QueryState transition(Query query, QueryEvent event) {
+      // increase the count for completed subqueries
+      query.completedSubQueryCount++;
+      SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
+      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+
+      // if the subquery succeeded
+      if (castEvent.getFinalState() == SubQueryState.SUCCEEDED) {
+        if (cursor.hasNext()) {
+          SubQuery nextSubQuery = new SubQuery(query.context, cursor.nextBlock(), query.sm);
+          nextSubQuery.setPriority(query.priority--);
+          query.addSubQuery(nextSubQuery);
+          nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(),
+              SubQueryEventType.SQ_INIT));
+          LOG.info("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
+          LOG.info("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+          return query.checkQueryForCompleted();
+
+        } else { // Finish a query
+          if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
+            SubQuery subQuery = query.getSubQuery(castEvent.getSubQueryId());
+            TableDesc desc = new TableDescImpl(query.conf.getOutputTable(),
+                subQuery.getTableMeta(), query.context.getOutputPath());
+            query.setResultDesc(desc);
+            try {
+              query.writeStat(query.context.getOutputPath(), subQuery);
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+            query.eventHandler.handle(new QueryFinishEvent(query.getId()));
+
+            if (query.context.isCreateTableQuery()) {
+              query.context.getCatalog().addTable(desc);
+            }
+          }
+
+          return query.finished(QueryState.QUERY_SUCCEEDED);
+        }
+      } else {
+        // if at least one subquery fails, the query also fails.
+        return QueryState.QUERY_FAILED;
+      }
+    }
+  }
+
+  private static class DiagnosticsUpdateTransition implements
+      SingleArcTransition<Query, QueryEvent> {
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      query.addDiagnostic(((QueryDiagnosticsUpdateEvent) event)
+          .getDiagnosticUpdate());
+    }
+  }
+
+  private static class InitCompleteTransition implements
+      SingleArcTransition<Query, QueryEvent> {
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      if (query.initializationTime == 0) {
+        query.setInitializationTime();
+      }
+    }
+  }
+
+  private static class InternalErrorTransition
+      implements SingleArcTransition<Query, QueryEvent> {
+
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      query.finished(QueryState.QUERY_ERROR);
+    }
+  }
+
+  public QueryState finished(QueryState finalState) {
+    setFinishTime();
+    return finalState;
+  }
+
+  /**
+   * Checks whether all subqueries of the query have completed.
+   * @return QueryState.QUERY_SUCCEEDED if all subqueries have completed;
+   *         otherwise, the current query state.
+   */
+  QueryState checkQueryForCompleted() {
+    if (completedSubQueryCount == subqueries.size()) {
+      return QueryState.QUERY_SUCCEEDED;
+    }
+    return getState();
+  }
+
+
+  @Override
+  public void handle(QueryEvent event) {
+    LOG.info("Processing " + event.getQueryId() + " of type " + event.getType());
+    try {
+      writeLock.lock();
+      QueryState oldState = getState();
+      try {
+        getStateMachine().doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        eventHandler.handle(new QueryEvent(this.id,
+            QueryEventType.INTERNAL_ERROR));
+      }
+
+      // notify the event handler of the state change
+      if (oldState != getState()) {
+        LOG.info(id + " Query Transitioned from " + oldState + " to "
+            + getState());
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
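+  /**
+   * Writes the result table's metadata. For CREATE_INDEX plans, the index
+   * sort specs are serialized to JSON and stored as an option of the meta.
+   */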
+  private void writeStat(Path outputPath, SubQuery subQuery)
+      throws IOException {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    if (execBlock.getPlan().getType() == ExprType.CREATE_INDEX) {
+      IndexWriteNode index = (IndexWriteNode) execBlock.getPlan();
+      Path indexPath = new Path(sm.getTablePath(index.getTableName()), "index");
+      TableMeta meta;
+      if (sm.getFileSystem().exists(new Path(indexPath, ".meta"))) {
+        meta = sm.getTableMeta(indexPath);
+      } else {
+        meta = CatalogUtil
+            .newTableMeta(execBlock.getOutputSchema(), StoreType.CSV);
+      }
+      String indexName = IndexUtil.getIndexName(index.getTableName(),
+          index.getSortSpecs());
+      String json = GsonCreator.getInstance().toJson(index.getSortSpecs());
+      meta.putOption(indexName, json);
+
+      sm.writeTableMeta(indexPath, meta);
+
+    } else {
+      sm.writeTableMeta(outputPath, subQuery.getTableMeta());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryMaster.java
new file mode 100644
index 0000000..f4dc455
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryMaster.java
@@ -0,0 +1,465 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.tajo.*;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.master.TaskRunnerLauncherImpl.ContainerProxy;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.rm.RMContainerAllocator;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.util.TajoIdUtils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class QueryMaster extends CompositeService implements EventHandler {
+  private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
+
+  // Master Context
+  private final MasterContext masterContext;
+
+  // AppMaster Common
+  private final Clock clock;
+  private final long appSubmitTime;
+  private String appName;
+  private final ApplicationAttemptId appAttemptID;
+
+  // For Query
+  private final QueryId queryId;
+  private QueryContext queryContext;
+  private Query query;
+  private MasterPlan masterPlan;
+
+  private AsyncDispatcher dispatcher;
+  private YarnRPC rpc;
+  private RMContainerAllocator rmAllocator;
+  private TaskRunnerListener taskRunnerListener;
+  private TaskRunnerLauncher taskRunnerLauncher;
+
+  // Services of Tajo
+  private CatalogService catalog;
+
+  private boolean isCreateTableStmt;
+  private StorageManager storageManager;
+  private FileSystem defaultFS;
+  private Path outputPath;
+
+  public QueryMaster(final MasterContext masterContext,
+                     final ApplicationAttemptId appAttemptID,
+                     final Clock clock, long appSubmitTime,
+                     MasterPlan masterPlan) {
+    super(QueryMaster.class.getName());
+    this.masterContext = masterContext;
+
+    this.appAttemptID = appAttemptID;
+    this.clock = clock;
+    this.appSubmitTime = appSubmitTime;
+
+    this.queryId = TajoIdUtils.createQueryId(appAttemptID);
+    this.masterPlan = masterPlan;
+    LOG.info("Created Query Master for " + appAttemptID);
+  }
+
+  public void init(Configuration _conf) {
+    QueryConf conf = new QueryConf(_conf);
+
+    try {
+      queryContext = new QueryContext(conf);
+
+      dispatcher = masterContext.getDispatcher();
+      // TODO - This should be eliminated when QueryMaster is separated.
+      dispatcher = new AsyncDispatcher();
+      addIfService(dispatcher);
+
+      // TODO - This comment should be improved when QueryMaster is separated.
+      rpc = masterContext.getYarnRPC();
+
+      catalog = masterContext.getCatalog();
+      storageManager = masterContext.getStorageManager();
+
+      taskRunnerListener = new TaskRunnerListener(queryContext);
+      addIfService(taskRunnerListener);
+
+      rmAllocator = new RMContainerAllocator(queryContext);
+      addIfService(rmAllocator);
+      dispatcher.register(ContainerAllocatorEventType.class, rmAllocator);
+
+      query = new Query(queryContext, queryId, clock, appSubmitTime,
+          "", dispatcher.getEventHandler(), masterPlan, storageManager);
+      initStagingDir();
+
+      // QueryEventDispatcher is already registered in TajoMaster
+      dispatcher.register(QueryEventType.class, query);
+      dispatcher.register(SubQueryEventType.class, new SubQueryEventDispatcher());
+      dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+      dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
+      dispatcher.register(QueryFinishEvent.EventType.class, new QueryFinishEventHandler());
+      dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
+
+      taskRunnerLauncher = new TaskRunnerLauncherImpl(queryContext);
+      addIfService(taskRunnerLauncher);
+      dispatcher.register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
+
+    } catch (Throwable t) {
+      LOG.error(ExceptionUtils.getStackTrace(t));
+      throw new RuntimeException(t);
+    }
+
+    super.init(conf);
+  }
+
+  public void start() {
+    super.start();
+    startQuery();
+  }
+
+  public void stop() {
+    super.stop();
+  }
+
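+  // Registers the object as a child service if it implements the YARN
+  // Service interface, so its lifecycle follows this CompositeService.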
+  protected void addIfService(Object object) {
+    if (object instanceof Service) {
+      addService((Service) object);
+    }
+  }
+
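+  // Kicks off execution by posting INIT and then START; the Query state
+  // machine consumes these events asynchronously via the dispatcher.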
+  public void startQuery() {
+    dispatcher.getEventHandler().handle(new QueryEvent(queryId,
+        QueryEventType.INIT));
+    dispatcher.getEventHandler().handle(new QueryEvent(queryId,
+        QueryEventType.START));
+  }
+
+  @Override
+  public void handle(Event event) {
+    dispatcher.getEventHandler().handle(event);
+  }
+
+  public EventHandler getEventHandler() {
+    return dispatcher.getEventHandler();
+  }
+
+  private class SubQueryEventDispatcher implements EventHandler<SubQueryEvent> {
+    public void handle(SubQueryEvent event) {
+      SubQueryId id = event.getSubQueryId();
+      query.getSubQuery(id).handle(event);
+    }
+  }
+
+  private class TaskEventDispatcher
+      implements EventHandler<TaskEvent> {
+    public void handle(TaskEvent event) {
+      QueryUnitId taskId = event.getTaskId();
+      QueryUnit task = query.getSubQuery(taskId.getSubQueryId()).
+          getQueryUnit(taskId);
+      task.handle(event);
+    }
+  }
+
+  private class TaskAttemptEventDispatcher
+      implements EventHandler<TaskAttemptEvent> {
+    public void handle(TaskAttemptEvent event) {
+      QueryUnitAttemptId attemptId = event.getTaskAttemptId();
+      SubQuery subQuery = query.getSubQuery(attemptId.getSubQueryId());
+      QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
+      QueryUnitAttempt attempt = task.getAttempt(attemptId);
+      attempt.handle(event);
+    }
+  }
+
+  private class TaskSchedulerDispatcher
+      implements EventHandler<TaskSchedulerEvent> {
+    public void handle(TaskSchedulerEvent event) {
+      SubQuery subQuery = query.getSubQuery(event.getSubQueryId());
+      subQuery.getTaskScheduler().handle(event);
+    }
+  }
+
+  public QueryContext getContext() {
+    return this.queryContext;
+  }
+
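+  // Per-query context shared with allocators, launchers, and schedulers:
+  // exposes the configuration, dispatcher, container bookkeeping, and
+  // cluster capability limits.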
+  public class QueryContext {
+    private QueryConf conf;
+    public Map<ContainerId, ContainerProxy> containers = new ConcurrentHashMap<ContainerId, ContainerProxy>();
+    int minCapability;
+    int maxCapability;
+    int numCluster;
+
+    public QueryContext(QueryConf conf) {
+      this.conf = conf;
+    }
+
+    public QueryConf getConf() {
+      return conf;
+    }
+
+    public AsyncDispatcher getDispatcher() {
+      return dispatcher;
+    }
+
+    public Clock getClock() {
+      return clock;
+    }
+
+    public Query getQuery() {
+      return query;
+    }
+
+    public SubQuery getSubQuery(SubQueryId subQueryId) {
+      return query.getSubQuery(subQueryId);
+    }
+
+    public QueryId getQueryId() {
+      return queryId;
+    }
+
+    public ApplicationId getApplicationId() {
+      return appAttemptID.getApplicationId();
+    }
+
+    public ApplicationAttemptId getApplicationAttemptId() {
+      return appAttemptID;
+    }
+
+    public EventHandler getEventHandler() {
+      return dispatcher.getEventHandler();
+    }
+
+    public YarnRPC getYarnRPC() {
+      return rpc;
+    }
+
+    public InetSocketAddress getRpcAddress() {
+      return masterContext.getClientService().getBindAddress();
+    }
+
+    public InetSocketAddress getTaskListener() {
+      return taskRunnerListener.getBindAddress();
+    }
+
+    public void addContainer(ContainerId cId, ContainerProxy container) {
+      containers.put(cId, container);
+    }
+
+    public void removeContainer(ContainerId cId) {
+      containers.remove(cId);
+    }
+
+    public boolean containsContainer(ContainerId cId) {
+      return containers.containsKey(cId);
+    }
+
+    public ContainerProxy getContainer(ContainerId cId) {
+      return containers.get(cId);
+    }
+
+    public int getNumClusterNode() {
+      return numCluster;
+    }
+
+    public void setNumClusterNodes(int num) {
+      numCluster = num;
+    }
+
+    public CatalogService getCatalog() {
+      return catalog;
+    }
+
+    public Path getOutputPath() {
+      return outputPath;
+    }
+
+    public void setMaxContainerCapability(int capability) {
+      this.maxCapability = capability;
+    }
+
+    public int getMaxContainerCapability() {
+      return this.maxCapability;
+    }
+
+    public void setMinContainerCapability(int capability) {
+      this.minCapability = capability;
+    }
+
+    public int getMinContainerCapability() {
+      return this.minCapability;
+    }
+
+    public boolean isCreateTableQuery() {
+      return isCreateTableStmt;
+    }
+
+    public float getProgress() {
+      return query.getProgress();
+    }
+
+    public long getStartTime() {
+      return query.getStartTime();
+    }
+
+    public long getFinishTime() {
+      return query.getFinishTime();
+    }
+
+    public StorageManager getStorageManager() {
+      return storageManager;
+    }
+  }
+
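+  // Handles the terminal query event by stopping all child services, which
+  // also sends the final report to the ResourceManager.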
+  private class QueryFinishEventHandler implements EventHandler<QueryFinishEvent> {
+    @Override
+    public void handle(QueryFinishEvent event) {
+      LOG.info("Query end notification started for QueryId : " + query.getId());
+
+      try {
+        // Stop all services
+        // This will also send the final report to the ResourceManager
+        LOG.info("Calling stop for all the services");
+        stop();
+
+      } catch (Throwable t) {
+        LOG.warn("Graceful stop failed ", t);
+      }
+
+      // Bring the process down by force; not needed after HADOOP-7140.
+      LOG.info("Exiting QueryMaster. Goodbye!");
+      // TODO - to be enabled if query master is separated.
+      //System.exit(0);
+    }
+  }
+
+  // query submission directory is private!
+  final public static FsPermission USER_DIR_PERMISSION =
+      FsPermission.createImmutable((short) 0700); // rwx------
+
+  /**
+   * Initializes the final output and staging directories and stores them
+   * in member variables.
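+   * For example (an illustrative layout, inferred from the code below):
+   * a plain SELECT stages under the submitter's home directory at
+   * {home}/{USER_QUERYDIR_PREFIX}/{queryId}, while a CREATE TABLE query
+   * stages under {ConfVars.ROOT_DIR}/{WAREHOUSE_DIR}/{outputTable}.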
+   */
+  private void initStagingDir() throws IOException {
+    QueryConf conf = getContext().getConf();
+
+    String realUser;
+    String currentUser;
+    UserGroupInformation ugi;
+    ugi = UserGroupInformation.getLoginUser();
+    realUser = ugi.getShortUserName();
+    currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+
+    String givenOutputTableName = conf.getOutputTable();
+    Path stagingDir;
+
+    // If the final output directory is not given by a user,
+    // we use the query id as the output directory name.
+    if (givenOutputTableName.equals("")) {
+      this.isCreateTableStmt = false;
+      defaultFS = FileSystem.get(conf);
+
+      Path homeDirectory = defaultFS.getHomeDirectory();
+      if (!defaultFS.exists(homeDirectory)) {
+        defaultFS.mkdirs(homeDirectory, new FsPermission(USER_DIR_PERMISSION));
+      }
+
+      Path userQueryDir = new Path(homeDirectory, TajoConstants.USER_QUERYDIR_PREFIX);
+
+      if (defaultFS.exists(userQueryDir)) {
+        FileStatus fsStatus = defaultFS.getFileStatus(userQueryDir);
+        String owner = fsStatus.getOwner();
+
+        if (!(owner.equals(currentUser) || owner.equals(realUser))) {
+          throw new IOException("The ownership on the user's query " +
+              "directory " + userQueryDir + " is not as expected. " +
+              "It is owned by " + owner + ". The directory must " +
+              "be owned by the submitter " + currentUser + " or " +
+              "by " + realUser);
+        }
+
+        if (!fsStatus.getPermission().equals(USER_DIR_PERMISSION)) {
+          LOG.info("Permissions on staging directory " + userQueryDir + " are " +
+              "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
+              "to correct value " + USER_DIR_PERMISSION);
+          defaultFS.setPermission(userQueryDir, new FsPermission(USER_DIR_PERMISSION));
+        }
+      } else {
+        defaultFS.mkdirs(userQueryDir,
+            new FsPermission(USER_DIR_PERMISSION));
+      }
+
+      stagingDir = StorageUtil.concatPath(userQueryDir, queryId.toString());
+
+      if (defaultFS.exists(stagingDir)) {
+        throw new IOException("The staging directory " + stagingDir
+            + "already exists. The directory must be unique to each query");
+      } else {
+        defaultFS.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
+      }
+
+      // Set the query id to the output table name
+      conf.setOutputTable(queryId.toString());
+
+    } else {
+      this.isCreateTableStmt = true;
+      Path warehouseDir = new Path(conf.getVar(TajoConf.ConfVars.ROOT_DIR),
+          TajoConstants.WAREHOUSE_DIR);
+      stagingDir = new Path(warehouseDir, conf.getOutputTable());
+
+      FileSystem fs = warehouseDir.getFileSystem(conf);
+      if (fs.exists(stagingDir)) {
+        throw new IOException("The staging directory " + stagingDir
+            + " already exists. The directory must be unique to each query");
+      } else {
+        // TODO - should have appropriate permission
+        fs.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
+      }
+    }
+
+    conf.setOutputPath(stagingDir);
+    outputPath = stagingDir;
+    LOG.info("Initialized Query Staging Dir: " + outputPath);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryUnit.java
new file mode 100644
index 0000000..8489e6c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryUnit.java
@@ -0,0 +1,502 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.engine.MasterWorkerProtos.Partition;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.storage.Fragment;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class QueryUnit implements EventHandler<TaskEvent> {
+  /** Class Logger */
+  private static final Log LOG = LogFactory.getLog(QueryUnit.class);
+
+	private QueryUnitId taskId;
+  private EventHandler eventHandler;
+	private StoreTableNode store = null;
+	private LogicalNode plan = null;
+	private List<ScanNode> scan;
+	
+	private Map<String, Fragment> fragMap;
+	private Map<String, Set<URI>> fetchMap;
+	
+  private List<Partition> partitions;
+	private TableStat stats;
+  private String [] dataLocations;
+  private final boolean isLeafTask;
+  private List<IntermediateEntry> intermediateData;
+
+  private Map<QueryUnitAttemptId, QueryUnitAttempt> attempts;
+  private final int maxAttempts = 3;
+  private Integer lastAttemptId;
+
+  private QueryUnitAttemptId successfulAttempt;
+  private String succeededHost;
+  private int succeededPullServerPort;
+
+  private int failedAttempts;
+  private int finishedAttempts; // total of succeeded, failed, and killed attempts
+
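+  // Task lifecycle: NEW -> SCHEDULED -> RUNNING -> SUCCEEDED, with a failed
+  // attempt either retrying (staying RUNNING) or failing the task for good.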
+  private static final StateMachineFactory
+      <QueryUnit, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
+      new StateMachineFactory
+          <QueryUnit, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
+
+      .addTransition(TaskState.NEW, TaskState.SCHEDULED,
+          TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
+
+      .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
+          TaskEventType.T_ATTEMPT_LAUNCHED)
+
+      .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+          TaskEventType.T_ATTEMPT_LAUNCHED)
+
+      .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
+          TaskEventType.T_ATTEMPT_SUCCEEDED, new AttemptSucceededTransition())
+
+      .addTransition(TaskState.RUNNING,
+          EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
+          TaskEventType.T_ATTEMPT_FAILED, new AttemptFailedTransition())
+
+      .installTopology();
+  private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine;
+
+  private final Lock readLock;
+  private final Lock writeLock;
+
+	public QueryUnit(QueryUnitId id, boolean isLeafTask, EventHandler eventHandler) {
+		this.taskId = id;
+    this.eventHandler = eventHandler;
+    this.isLeafTask = isLeafTask;
+		scan = new ArrayList<ScanNode>();
+    fetchMap = Maps.newHashMap();
+    fragMap = Maps.newHashMap();
+    partitions = new ArrayList<Partition>();
+    attempts = Collections.emptyMap();
+    lastAttemptId = -1;
+    failedAttempts = 0;
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+
+    stateMachine = stateMachineFactory.make(this);
+	}
+
+  public boolean isLeafTask() {
+    return this.isLeafTask;
+  }
+
+  public void setDataLocations(String [] dataLocations) {
+    this.dataLocations = dataLocations;
+  }
+
+  public String [] getDataLocations() {
+    return this.dataLocations;
+  }
+
+  public TaskState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+	
+  public void setLogicalPlan(LogicalNode plan) {
+    Preconditions.checkArgument(plan.getType() == ExprType.STORE ||
+        plan.getType() == ExprType.CREATE_INDEX);
+
+    this.plan = plan;
+    if (plan instanceof StoreTableNode) {
+      store = (StoreTableNode) plan;
+    } else {
+      store = (StoreTableNode) ((IndexWriteNode) plan).getSubNode();
+    }
+    // Traverse the plan tree depth-first and collect every scan node.
+    LogicalNode node = plan;
+    ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
+    s.add(node);
+    while (!s.isEmpty()) {
+      node = s.remove(s.size() - 1);
+      if (node instanceof UnaryNode) {
+        UnaryNode unary = (UnaryNode) node;
+        s.add(unary.getSubNode());
+      } else if (node instanceof BinaryNode) {
+        BinaryNode binary = (BinaryNode) node;
+        s.add(binary.getOuterNode());
+        s.add(binary.getInnerNode());
+      } else if (node instanceof ScanNode) {
+        scan.add((ScanNode) node);
+      }
+    }
+  }
+
+  @Deprecated
+  public void setFragment(String tableId, Fragment fragment) {
+    this.fragMap.put(tableId, fragment);
+    if (fragment.hasDataLocations()) {
+      setDataLocations(fragment.getDataLocations());
+    }
+  }
+
+  public void setFragment2(Fragment fragment) {
+    this.fragMap.put(fragment.getId(), fragment);
+    if (fragment.hasDataLocations()) {
+      setDataLocations(fragment.getDataLocations());
+    }
+  }
+	
+	public void addFetch(String tableId, String uri) throws URISyntaxException {
+	  this.addFetch(tableId, new URI(uri));
+	}
+	
+	public void addFetch(String tableId, URI uri) {
+	  Set<URI> uris;
+	  if (fetchMap.containsKey(tableId)) {
+	    uris = fetchMap.get(tableId);
+	  } else {
+	    uris = Sets.newHashSet();
+	  }
+	  uris.add(uri);
+    fetchMap.put(tableId, uris);
+	}
+	
+	public void addFetches(String tableId, Collection<URI> urilist) {
+	  Set<URI> uris;
+    if (fetchMap.containsKey(tableId)) {
+      uris = fetchMap.get(tableId);
+    } else {
+      uris = Sets.newHashSet();
+    }
+    uris.addAll(urilist);
+    fetchMap.put(tableId, uris);
+	}
+	
+	public void setFetches(Map<String, Set<URI>> fetches) {
+	  this.fetchMap.clear();
+	  this.fetchMap.putAll(fetches);
+	}
+	
+  public Fragment getFragment(String tableId) {
+    return this.fragMap.get(tableId);
+  }
+
+  public Collection<Fragment> getAllFragments() {
+    return fragMap.values();
+  }
+	
+	public LogicalNode getLogicalPlan() {
+	  return this.plan;
+	}
+	
+	public QueryUnitId getId() {
+		return taskId;
+	}
+	
+	public Collection<URI> getFetchHosts(String tableId) {
+	  return fetchMap.get(tableId);
+	}
+	
+	public Collection<Set<URI>> getFetches() {
+	  return fetchMap.values();
+	}
+	
+	public Collection<URI> getFetch(ScanNode scan) {
+	  return this.fetchMap.get(scan.getTableId());
+	}
+
+	public String getOutputName() {
+		return this.store.getTableName();
+	}
+	
+	public Schema getOutputSchema() {
+	  return this.store.getOutSchema();
+	}
+	
+	public StoreTableNode getStoreTableNode() {
+	  return this.store;
+	}
+	
+	public ScanNode[] getScanNodes() {
+	  return this.scan.toArray(new ScanNode[scan.size()]);
+	}
+	
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(plan.getType() + " \n");
+    for (Entry<String, Fragment> e : fragMap.entrySet()) {
+      sb.append(e.getKey()).append(" : ").append(e.getValue()).append(" ");
+    }
+    for (Entry<String, Set<URI>> e : fetchMap.entrySet()) {
+      sb.append(e.getKey()).append(" : ");
+      for (URI t : e.getValue()) {
+        sb.append(t).append(" ");
+      }
+    }
+    return sb.toString();
+  }
+	
+	public void setStats(TableStat stats) {
+	  this.stats = stats;
+	}
+	
+	public void setPartitions(List<Partition> partitions) {
+	  this.partitions = Collections.unmodifiableList(partitions);
+	}
+	
+	public TableStat getStats() {
+	  return this.stats;
+	}
+	
+	public List<Partition> getPartitions() {
+	  return this.partitions;
+	}
+	
+	public int getPartitionNum() {
+	  return this.partitions.size();
+	}
+
+  public QueryUnitAttempt newAttempt() {
+    QueryUnitAttempt attempt = new QueryUnitAttempt(
+        QueryIdFactory.newQueryUnitAttemptId(this.getId(),
+            ++lastAttemptId), this, eventHandler);
+    return attempt;
+  }
+
+  public QueryUnitAttempt getAttempt(QueryUnitAttemptId attemptId) {
+    return attempts.get(attemptId);
+  }
+
+  public QueryUnitAttempt getAttempt(int attempt) {
+    return this.attempts.get(new QueryUnitAttemptId(this.getId(), attempt));
+  }
+
+  public QueryUnitAttempt getLastAttempt() {
+    // attempts is keyed by QueryUnitAttemptId, so build the id rather than
+    // looking up the raw attempt number.
+    return getAttempt(this.lastAttemptId);
+  }
+
+  protected QueryUnitAttempt getSuccessfulAttempt() {
+    readLock.lock();
+    try {
+      if (null == successfulAttempt) {
+        return null;
+      }
+      return attempts.get(successfulAttempt);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public int getRetryCount() {
+    return this.lastAttemptId;
+  }
+
+  private static class InitialScheduleTransition implements
+    SingleArcTransition<QueryUnit, TaskEvent> {
+
+    @Override
+    public void transition(QueryUnit task, TaskEvent taskEvent) {
+      task.addAndScheduleAttempt();
+    }
+  }
+
+  // This is always called in the Write Lock
+  private void addAndScheduleAttempt() {
+    // Create new task attempt
+    QueryUnitAttempt attempt = newAttempt();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Created attempt " + attempt.getId());
+    }
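+    // Grow the attempt map lazily: a singleton map covers the common
+    // single-attempt case; retries upgrade it to a small LinkedHashMap.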
+    switch (attempts.size()) {
+      case 0:
+        attempts = Collections.singletonMap(attempt.getId(), attempt);
+        break;
+
+      case 1:
+        Map<QueryUnitAttemptId, QueryUnitAttempt> newAttempts
+            = new LinkedHashMap<QueryUnitAttemptId, QueryUnitAttempt>(3);
+        newAttempts.putAll(attempts);
+        attempts = newAttempts;
+        attempts.put(attempt.getId(), attempt);
+        break;
+
+      default:
+        attempts.put(attempt.getId(), attempt);
+        break;
+    }
+
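+    // A first attempt is scheduled normally; retries are posted as a
+    // distinct RESCHEDULE event so the scheduler can treat them differently.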
+    if (failedAttempts > 0) {
+      eventHandler.handle(new TaskAttemptEvent(attempt.getId(),
+          TaskAttemptEventType.TA_RESCHEDULE));
+    } else {
+      eventHandler.handle(new TaskAttemptEvent(attempt.getId(),
+          TaskAttemptEventType.TA_SCHEDULE));
+    }
+  }
+
+  private static class AttemptSucceededTransition
+      implements SingleArcTransition<QueryUnit, TaskEvent>{
+
+    @Override
+    public void transition(QueryUnit task,
+                           TaskEvent event) {
+      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
+      QueryUnitAttempt attempt = task.attempts.get(
+          attemptEvent.getTaskAttemptId());
+      task.successfulAttempt = attemptEvent.getTaskAttemptId();
+      task.succeededHost = attempt.getHost();
+      task.succeededPullServerPort = attempt.getPullServerPort();
+      task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(),
+          SubQueryEventType.SQ_TASK_COMPLETED));
+    }
+  }
+
+  private static class AttemptFailedTransition implements
+    MultipleArcTransition<QueryUnit, TaskEvent, TaskState> {
+
+    @Override
+    public TaskState transition(QueryUnit task, TaskEvent taskEvent) {
+      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent;
+      LOG.info("=============================================================");
+      LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<");
+      LOG.info("=============================================================");
+      task.failedAttempts++;
+      task.finishedAttempts++;
+
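+      // Retry with a new attempt until maxAttempts is exhausted; only then
+      // is the task marked FAILED and the owning sub query notified.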
+      if (task.failedAttempts < task.maxAttempts) {
+        if (task.successfulAttempt == null) {
+          task.addAndScheduleAttempt();
+        }
+      } else {
+        task.eventHandler.handle(
+            new SubQueryTaskEvent(task.getId(), SubQueryEventType.SQ_FAILED));
+        return TaskState.FAILED;
+      }
+
+      return task.getState();
+    }
+  }
+
+  @Override
+  public void handle(TaskEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getTaskId() + " of type "
+          + event.getType());
+    }
+
+    writeLock.lock();
+    try {
+      TaskState oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        eventHandler.handle(new QueryEvent(getId().getQueryId(),
+            QueryEventType.INTERNAL_ERROR));
+      }
+
+      //notify the eventhandler of state change
+      if (LOG.isDebugEnabled()) {
+        if (oldState != getState()) {
+          LOG.debug(taskId + " Task Transitioned from " + oldState + " to "
+              + getState());
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void setIntermediateData(Collection<IntermediateEntry> partitions) {
+    this.intermediateData = new ArrayList<IntermediateEntry>(partitions);
+  }
+
+  public List<IntermediateEntry> getIntermediateData() {
+    return this.intermediateData;
+  }
+
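+  // Describes one partition of a task attempt's intermediate output and the
+  // pull server (host:port) from which downstream tasks can fetch it.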
+  public static class IntermediateEntry {
+    int taskId;
+    int attemptId;
+    int partitionId;
+    String pullHost;
+    int port;
+
+    public IntermediateEntry(int taskId, int attemptId, int partitionId,
+                             String pullServerAddr, int pullServerPort) {
+      this.taskId = taskId;
+      this.attemptId = attemptId;
+      this.partitionId = partitionId;
+      this.pullHost = pullServerAddr;
+      this.port = pullServerPort;
+    }
+
+    public int getTaskId() {
+      return this.taskId;
+    }
+
+    public int getAttemptId() {
+      return this.attemptId;
+    }
+
+    public int getPartitionId() {
+      return this.partitionId;
+    }
+
+    public String getPullHost() {
+      return this.pullHost;
+    }
+
+    public int getPullPort() {
+      return port;
+    }
+
+    public String getPullAddress() {
+      return pullHost + ":" + port;
+    }
+  }
+}

