tajo-commits mailing list archives

From hyun...@apache.org
Subject [10/51] [partial] TAJO-22: The package prefix should be org.apache.tajo. (DaeMyung Kang via hyunsik)
Date Tue, 02 Jul 2013 14:16:04 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlannerUtils.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlannerUtils.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlannerUtils.java
deleted file mode 100644
index 089da1b..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlannerUtils.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import tajo.catalog.*;
-import tajo.catalog.proto.CatalogProtos.StoreType;
-import tajo.engine.eval.EvalNode;
-import tajo.engine.parser.QueryBlock;
-import tajo.engine.planner.logical.GroupbyNode;
-import tajo.engine.planner.logical.ScanNode;
-import tajo.engine.planner.logical.StoreTableNode;
-
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Map;
-
-public class GlobalPlannerUtils {
-  private static Log LOG = LogFactory.getLog(GlobalPlannerUtils.class);
-
-  static class WorkerComparatorByNumOfQueryUnits implements Comparator {
-    Map base;
-    public WorkerComparatorByNumOfQueryUnits(Map base) {
-      this.base = base;
-    }
-    public int compare(Object a, Object b) {
-      Collection<QueryUnit> l1 = (Collection<QueryUnit>)base.get(a);
-      Collection<QueryUnit> l2 = (Collection<QueryUnit>)base.get(b);
-      if (l1.size() > l2.size()) {
-        return 1;
-      } else {
-        return -1;
-      }
-    }
-  }
-
- /* public static QueryUnit[] buildQueryDistributionPlan(
-      Map<Fragment, FragmentServingInfo> servingMap,
-      Map<String, List<String>> DNSNameToHostsMap,
-//      Set<String> failedHost,
-      QueryUnit[] queryUnits
-  ) throws UnknownWorkerException {
-    Map<String, Collection<QueryUnit>> map = Maps.newHashMap();
-    ListMultimap<String, QueryUnit> distStatus =
-        Multimaps.newListMultimap(map,
-            new Supplier<List<QueryUnit>>() {
-              @Override
-              public List<QueryUnit> get() {
-                return Lists.newArrayList();
-              }
-            });
-
-    String host;
-    Fragment fragment;
-    FragmentServingInfo servingInfo = null;
-    // build the initial query distribution status
-    for (QueryUnit unit : queryUnits) {
-      Preconditions.checkState(unit.getScanNodes().length == 1);
-      fragment = unit.getFragment(unit.getScanNodes()[0].getTableId());
-      if (servingMap.containsKey(fragment)) {
-        servingInfo = servingMap.get(fragment);
-      } else {
-        servingInfo = null;
-        // error
-      }
-      host = servingInfo.getPrimaryHost();
-      distStatus.put(host, unit);
-    }
-
-    *//*LOG.info("===== before re-balancing =====");
-    for (Map.Entry<String, Collection<QueryUnit>> e : map.entrySet()) {
-      LOG.info(e.getKey() + " : " + e.getValue().size());
-    }
-    LOG.info("\n");*//*
-
-    // re-balancing the query distribution
-    Preconditions.checkState(queryUnits.length >= servingMap.size());
-    int threshold = 0;
-    int mean = queryUnits.length / map.size();
-    int maxQueryUnitNum = mean + threshold;
-    WorkerComparatorByNumOfQueryUnits comp =
-        new WorkerComparatorByNumOfQueryUnits(map);
-    TreeMap<String, Collection<QueryUnit>> sortedMap =
-        Maps.newTreeMap(comp);
-    sortedMap.putAll(map);
-
-    Collection<QueryUnit> fromUnits;
-    Collection<QueryUnit> toUnits;
-    QueryUnit moved;
-    int moveNum = 0;
-    List<Map.Entry<String, Collection<QueryUnit>>> list =
-        Lists.newArrayList(sortedMap.entrySet());
-    int fromIdx = list.size()-1, toIdx = 0;
-    while (fromIdx > toIdx) {
-      toUnits = list.get(toIdx).getValue();
-      fromUnits = list.get(fromIdx).getValue();
-
-      do{
-        moved = fromUnits.iterator().next();
-        toUnits.add(moved);
-        fromUnits.remove(moved);
-        moveNum++;
-      } while (toUnits.size() < maxQueryUnitNum &&
-          fromUnits.size() > maxQueryUnitNum);
-      if (fromUnits.size() <= maxQueryUnitNum) {
-        fromIdx--;
-      }
-      if (toUnits.size() >= maxQueryUnitNum) {
-        toIdx++;
-      }
-    }
-
-    *//*LOG.info("===== after re-balancing " + maxQueryUnitNum + " =====");
-    for (Map.Entry<String, Collection<QueryUnit>> e : list) {
-      LOG.info(e.getKey() + " : " + e.getValue().size());
-    }
-    LOG.info("\n");*//*
-
-    LOG.info(moveNum + " query units among " +
-        queryUnits.length + " are moved!");
-
-    List<String> hosts;
-    int rrIdx = 0;
-    for (Map.Entry<String, Collection<QueryUnit>> e : list) {
-      hosts = DNSNameToHostsMap.get(e.getKey());
-      if (hosts == null) {
-        throw new UnknownWorkerException(e.getKey() + "");
-      }
-      for (QueryUnit unit : e.getValue()) {
-*//*
-        while (failedHost.contains(hosts.get(rrIdx))) {
-          if (++rrIdx == hosts.size()) {
-            rrIdx = 0;
-          }
-        }
-*//*
-        unit.setHost(hosts.get(rrIdx++));
-        if (rrIdx == hosts.size()) {
-          rrIdx = 0;
-        }
-      }
-    }
-
-    return queryUnits;
-  }*/
-
-  public static StoreTableNode newStorePlan(Schema outputSchema,
-                                            String outputTableId) {
-    StoreTableNode store = new StoreTableNode(outputTableId);
-    store.setInSchema(outputSchema);
-    store.setOutSchema(outputSchema);
-    return store;
-  }
-
-  public static ScanNode newScanPlan(Schema inputSchema,
-                                     String inputTableId,
-                                     Path inputPath) {
-    TableMeta meta = CatalogUtil.newTableMeta(inputSchema, StoreType.CSV);
-    TableDesc desc = CatalogUtil.newTableDesc(inputTableId, meta, inputPath);
-    ScanNode newScan = new ScanNode(new QueryBlock.FromTable(desc));
-    newScan.setInSchema(desc.getMeta().getSchema());
-    newScan.setOutSchema(desc.getMeta().getSchema());
-    return newScan;
-  }
-
-  public static GroupbyNode newGroupbyPlan(Schema inputSchema,
-                                           Schema outputSchema,
-                                           Column[] keys,
-                                           EvalNode havingCondition,
-                                           QueryBlock.Target[] targets) {
-    GroupbyNode groupby = new GroupbyNode(keys);
-    groupby.setInSchema(inputSchema);
-    groupby.setOutSchema(outputSchema);
-    groupby.setHavingCondition(havingCondition);
-    groupby.setTargetList(targets);
-
-    return groupby;
-  }
-}
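
A side note on the deleted WorkerComparatorByNumOfQueryUnits: its compare() never returns 0, so compare(a, a) != 0, which violates the Comparator contract. The asymmetry was presumably deliberate, so a TreeMap keyed by this comparator never collapses two workers with equally sized task lists into one entry, but its only caller was the commented-out re-balancing block anyway. A contract-respecting alternative would break ties on the key itself; a minimal sketch (the generic signature and names are illustrative, not part of the patch):

    import java.util.Collection;
    import java.util.Comparator;
    import java.util.Map;

    // Sketch only: orders keys by the size of their mapped collections,
    // breaking ties on the key so distinct keys never compare as equal.
    class SizeComparator<K extends Comparable<K>, V> implements Comparator<K> {
      private final Map<K, ? extends Collection<V>> base;

      SizeComparator(Map<K, ? extends Collection<V>> base) {
        this.base = base;
      }

      @Override
      public int compare(K a, K b) {
        int bySize = Integer.compare(base.get(a).size(), base.get(b).size());
        return bySize != 0 ? bySize : a.compareTo(b);
      }
    }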

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
deleted file mode 100644
index 98fb754..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Query.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import tajo.QueryConf;
-import tajo.QueryId;
-import tajo.SubQueryId;
-import tajo.TajoProtos.QueryState;
-import tajo.catalog.*;
-import tajo.catalog.proto.CatalogProtos.StoreType;
-import tajo.engine.json.GsonCreator;
-import tajo.engine.planner.global.MasterPlan;
-import tajo.engine.planner.logical.ExprType;
-import tajo.engine.planner.logical.IndexWriteNode;
-import tajo.master.QueryMaster.QueryContext;
-import tajo.master.event.*;
-import tajo.storage.StorageManager;
-import tajo.util.IndexUtil;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class Query implements EventHandler<QueryEvent> {
-  private static final Log LOG = LogFactory.getLog(Query.class);
-
-
-  // Facilities for Query
-  private final QueryConf conf;
-  private final Clock clock;
-  private String queryStr;
-  private Map<SubQueryId, SubQuery> subqueries;
-  private final EventHandler eventHandler;
-  private final MasterPlan plan;
-  private final StorageManager sm;
-  private QueryContext context;
-  private ExecutionBlockCursor cursor;
-
-  // Query Status
-  private final QueryId id;
-  private long appSubmitTime;
-  private long startTime;
-  private long initializationTime;
-  private long finishTime;
-  private TableDesc resultDesc;
-  private int completedSubQueryCount = 0;
-  private final List<String> diagnostics = new ArrayList<String>();
-
-  // Internal Variables
-  private final Lock readLock;
-  private final Lock writeLock;
-  private int priority = 100;
-
-  // State Machine
-  private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine;
-
-  private static final StateMachineFactory
-      <Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory =
-      new StateMachineFactory<Query, QueryState, QueryEventType, QueryEvent>
-          (QueryState.QUERY_NEW)
-
-      .addTransition(QueryState.QUERY_NEW,
-          EnumSet.of(QueryState.QUERY_INIT, QueryState.QUERY_FAILED),
-          QueryEventType.INIT, new InitTransition())
-
-      .addTransition(QueryState.QUERY_INIT, QueryState.QUERY_RUNNING,
-          QueryEventType.START, new StartTransition())
-
-      .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
-          QueryEventType.INIT_COMPLETED, new InitCompleteTransition())
-      .addTransition(QueryState.QUERY_RUNNING,
-          EnumSet.of(QueryState.QUERY_RUNNING, QueryState.QUERY_SUCCEEDED,
-              QueryState.QUERY_FAILED),
-          QueryEventType.SUBQUERY_COMPLETED,
-          new SubQueryCompletedTransition())
-      .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR,
-          QueryEventType.INTERNAL_ERROR, new InternalErrorTransition())
-       .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
-          QueryEventType.INTERNAL_ERROR)
-
-      .installTopology();
-
-  public Query(final QueryContext context, final QueryId id, Clock clock,
-               final long appSubmitTime,
-               final String queryStr,
-               final EventHandler eventHandler,
-               final MasterPlan plan, final StorageManager sm) {
-    this.context = context;
-    this.conf = context.getConf();
-    this.id = id;
-    this.clock = clock;
-    this.appSubmitTime = appSubmitTime;
-    this.queryStr = queryStr;
-    subqueries = Maps.newHashMap();
-    this.eventHandler = eventHandler;
-    this.plan = plan;
-    this.sm = sm;
-    cursor = new ExecutionBlockCursor(plan);
-
-    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-    this.readLock = readWriteLock.readLock();
-    this.writeLock = readWriteLock.writeLock();
-
-    stateMachine = stateMachineFactory.make(this);
-  }
-
-  public boolean isCreateTableStmt() {
-    return context.isCreateTableQuery();
-  }
-
-  protected FileSystem getFileSystem(Configuration conf) throws IOException {
-    return FileSystem.get(conf);
-  }
-
-  public float getProgress() {
-    QueryState state = getStateMachine().getCurrentState();
-    if (state == QueryState.QUERY_SUCCEEDED) {
-      return 1.0f;
-    } else {
-      int idx = 0;
-      float [] subProgresses = new float[subqueries.size()];
-      boolean finished = true;
-      for (SubQuery subquery: subqueries.values()) {
-        if (subquery.getState() != SubQueryState.NEW) {
-          subProgresses[idx] = subquery.getProgress();
-          if (finished && subquery.getState() != SubQueryState.SUCCEEDED) {
-            finished = false;
-          }
-        } else {
-          subProgresses[idx] = 0.0f;
-        }
-        idx++;
-      }
-
-      if (finished) {
-        return 1.0f;
-      }
-
-      float totalProgress = 0;
-      float proportion = 1.0f / (float)subqueries.size();
-
-      for (int i = 0; i < subProgresses.length; i++) {
-        totalProgress += subProgresses[i] * proportion;
-      }
-
-      return totalProgress;
-    }
-  }
-
-  public long getAppSubmitTime() {
-    return this.appSubmitTime;
-  }
-
-  public long getStartTime() {
-    return startTime;
-  }
-
-  public void setStartTime() {
-    startTime = clock.getTime();
-  }
-
-  public long getInitializationTime() {
-    return initializationTime;
-  }
-
-  public void setInitializationTime() {
-    initializationTime = clock.getTime();
-  }
-
-
-  public long getFinishTime() {
-    return finishTime;
-  }
-
-  public void setFinishTime() {
-    finishTime = clock.getTime();
-  }
-
-  public List<String> getDiagnostics() {
-    readLock.lock();
-    try {
-      return diagnostics;
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  protected void addDiagnostic(String diag) {
-    diagnostics.add(diag);
-  }
-
-  public TableDesc getResultDesc() {
-    return resultDesc;
-  }
-
-  public void setResultDesc(TableDesc desc) {
-    resultDesc = desc;
-  }
-
-  public MasterPlan getPlan() {
-    return plan;
-  }
-
-  public StateMachine<QueryState, QueryEventType, QueryEvent> getStateMachine() {
-    return stateMachine;
-  }
-  
-  public void addSubQuery(SubQuery subquery) {
-    subqueries.put(subquery.getId(), subquery);
-  }
-  
-  public QueryId getId() {
-    return this.id;
-  }
-  
-  public SubQuery getSubQuery(SubQueryId id) {
-    return this.subqueries.get(id);
-  }
-
-  public QueryState getState() {
-    readLock.lock();
-    try {
-      return stateMachine.getCurrentState();
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  public ExecutionBlockCursor getExecutionBlockCursor() {
-    return cursor;
-  }
-
-  static class InitTransition
-      implements MultipleArcTransition<Query, QueryEvent, QueryState> {
-
-    @Override
-    public QueryState transition(Query query, QueryEvent queryEvent) {
-      query.setStartTime();
-      return QueryState.QUERY_INIT;
-    }
-  }
-
-  public static class StartTransition
-      implements SingleArcTransition<Query, QueryEvent> {
-
-    @Override
-    public void transition(Query query, QueryEvent queryEvent) {
-      SubQuery subQuery = new SubQuery(query.context, query.getExecutionBlockCursor().nextBlock(),
-          query.sm);
-      subQuery.setPriority(query.priority--);
-      query.addSubQuery(subQuery);
-      LOG.info("Schedule unit plan: \n" + subQuery.getBlock().getPlan());
-      subQuery.handle(new SubQueryEvent(subQuery.getId(),
-          SubQueryEventType.SQ_INIT));
-    }
-  }
-
-  public static class SubQueryCompletedTransition implements
-      MultipleArcTransition<Query, QueryEvent, QueryState> {
-
-    @Override
-    public QueryState transition(Query query, QueryEvent event) {
-      // increase the count for completed subqueries
-      query.completedSubQueryCount++;
-      SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
-      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
-
-      // if the subquery is succeeded
-      if (castEvent.getFinalState() == SubQueryState.SUCCEEDED) {
-        if (cursor.hasNext()) {
-          SubQuery nextSubQuery = new SubQuery(query.context, cursor.nextBlock(), query.sm);
-          nextSubQuery.setPriority(query.priority--);
-          query.addSubQuery(nextSubQuery);
-          nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(),
-              SubQueryEventType.SQ_INIT));
-          LOG.info("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
-          LOG.info("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
-          return query.checkQueryForCompleted();
-
-        } else { // Finish a query
-          if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
-            SubQuery subQuery = query.getSubQuery(castEvent.getSubQueryId());
-            TableDesc desc = new TableDescImpl(query.conf.getOutputTable(),
-                subQuery.getTableMeta(), query.context.getOutputPath());
-            query.setResultDesc(desc);
-            try {
-              query.writeStat(query.context.getOutputPath(), subQuery);
-            } catch (IOException e) {
-              e.printStackTrace();
-            }
-            query.eventHandler.handle(new QueryFinishEvent(query.getId()));
-
-            if (query.context.isCreateTableQuery()) {
-              query.context.getCatalog().addTable(desc);
-            }
-          }
-
-          return query.finished(QueryState.QUERY_SUCCEEDED);
-        }
-      } else {
-        // if at least one subquery is failed, the query is also failed.
-        return QueryState.QUERY_FAILED;
-      }
-    }
-  }
-
-  private static class DiagnosticsUpdateTransition implements
-      SingleArcTransition<Query, QueryEvent> {
-    @Override
-    public void transition(Query query, QueryEvent event) {
-      query.addDiagnostic(((QueryDiagnosticsUpdateEvent) event)
-          .getDiagnosticUpdate());
-    }
-  }
-
-  private static class InitCompleteTransition implements
-      SingleArcTransition<Query, QueryEvent> {
-    @Override
-    public void transition(Query query, QueryEvent event) {
-      if (query.initializationTime == 0) {
-        query.setInitializationTime();
-      }
-    }
-  }
-
-  private static class InternalErrorTransition
-      implements SingleArcTransition<Query, QueryEvent> {
-
-    @Override
-    public void transition(Query query, QueryEvent event) {
-      query.finished(QueryState.QUERY_ERROR);
-    }
-  }
-
-  public QueryState finished(QueryState finalState) {
-    setFinishTime();
-    return finalState;
-  }
-
-  /**
-   * Check if all subqueries of the query are completed
-   * @return QueryState.QUERY_SUCCEEDED if all subqueries are completed.
-   */
-  QueryState checkQueryForCompleted() {
-    if (completedSubQueryCount == subqueries.size()) {
-      return QueryState.QUERY_SUCCEEDED;
-    }
-    return getState();
-  }
-
-
-  @Override
-  public void handle(QueryEvent event) {
-    LOG.info("Processing " + event.getQueryId() + " of type " + event.getType());
-    try {
-      writeLock.lock();
-      QueryState oldState = getState();
-      try {
-        getStateMachine().doTransition(event.getType(), event);
-      } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state", e);
-        eventHandler.handle(new QueryEvent(this.id,
-            QueryEventType.INTERNAL_ERROR));
-      }
-
-      //notify the eventhandler of state change
-      if (oldState != getState()) {
-        LOG.info(id + " Query Transitioned from " + oldState + " to "
-            + getState());
-      }
-    }
-
-    finally {
-      writeLock.unlock();
-    }
-  }
-
-  private void writeStat(Path outputPath, SubQuery subQuery)
-      throws IOException {
-    ExecutionBlock execBlock = subQuery.getBlock();
-    if (execBlock.getPlan().getType() == ExprType.CREATE_INDEX) {
-      IndexWriteNode index = (IndexWriteNode) execBlock.getPlan();
-      Path indexPath = new Path(sm.getTablePath(index.getTableName()), "index");
-      TableMeta meta;
-      if (sm.getFileSystem().exists(new Path(indexPath, ".meta"))) {
-        meta = sm.getTableMeta(indexPath);
-      } else {
-        meta = CatalogUtil
-            .newTableMeta(execBlock.getOutputSchema(), StoreType.CSV);
-      }
-      String indexName = IndexUtil.getIndexName(index.getTableName(),
-          index.getSortSpecs());
-      String json = GsonCreator.getInstance().toJson(index.getSortSpecs());
-      meta.putOption(indexName, json);
-
-      sm.writeTableMeta(indexPath, meta);
-
-    } else {
-      sm.writeTableMeta(outputPath, subQuery.getTableMeta());
-    }
-  }
-}
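
Query above follows the declarative state-machine idiom from YARN (the same org.apache.hadoop.yarn.state utility the MapReduce ApplicationMaster uses): every legal transition is registered once in a static StateMachineFactory, and each instance gets its own cursor over that shared topology via make(this). A stripped-down sketch of the pattern, with hypothetical states and events:

    import org.apache.hadoop.yarn.state.SingleArcTransition;
    import org.apache.hadoop.yarn.state.StateMachine;
    import org.apache.hadoop.yarn.state.StateMachineFactory;

    // Hypothetical two-state operand, just to show the wiring.
    public class MiniJob {
      enum State { NEW, RUNNING }
      enum EventType { START }
      static class Event { }

      // The transition table is built once and shared by all instances.
      private static final StateMachineFactory<MiniJob, State, EventType, Event>
          factory =
          new StateMachineFactory<MiniJob, State, EventType, Event>(State.NEW)
              .addTransition(State.NEW, State.RUNNING, EventType.START,
                  new SingleArcTransition<MiniJob, Event>() {
                    @Override
                    public void transition(MiniJob job, Event event) {
                      // side effects of the NEW -> RUNNING arc go here
                    }
                  })
              .installTopology();

      // Each instance owns only the mutable current-state cursor.
      private final StateMachine<State, EventType, Event> stateMachine =
          factory.make(this);

      public State handle(EventType type, Event event) {
        return stateMachine.doTransition(type, event);
      }
    }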

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
deleted file mode 100644
index 7317692..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
+++ /dev/null
@@ -1,465 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.service.CompositeService;
-import org.apache.hadoop.yarn.service.Service;
-import tajo.*;
-import tajo.catalog.CatalogService;
-import tajo.conf.TajoConf;
-import tajo.engine.planner.global.MasterPlan;
-import tajo.master.TajoMaster.MasterContext;
-import tajo.master.TaskRunnerLauncherImpl.ContainerProxy;
-import tajo.master.event.*;
-import tajo.master.rm.RMContainerAllocator;
-import tajo.storage.StorageManager;
-import tajo.storage.StorageUtil;
-import tajo.util.TajoIdUtils;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class QueryMaster extends CompositeService implements EventHandler {
-  private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
-
-  // Master Context
-  private final MasterContext masterContext;
-
-  // AppMaster Common
-  private final Clock clock;
-  private final long appSubmitTime;
-  private String appName;
-  private final ApplicationAttemptId appAttemptID;
-
-  // For Query
-  private final QueryId queryId;
-  private QueryContext queryContext;
-  private Query query;
-  private MasterPlan masterPlan;
-
-  private AsyncDispatcher dispatcher;
-  private YarnRPC rpc;
-  private RMContainerAllocator rmAllocator;
-  private TaskRunnerListener taskRunnerListener;
-  private TaskRunnerLauncher taskRunnerLauncher;
-
-  // Services of Tajo
-  private CatalogService catalog;
-
-  private boolean isCreateTableStmt;
-  private StorageManager storageManager;
-  private FileSystem defaultFS;
-  private Path outputPath;
-
-  public QueryMaster(final MasterContext masterContext,
-                     final ApplicationAttemptId appAttemptID,
-                     final Clock clock, long appSubmitTime,
-                     MasterPlan masterPlan) {
-    super(QueryMaster.class.getName());
-    this.masterContext = masterContext;
-
-    this.appAttemptID = appAttemptID;
-    this.clock = clock;
-    this.appSubmitTime = appSubmitTime;
-
-    this.queryId = TajoIdUtils.createQueryId(appAttemptID);
-    this.masterPlan = masterPlan;
-    LOG.info("Created Query Master for " + appAttemptID);
-  }
-
-  public void init(Configuration _conf) {
-    QueryConf conf = new QueryConf(_conf);
-
-    try {
-      queryContext = new QueryContext(conf);
-
-      dispatcher = masterContext.getDispatcher();
-      // TODO - This comment should be eliminated when QueryMaster is separated.
-      dispatcher = new AsyncDispatcher();
-      addIfService(dispatcher);
-
-      // TODO - This comment should be improved when QueryMaster is separated.
-      rpc = masterContext.getYarnRPC();
-
-      catalog = masterContext.getCatalog();
-      storageManager = masterContext.getStorageManager();
-
-      taskRunnerListener = new TaskRunnerListener(queryContext);
-      addIfService(taskRunnerListener);
-
-      rmAllocator = new RMContainerAllocator(queryContext);
-      addIfService(rmAllocator);
-      dispatcher.register(ContainerAllocatorEventType.class, rmAllocator);
-
-      query = new Query(queryContext, queryId, clock, appSubmitTime,
-          "", dispatcher.getEventHandler(), masterPlan, storageManager);
-      initStagingDir();
-
-      // QueryEventDispatcher is already registered in TajoMaster
-      dispatcher.register(QueryEventType.class, query);
-      dispatcher.register(SubQueryEventType.class, new SubQueryEventDispatcher());
-      dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
-      dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
-      dispatcher.register(QueryFinishEvent.EventType.class, new QueryFinishEventHandler());
-      dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
-
-      taskRunnerLauncher = new TaskRunnerLauncherImpl(queryContext);
-      addIfService(taskRunnerLauncher);
-      dispatcher.register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
-
-
-    } catch (Throwable t) {
-      LOG.error(ExceptionUtils.getStackTrace(t));
-      throw new RuntimeException(t);
-    }
-
-    super.init(conf);
-  }
-
-  public void start() {
-    super.start();
-    startQuery();
-  }
-
-  public void stop() {
-    super.stop();
-  }
-
-  protected void addIfService(Object object) {
-    if (object instanceof Service) {
-      addService((Service) object);
-    }
-  }
-
-  public void startQuery() {
-    dispatcher.getEventHandler().handle(new QueryEvent(queryId,
-        QueryEventType.INIT));
-    dispatcher.getEventHandler().handle(new QueryEvent(queryId,
-        QueryEventType.START));
-  }
-
-  @Override
-  public void handle(Event event) {
-    dispatcher.getEventHandler().handle(event);
-  }
-
-  public EventHandler getEventHandler() {
-    return dispatcher.getEventHandler();
-  }
-
-  private class SubQueryEventDispatcher implements EventHandler<SubQueryEvent> {
-    public void handle(SubQueryEvent event) {
-      SubQueryId id = event.getSubQueryId();
-      query.getSubQuery(id).handle(event);
-    }
-  }
-
-  private class TaskEventDispatcher
-      implements EventHandler<TaskEvent> {
-    public void handle(TaskEvent event) {
-      QueryUnitId taskId = event.getTaskId();
-      QueryUnit task = query.getSubQuery(taskId.getSubQueryId()).
-          getQueryUnit(taskId);
-      task.handle(event);
-    }
-  }
-
-  private class TaskAttemptEventDispatcher
-      implements EventHandler<TaskAttemptEvent> {
-    public void handle(TaskAttemptEvent event) {
-      QueryUnitAttemptId attemptId = event.getTaskAttemptId();
-      SubQuery subQuery = query.getSubQuery(attemptId.getSubQueryId());
-      QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
-      QueryUnitAttempt attempt = task.getAttempt(attemptId);
-      attempt.handle(event);
-    }
-  }
-
-  private class TaskSchedulerDispatcher
-      implements EventHandler<TaskSchedulerEvent> {
-    public void handle(TaskSchedulerEvent event) {
-      SubQuery subQuery = query.getSubQuery(event.getSubQueryId());
-      subQuery.getTaskScheduler().handle(event);
-    }
-  }
-
-  public QueryContext getContext() {
-    return this.queryContext;
-  }
-
-  public class QueryContext {
-    private QueryConf conf;
-    public Map<ContainerId, ContainerProxy> containers = new ConcurrentHashMap<ContainerId, ContainerProxy>();
-    int minCapability;
-    int maxCapability;
-    int numCluster;
-
-    public QueryContext(QueryConf conf) {
-      this.conf = conf;
-    }
-
-    public QueryConf getConf() {
-      return conf;
-    }
-
-    public AsyncDispatcher getDispatcher() {
-      return dispatcher;
-    }
-
-    public Clock getClock() {
-      return clock;
-    }
-
-    public Query getQuery() {
-      return query;
-    }
-
-    public SubQuery getSubQuery(SubQueryId subQueryId) {
-      return query.getSubQuery(subQueryId);
-    }
-
-    public QueryId getQueryId() {
-      return queryId;
-    }
-
-    public ApplicationId getApplicationId() {
-      return appAttemptID.getApplicationId();
-    }
-
-    public ApplicationAttemptId getApplicationAttemptId() {
-      return appAttemptID;
-    }
-
-    public EventHandler getEventHandler() {
-      return dispatcher.getEventHandler();
-    }
-
-    public YarnRPC getYarnRPC() {
-      return rpc;
-    }
-
-    public InetSocketAddress getRpcAddress() {
-      return masterContext.getClientService().getBindAddress();
-    }
-
-    public InetSocketAddress getTaskListener() {
-      return taskRunnerListener.getBindAddress();
-    }
-
-    public void addContainer(ContainerId cId, ContainerProxy container) {
-      containers.put(cId, container);
-    }
-
-    public void removeContainer(ContainerId cId) {
-      containers.remove(cId);
-    }
-
-    public boolean containsContainer(ContainerId cId) {
-      return containers.containsKey(cId);
-    }
-
-    public ContainerProxy getContainer(ContainerId cId) {
-      return containers.get(cId);
-    }
-
-    public int getNumClusterNode() {
-      return numCluster;
-    }
-
-    public void setNumClusterNodes(int num) {
-      numCluster = num;
-    }
-
-    public CatalogService getCatalog() {
-      return catalog;
-    }
-
-    public Path getOutputPath() {
-      return outputPath;
-    }
-
-    public void setMaxContainerCapability(int capability) {
-      this.maxCapability = capability;
-    }
-
-    public int getMaxContainerCapability() {
-      return this.maxCapability;
-    }
-
-    public void setMinContainerCapability(int capability) {
-      this.minCapability = capability;
-    }
-
-    public int getMinContainerCapability() {
-      return this.minCapability;
-    }
-
-    public boolean isCreateTableQuery() {
-      return isCreateTableStmt;
-    }
-
-    public float getProgress() {
-      return query.getProgress();
-    }
-
-    public long getStartTime() {
-      return query.getStartTime();
-    }
-
-    public long getFinishTime() {
-      return query.getFinishTime();
-    }
-
-    public StorageManager getStorageManager() {
-      return storageManager;
-    }
-  }
-
-  private class QueryFinishEventHandler implements EventHandler<QueryFinishEvent> {
-    @Override
-    public void handle(QueryFinishEvent event) {
-      LOG.info("Query end notification started for QueryId : " + query.getId());
-
-      try {
-        // Stop all services
-        // This will also send the final report to the ResourceManager
-        LOG.info("Calling stop for all the services");
-        stop();
-
-      } catch (Throwable t) {
-        LOG.warn("Graceful stop failed ", t);
-      }
-
-      //Bring the process down by force.
-      //Not needed after HADOOP-7140
-      LOG.info("Exiting QueryMaster..GoodBye!");
-      // TODO - to be enabled if query master is separated.
-      //System.exit(0);
-    }
-  }
-
-  // query submission directory is private!
-  final public static FsPermission USER_DIR_PERMISSION =
-      FsPermission.createImmutable((short) 0700); // rwx--------
-
-  /**
-   * It initializes the final output and staging directory and sets
-   * them to variables.
-   */
-  private void initStagingDir() throws IOException {
-    QueryConf conf = getContext().getConf();
-
-    String realUser;
-    String currentUser;
-    UserGroupInformation ugi;
-    ugi = UserGroupInformation.getLoginUser();
-    realUser = ugi.getShortUserName();
-    currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
-
-    String givenOutputTableName = conf.getOutputTable();
-    Path stagingDir;
-
-    // If the final output directory is not given by the user,
-    // we use the query id as the output directory.
-    if (givenOutputTableName.equals("")) {
-      this.isCreateTableStmt = false;
-      FileSystem defaultFS = FileSystem.get(conf);
-
-      Path homeDirectory = defaultFS.getHomeDirectory();
-      if (!defaultFS.exists(homeDirectory)) {
-        defaultFS.mkdirs(homeDirectory, new FsPermission(USER_DIR_PERMISSION));
-      }
-
-      Path userQueryDir = new Path(homeDirectory, TajoConstants.USER_QUERYDIR_PREFIX);
-
-      if (defaultFS.exists(userQueryDir)) {
-        FileStatus fsStatus = defaultFS.getFileStatus(userQueryDir);
-        String owner = fsStatus.getOwner();
-
-        if (!(owner.equals(currentUser) || owner.equals(realUser))) {
-          throw new IOException("The ownership on the user's query " +
-              "directory " + userQueryDir + " is not as expected. " +
-              "It is owned by " + owner + ". The directory must " +
-              "be owned by the submitter " + currentUser + " or " +
-              "by " + realUser);
-        }
-
-        if (!fsStatus.getPermission().equals(USER_DIR_PERMISSION)) {
-          LOG.info("Permissions on staging directory " + userQueryDir + " are " +
-              "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
-              "to correct value " + USER_DIR_PERMISSION);
-          defaultFS.setPermission(userQueryDir, new FsPermission(USER_DIR_PERMISSION));
-        }
-      } else {
-        defaultFS.mkdirs(userQueryDir,
-            new FsPermission(USER_DIR_PERMISSION));
-      }
-
-      stagingDir = StorageUtil.concatPath(userQueryDir, queryId.toString());
-
-      if (defaultFS.exists(stagingDir)) {
-        throw new IOException("The staging directory " + stagingDir
-            + " already exists. The directory must be unique to each query");
-      } else {
-        defaultFS.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
-      }
-
-      // Set the query id to the output table name
-      conf.setOutputTable(queryId.toString());
-
-    } else {
-      this.isCreateTableStmt = true;
-      Path warehouseDir = new Path(conf.getVar(TajoConf.ConfVars.ROOT_DIR),
-          TajoConstants.WAREHOUSE_DIR);
-      stagingDir = new Path(warehouseDir, conf.getOutputTable());
-
-      FileSystem fs = warehouseDir.getFileSystem(conf);
-      if (fs.exists(stagingDir)) {
-        throw new IOException("The staging directory " + stagingDir
-            + " already exists. The directory must be unique to each query");
-      } else {
-        // TODO - should have appropriate permission
-        fs.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
-      }
-    }
-
-    conf.setOutputPath(stagingDir);
-    outputPath = stagingDir;
-    LOG.info("Initialized Query Staging Dir: " + outputPath);
-  }
-}
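
Most of initStagingDir() above is an ownership and permission guard on the per-user query directory: create it as 0700 if absent, otherwise verify the owner and re-apply the expected permissions. The same logic condensed into a standalone helper (hypothetical class and method names; the org.apache.hadoop.fs calls are the ones used in the patch):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class StagingDirs {
      // rwx------ : query directories are private to the submitting user.
      static final FsPermission USER_DIR_PERMISSION =
          FsPermission.createImmutable((short) 0700);

      // Create dir as private if absent; otherwise check ownership and
      // tighten the permissions back to the expected value.
      static void ensurePrivateDir(FileSystem fs, Path dir, String user)
          throws IOException {
        if (!fs.exists(dir)) {
          fs.mkdirs(dir, new FsPermission(USER_DIR_PERMISSION));
          return;
        }
        FileStatus status = fs.getFileStatus(dir);
        if (!status.getOwner().equals(user)) {
          throw new IOException("The directory " + dir + " is owned by "
              + status.getOwner() + ", not by the submitter " + user);
        }
        if (!status.getPermission().equals(USER_DIR_PERMISSION)) {
          fs.setPermission(dir, new FsPermission(USER_DIR_PERMISSION));
        }
      }
    }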

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryUnit.java
deleted file mode 100644
index 38771e7..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryUnit.java
+++ /dev/null
@@ -1,502 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import tajo.QueryIdFactory;
-import tajo.QueryUnitAttemptId;
-import tajo.QueryUnitId;
-import tajo.catalog.Schema;
-import tajo.catalog.statistics.TableStat;
-import tajo.engine.MasterWorkerProtos.Partition;
-import tajo.engine.planner.logical.*;
-import tajo.master.event.*;
-import tajo.storage.Fragment;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class QueryUnit implements EventHandler<TaskEvent> {
-  /** Class Logger */
-  private static final Log LOG = LogFactory.getLog(QueryUnit.class);
-
-	private QueryUnitId taskId;
-  private EventHandler eventHandler;
-	private StoreTableNode store = null;
-	private LogicalNode plan = null;
-	private List<ScanNode> scan;
-	
-	private Map<String, Fragment> fragMap;
-	private Map<String, Set<URI>> fetchMap;
-	
-  private List<Partition> partitions;
-	private TableStat stats;
-  private String [] dataLocations;
-  private final boolean isLeafTask;
-  private List<IntermediateEntry> intermediateData;
-
-  private Map<QueryUnitAttemptId, QueryUnitAttempt> attempts;
-  private final int maxAttempts = 3;
-  private Integer lastAttemptId;
-
-  private QueryUnitAttemptId successfulAttempt;
-  private String succeededHost;
-  private int succeededPullServerPort;
-
-  private int failedAttempts;
-  private int finishedAttempts; // finished = total of succeeded, failed, and killed
-
-  private static final StateMachineFactory
-      <QueryUnit, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
-      new StateMachineFactory
-          <QueryUnit, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
-
-      .addTransition(TaskState.NEW, TaskState.SCHEDULED,
-          TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
-
-       .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
-           TaskEventType.T_ATTEMPT_LAUNCHED)
-
-        .addTransition(TaskState.RUNNING, TaskState.RUNNING,
-           TaskEventType.T_ATTEMPT_LAUNCHED)
-
-       .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
-           TaskEventType.T_ATTEMPT_SUCCEEDED, new AttemptSucceededTransition())
-
-       .addTransition(TaskState.RUNNING,
-            EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
-            TaskEventType.T_ATTEMPT_FAILED, new AttemptFailedTransition())
-
-
-
-      .installTopology();
-  private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine;
-
-
-  private final Lock readLock;
-  private final Lock writeLock;
-
-	public QueryUnit(QueryUnitId id, boolean isLeafTask, EventHandler eventHandler) {
-		this.taskId = id;
-    this.eventHandler = eventHandler;
-    this.isLeafTask = isLeafTask;
-		scan = new ArrayList<ScanNode>();
-    fetchMap = Maps.newHashMap();
-    fragMap = Maps.newHashMap();
-    partitions = new ArrayList<Partition>();
-    attempts = Collections.emptyMap();
-    lastAttemptId = -1;
-    failedAttempts = 0;
-
-    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-    this.readLock = readWriteLock.readLock();
-    this.writeLock = readWriteLock.writeLock();
-
-    stateMachine = stateMachineFactory.make(this);
-	}
-
-  public boolean isLeafTask() {
-    return this.isLeafTask;
-  }
-
-  public void setDataLocations(String [] dataLocations) {
-    this.dataLocations = dataLocations;
-  }
-
-  public String [] getDataLocations() {
-    return this.dataLocations;
-  }
-
-  public TaskState getState() {
-    readLock.lock();
-    try {
-      return stateMachine.getCurrentState();
-    } finally {
-      readLock.unlock();
-    }
-  }
-	
-	public void setLogicalPlan(LogicalNode plan) {
-    Preconditions.checkArgument(plan.getType() == ExprType.STORE ||
-        plan.getType() == ExprType.CREATE_INDEX);
-    
-	  this.plan = plan;
-	  if (plan instanceof StoreTableNode) {
-      store = (StoreTableNode) plan;      
-    } else {
-      store = (StoreTableNode) ((IndexWriteNode)plan).getSubNode();
-    }
-	  LogicalNode node = plan;
-	  ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
-	  s.add(node);
-	  while (!s.isEmpty()) {
-	    node = s.remove(s.size()-1);
-	    if (node instanceof UnaryNode) {
-	      UnaryNode unary = (UnaryNode) node;
-	      s.add(s.size(), unary.getSubNode());
-	    } else if (node instanceof BinaryNode) {
-	      BinaryNode binary = (BinaryNode) node;
-	      s.add(s.size(), binary.getOuterNode());
-	      s.add(s.size(), binary.getInnerNode());
-	    } else if (node instanceof ScanNode) {
-	      scan.add((ScanNode)node);
-	    }
-	  }
-	}
-
-  @Deprecated
-  public void setFragment(String tableId, Fragment fragment) {
-    this.fragMap.put(tableId, fragment);
-    if (fragment.hasDataLocations()) {
-      setDataLocations(fragment.getDataLocations());
-    }
-  }
-
-  public void setFragment2(Fragment fragment) {
-    this.fragMap.put(fragment.getId(), fragment);
-    if (fragment.hasDataLocations()) {
-      setDataLocations(fragment.getDataLocations());
-    }
-  }
-	
-	public void addFetch(String tableId, String uri) throws URISyntaxException {
-	  this.addFetch(tableId, new URI(uri));
-	}
-	
-	public void addFetch(String tableId, URI uri) {
-	  Set<URI> uris;
-	  if (fetchMap.containsKey(tableId)) {
-	    uris = fetchMap.get(tableId);
-	  } else {
-	    uris = Sets.newHashSet();
-	  }
-	  uris.add(uri);
-    fetchMap.put(tableId, uris);
-	}
-	
-	public void addFetches(String tableId, Collection<URI> urilist) {
-	  Set<URI> uris;
-    if (fetchMap.containsKey(tableId)) {
-      uris = fetchMap.get(tableId);
-    } else {
-      uris = Sets.newHashSet();
-    }
-    uris.addAll(urilist);
-    fetchMap.put(tableId, uris);
-	}
-	
-	public void setFetches(Map<String, Set<URI>> fetches) {
-	  this.fetchMap.clear();
-	  this.fetchMap.putAll(fetches);
-	}
-	
-  public Fragment getFragment(String tableId) {
-    return this.fragMap.get(tableId);
-  }
-
-  public Collection<Fragment> getAllFragments() {
-    return fragMap.values();
-  }
-	
-	public LogicalNode getLogicalPlan() {
-	  return this.plan;
-	}
-	
-	public QueryUnitId getId() {
-		return taskId;
-	}
-	
-	public Collection<URI> getFetchHosts(String tableId) {
-	  return fetchMap.get(tableId);
-	}
-	
-	public Collection<Set<URI>> getFetches() {
-	  return fetchMap.values();
-	}
-	
-	public Collection<URI> getFetch(ScanNode scan) {
-	  return this.fetchMap.get(scan.getTableId());
-	}
-
-	public String getOutputName() {
-		return this.store.getTableName();
-	}
-	
-	public Schema getOutputSchema() {
-	  return this.store.getOutSchema();
-	}
-	
-	public StoreTableNode getStoreTableNode() {
-	  return this.store;
-	}
-	
-	public ScanNode[] getScanNodes() {
-	  return this.scan.toArray(new ScanNode[scan.size()]);
-	}
-	
-	@Override
-	public String toString() {
-		String str = new String(plan.getType() + " \n");
-		for (Entry<String, Fragment> e : fragMap.entrySet()) {
-		  str += e.getKey() + " : ";
-      str += e.getValue() + " ";
-		}
-		for (Entry<String, Set<URI>> e : fetchMap.entrySet()) {
-      str += e.getKey() + " : ";
-      for (URI t : e.getValue()) {
-        str += t + " ";
-      }
-    }
-		
-		return str;
-	}
-	
-	public void setStats(TableStat stats) {
-	  this.stats = stats;
-	}
-	
-	public void setPartitions(List<Partition> partitions) {
-	  this.partitions = Collections.unmodifiableList(partitions);
-	}
-	
-	public TableStat getStats() {
-	  return this.stats;
-	}
-	
-	public List<Partition> getPartitions() {
-	  return this.partitions;
-	}
-	
-	public int getPartitionNum() {
-	  return this.partitions.size();
-	}
-
-  public QueryUnitAttempt newAttempt() {
-    QueryUnitAttempt attempt = new QueryUnitAttempt(
-        QueryIdFactory.newQueryUnitAttemptId(this.getId(),
-            ++lastAttemptId), this, eventHandler);
-    return attempt;
-  }
-
-  public QueryUnitAttempt getAttempt(QueryUnitAttemptId attemptId) {
-    return attempts.get(attemptId);
-  }
-
-  public QueryUnitAttempt getAttempt(int attempt) {
-    return this.attempts.get(new QueryUnitAttemptId(this.getId(), attempt));
-  }
-
-  public QueryUnitAttempt getLastAttempt() {
-    // The map is keyed by QueryUnitAttemptId, so build the id first;
-    // a raw get(lastAttemptId) on the Integer would always miss.
-    return getAttempt(this.lastAttemptId);
-  }
-
-  protected QueryUnitAttempt getSuccessfulAttempt() {
-    readLock.lock();
-    try {
-      if (null == successfulAttempt) {
-        return null;
-      }
-      return attempts.get(successfulAttempt);
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  public int getRetryCount () {
-    return this.lastAttemptId;
-  }
-
-  private static class InitialScheduleTransition implements
-    SingleArcTransition<QueryUnit, TaskEvent> {
-
-    @Override
-    public void transition(QueryUnit task, TaskEvent taskEvent) {
-      task.addAndScheduleAttempt();
-    }
-  }
-
-  // This is always called in the Write Lock
-  private void addAndScheduleAttempt() {
-    // Create new task attempt
-    QueryUnitAttempt attempt = newAttempt();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Created attempt " + attempt.getId());
-    }
-    switch (attempts.size()) {
-      case 0:
-        attempts = Collections.singletonMap(attempt.getId(), attempt);
-        break;
-
-      case 1:
-        Map<QueryUnitAttemptId, QueryUnitAttempt> newAttempts
-            = new LinkedHashMap<QueryUnitAttemptId, QueryUnitAttempt>(3);
-        newAttempts.putAll(attempts);
-        attempts = newAttempts;
-        attempts.put(attempt.getId(), attempt);
-        break;
-
-      default:
-        attempts.put(attempt.getId(), attempt);
-        break;
-    }
-
-    if (failedAttempts > 0) {
-      eventHandler.handle(new TaskAttemptEvent(attempt.getId(),
-          TaskAttemptEventType.TA_RESCHEDULE));
-    } else {
-      eventHandler.handle(new TaskAttemptEvent(attempt.getId(),
-          TaskAttemptEventType.TA_SCHEDULE));
-    }
-  }
-
-  private static class AttemptSucceededTransition
-      implements SingleArcTransition<QueryUnit, TaskEvent>{
-
-    @Override
-    public void transition(QueryUnit task,
-                           TaskEvent event) {
-      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
-      QueryUnitAttempt attempt = task.attempts.get(
-          attemptEvent.getTaskAttemptId());
-      task.successfulAttempt = attemptEvent.getTaskAttemptId();
-      task.succeededHost = attempt.getHost();
-      task.succeededPullServerPort = attempt.getPullServerPort();
-      task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(),
-          SubQueryEventType.SQ_TASK_COMPLETED));
-    }
-  }
-
-  private static class AttemptFailedTransition implements
-    MultipleArcTransition<QueryUnit, TaskEvent, TaskState> {
-
-    @Override
-    public TaskState transition(QueryUnit task, TaskEvent taskEvent) {
-      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent;
-      LOG.info("=============================================================");
-      LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<");
-      LOG.info("=============================================================");
-      task.failedAttempts++;
-      task.finishedAttempts++;
-
-      if (task.failedAttempts < task.maxAttempts) {
-        if (task.successfulAttempt == null) {
-          task.addAndScheduleAttempt();
-        }
-      } else {
-        task.eventHandler.handle(
-            new SubQueryTaskEvent(task.getId(), SubQueryEventType.SQ_FAILED));
-        return TaskState.FAILED;
-      }
-
-      return task.getState();
-    }
-  }
-
-  @Override
-  public void handle(TaskEvent event) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing " + event.getTaskId() + " of type "
-          + event.getType());
-    }
-
-    try {
-      writeLock.lock();
-      TaskState oldState = getState();
-      try {
-        stateMachine.doTransition(event.getType(), event);
-      } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state", e);
-        eventHandler.handle(new QueryEvent(getId().getQueryId(),
-            QueryEventType.INTERNAL_ERROR));
-      }
-
-      //notify the eventhandler of state change
-      if (LOG.isDebugEnabled()) {
-        if (oldState != getState()) {
-          LOG.debug(taskId + " Task Transitioned from " + oldState + " to "
-              + getState());
-        }
-      }
-    }
-
-    finally {
-      writeLock.unlock();
-    }
-  }
-
-  public void setIntermediateData(Collection<IntermediateEntry> partitions) {
-    this.intermediateData = new ArrayList<IntermediateEntry>(partitions);
-  }
-
-  public List<IntermediateEntry> getIntermediateData() {
-    return this.intermediateData;
-  }
-
-  public static class IntermediateEntry {
-    int taskId;
-    int attemptId;
-    int partitionId;
-    String pullHost;
-    int port;
-
-    public IntermediateEntry(int taskId, int attemptId, int partitionId,
-                             String pullServerAddr, int pullServerPort) {
-      this.taskId = taskId;
-      this.attemptId = attemptId;
-      this.partitionId = partitionId;
-      this.pullHost = pullServerAddr;
-      this.port = pullServerPort;
-    }
-
-    public int getTaskId() {
-      return this.taskId;
-    }
-
-    public int getAttemptId() {
-      return this.attemptId;
-    }
-
-    public int getPartitionId() {
-      return this.partitionId;
-    }
-
-    public String getPullHost() {
-      return this.pullHost;
-    }
-
-    public int getPullPort() {
-      return port;
-    }
-
-    public String getPullAddress() {
-      return pullHost + ":" + port;
-    }
-  }
-}
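
The addAndScheduleAttempt() switch above is a small allocation optimization inherited from MapReduce's task code: because most tasks succeed on their first attempt, the attempts map starts as Collections.emptyMap(), is swapped to a singletonMap for the first attempt, and is upgraded to a real LinkedHashMap only when a retry forces a second entry. The idiom in isolation (hypothetical wrapper, not part of the patch):

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Lazily-upgraded map: no allocation for zero entries, a cheap
    // immutable singleton for one, a mutable map only from two onward.
    public class LazyMap<K, V> {
      private Map<K, V> entries = Collections.emptyMap();

      public void put(K key, V value) {
        switch (entries.size()) {
          case 0:
            entries = Collections.singletonMap(key, value);
            break;
          case 1:
            Map<K, V> upgraded = new LinkedHashMap<K, V>(3);
            upgraded.putAll(entries);
            entries = upgraded;
            entries.put(key, value);
            break;
          default:
            entries.put(key, value);
            break;
        }
      }

      public V get(K key) {
        return entries.get(key);
      }
    }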

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryUnitAttempt.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryUnitAttempt.java
deleted file mode 100644
index c4dfae3..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryUnitAttempt.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.hadoop.yarn.util.RackResolver;
-import tajo.QueryUnitAttemptId;
-import tajo.TajoProtos.TaskAttemptState;
-import tajo.catalog.statistics.TableStat;
-import tajo.engine.MasterWorkerProtos.Partition;
-import tajo.engine.MasterWorkerProtos.TaskCompletionReport;
-import tajo.master.QueryUnit.IntermediateEntry;
-import tajo.master.event.*;
-import tajo.master.event.TaskSchedulerEvent.EventType;
-
-import java.util.*;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
-
-  private static final Log LOG = LogFactory.getLog(QueryUnitAttempt.class);
-
-  private final static int EXPIRE_TIME = 15000;
-
-  private final QueryUnitAttemptId id;
-  private final QueryUnit queryUnit;
-  final EventHandler eventHandler;
-
-  private String hostName;
-  private int port;
-  private int expire;
-
-  private final Lock readLock;
-  private final Lock writeLock;
-
-  private final List<String> diagnostics = new ArrayList<String>();
-
-  private static final StateMachineFactory
-      <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
-      stateMachineFactory = new StateMachineFactory
-      <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
-      (TaskAttemptState.TA_NEW)
-
-      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
-          TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition())
-      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
-          TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition())
-
-      .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED,
-          TaskAttemptEventType.TA_ASSIGNED, new LaunchTransition())
-
-      // from assigned
-      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED,
-          TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition())
-      .addTransition(TaskAttemptState.TA_ASSIGNED,
-          EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
-          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
-
-      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED,
-          TaskAttemptEventType.TA_DONE, new SucceededTransition())
-
-      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED,
-          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
-
-      // from running
-      .addTransition(TaskAttemptState.TA_RUNNING,
-          EnumSet.of(TaskAttemptState.TA_RUNNING),
-          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
-
-      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED,
-          TaskAttemptEventType.TA_DONE, new SucceededTransition())
-
-      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED,
-          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
-
-      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
-          TaskAttemptEventType.TA_UPDATE)
-      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
-          TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition())
-      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED,
-          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
-
-      .installTopology();
-
-  private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
-    stateMachine;
-
-
-  public QueryUnitAttempt(final QueryUnitAttemptId id, final QueryUnit queryUnit,
-                          final EventHandler eventHandler) {
-    this.id = id;
-    this.expire = QueryUnitAttempt.EXPIRE_TIME;
-    this.queryUnit = queryUnit;
-    this.eventHandler = eventHandler;
-
-    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-    this.readLock = readWriteLock.readLock();
-    this.writeLock = readWriteLock.writeLock();
-
-    stateMachine = stateMachineFactory.make(this);
-  }
-
-  public TaskAttemptState getState() {
-    readLock.lock();
-    try {
-      return stateMachine.getCurrentState();
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  public QueryUnitAttemptId getId() {
-    return this.id;
-  }
-
-  public boolean isLeafTask() {
-    return this.queryUnit.isLeafTask();
-  }
-
-  public QueryUnit getQueryUnit() {
-    return this.queryUnit;
-  }
-
-  public String getHost() {
-    return this.hostName;
-  }
-
-  public void setHost(String host) {
-    this.hostName = host;
-  }
-
-  public void setPullServerPort(int port) {
-    this.port = port;
-  }
-
-  public int getPullServerPort() {
-    return port;
-  }
-
-  public synchronized void setExpireTime(int expire) {
-    this.expire = expire;
-  }
-
-  public synchronized void updateExpireTime(int period) {
-    this.setExpireTime(this.expire - period);
-  }
-
-  public synchronized void resetExpireTime() {
-    this.setExpireTime(QueryUnitAttempt.EXPIRE_TIME);
-  }
-
-  public int getLeftTime() {
-    return this.expire;
-  }
-
-  private void fillTaskStatistics(TaskCompletionReport report) {
-    if (report.getPartitionsCount() > 0) {
-      this.getQueryUnit().setPartitions(report.getPartitionsList());
-
-      List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
-      for (Partition p : report.getPartitionsList()) {
-        IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(),
-            getId().getId(), p.getPartitionKey(), getHost(), getPullServerPort());
-        partitions.add(entry);
-      }
-      this.getQueryUnit().setIntermediateData(partitions);
-    }
-    if (report.hasResultStats()) {
-      this.getQueryUnit().setStats(new TableStat(report.getResultStats()));
-    }
-  }
-
-  private static class TaskAttemptScheduleTransition implements
-    SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
-
-    @Override
-    public void transition(QueryUnitAttempt taskAttempt,
-                           TaskAttemptEvent taskAttemptEvent) {
-
-      if (taskAttempt.isLeafTask()
-          && taskAttempt.getQueryUnit().getScanNodes().length == 1) {
-        Set<String> racks = new HashSet<String>();
-        for (String host : taskAttempt.getQueryUnit().getDataLocations()) {
-          racks.add(RackResolver.resolve(host).getNetworkLocation());
-        }
-
-        taskAttempt.eventHandler.handle(new TaskScheduleEvent(
-            taskAttempt.getId(), EventType.T_SCHEDULE, true,
-            taskAttempt.getQueryUnit().getDataLocations(),
-            racks.toArray(new String[racks.size()])
-        ));
-      } else {
-        taskAttempt.eventHandler.handle(new TaskScheduleEvent(
-            taskAttempt.getId(), EventType.T_SCHEDULE,
-            false,
-            null,
-            null
-        ));
-      }
-    }
-  }
-
-  private static class LaunchTransition
-      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
-
-    @Override
-    public void transition(QueryUnitAttempt taskAttempt,
-                           TaskAttemptEvent event) {
-      TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
-      taskAttempt.setHost(castEvent.getHostName());
-      taskAttempt.setPullServerPort(castEvent.getPullServerPort());
-      taskAttempt.eventHandler.handle(
-          new TaskTAttemptEvent(taskAttempt.getId(),
-              TaskEventType.T_ATTEMPT_LAUNCHED));
-    }
-  }
-
-  private static class StatusUpdateTransition
-      implements MultipleArcTransition<QueryUnitAttempt, TaskAttemptEvent, TaskAttemptState> {
-
-    @Override
-    public TaskAttemptState transition(QueryUnitAttempt taskAttempt,
-                                       TaskAttemptEvent event) {
-      TaskAttemptStatusUpdateEvent updateEvent =
-          (TaskAttemptStatusUpdateEvent) event;
-
-      switch (updateEvent.getStatus().getState()) {
-        case TA_PENDING:
-        case TA_RUNNING:
-          return TaskAttemptState.TA_RUNNING;
-
-        default:
-          return taskAttempt.getState();
-      }
-    }
-  }
-
-  private void addDiagnosticInfo(String diag) {
-    if (diag != null && !diag.equals("")) {
-      diagnostics.add(diag);
-    }
-  }
-
-  private static class AlreadyAssignedTransition
-      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
-
-    @Override
-    public void transition(QueryUnitAttempt queryUnitAttempt,
-                           TaskAttemptEvent taskAttemptEvent) {
-      LOG.info("Task attempt already assigned: " + queryUnitAttempt.getId());
-    }
-  }
-
-  private static class AlreadyDoneTransition
-      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
-
-    @Override
-    public void transition(QueryUnitAttempt queryUnitAttempt,
-                           TaskAttemptEvent taskAttemptEvent) {
-      LOG.info("Task attempt already done: " + queryUnitAttempt.getId());
-    }
-  }
-
-  private static class SucceededTransition
-      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
-    @Override
-    public void transition(QueryUnitAttempt taskAttempt,
-                           TaskAttemptEvent event) {
-      TaskCompletionReport report = ((TaskCompletionEvent)event).getReport();
-
-      taskAttempt.fillTaskStatistics(report);
-      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(),
-          TaskEventType.T_ATTEMPT_SUCCEEDED));
-    }
-  }
-
-  private static class FailedTransition
-      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
-    @Override
-    public void transition(QueryUnitAttempt taskAttempt,
-                           TaskAttemptEvent event) {
-      TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
-      taskAttempt.eventHandler.handle(
-          new TaskTAttemptEvent(taskAttempt.getId(),
-              TaskEventType.T_ATTEMPT_FAILED));
-      LOG.error("FROM " + taskAttempt.getHost() + " >> "
-          + errorEvent.errorMessage());
-      taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
-    }
-  }
-
-  @Override
-  public void handle(TaskAttemptEvent event) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing " + event.getTaskAttemptId() + " of type "
-          + event.getType());
-    }
-    writeLock.lock();
-    try {
-      TaskAttemptState oldState = getState();
-      try {
-        stateMachine.doTransition(event.getType(), event);
-      } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at the current state of "
-            + event.getTaskAttemptId(), e);
-        eventHandler.handle(new QueryEvent(getId().getQueryId(),
-            QueryEventType.INTERNAL_ERROR));
-      }
-
-      // notify the event handler of the state change
-      if (LOG.isDebugEnabled()) {
-        if (oldState != getState()) {
-          LOG.debug(id + " TaskAttempt Transitioned from " + oldState + " to "
-              + getState());
-        }
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-}

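For context on the state machine plumbing that QueryUnitAttempt relies on: the org.apache.hadoop.yarn.state utilities separate the transition topology, declared once per class via a StateMachineFactory, from the per-instance StateMachine that make() binds to an operand. Below is a minimal, self-contained sketch of that pattern; the Door class, its enums, and DoorEvent are hypothetical illustrations, and only the factory/state-machine API itself mirrors the code above.

import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;

public class Door {
  public enum State { CLOSED, OPEN }
  public enum EventType { E_OPEN, E_CLOSE }

  public static class DoorEvent {
    private final EventType type;
    public DoorEvent(EventType type) { this.type = type; }
    public EventType getType() { return type; }
  }

  // The topology is declared once and shared by all Door instances;
  // each addTransition() registers (from-state, to-state, event[, hook]).
  private static final StateMachineFactory<Door, State, EventType, DoorEvent>
      stateMachineFactory =
      new StateMachineFactory<Door, State, EventType, DoorEvent>(State.CLOSED)
          .addTransition(State.CLOSED, State.OPEN, EventType.E_OPEN,
              new SingleArcTransition<Door, DoorEvent>() {
                @Override
                public void transition(Door door, DoorEvent event) {
                  // per-transition side effects on the operand go here
                }
              })
          .addTransition(State.OPEN, State.CLOSED, EventType.E_CLOSE)
          .installTopology();

  // make() binds the shared topology to this instance.
  private final StateMachine<State, EventType, DoorEvent> stateMachine =
      stateMachineFactory.make(this);

  // An undeclared (state, event) pair makes doTransition() throw
  // InvalidStateTransitonException, which QueryUnitAttempt.handle() catches.
  public void handle(DoorEvent event) {
    stateMachine.doTransition(event.getType(), event);
  }

  public State getState() {
    return stateMachine.getCurrentState();
  }
}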
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
deleted file mode 100644
index 622c4f1..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/Repartitioner.java
+++ /dev/null
@@ -1,582 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import tajo.QueryIdFactory;
-import tajo.SubQueryId;
-import tajo.catalog.*;
-import tajo.catalog.proto.CatalogProtos.StoreType;
-import tajo.catalog.statistics.TableStat;
-import tajo.conf.TajoConf.ConfVars;
-import tajo.engine.planner.PlannerUtil;
-import tajo.engine.planner.RangePartitionAlgorithm;
-import tajo.engine.planner.UniformRangePartition;
-import tajo.engine.planner.logical.*;
-import tajo.engine.utils.TupleUtil;
-import tajo.exception.InternalException;
-import tajo.master.ExecutionBlock.PartitionType;
-import tajo.master.QueryUnit.IntermediateEntry;
-import tajo.storage.Fragment;
-import tajo.storage.TupleRange;
-import tajo.util.TUtil;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.math.BigDecimal;
-import java.net.URI;
-import java.util.*;
-import java.util.Map.Entry;
-
-/**
- * Repartitioner creates non-leaf tasks and shuffles intermediate data.
- * It supports two repartition methods: hash and range repartition.
- */
-public class Repartitioner {
-  private static final Log LOG = LogFactory.getLog(Repartitioner.class);
-
-  private static final int HTTP_REQUEST_MAXIMUM_LENGTH = 1900;
-
-  public static QueryUnit [] createJoinTasks(SubQuery subQuery)
-      throws IOException {
-    ExecutionBlock execBlock = subQuery.getBlock();
-    CatalogService catalog = subQuery.getContext().getCatalog();
-
-    ScanNode[] scans = execBlock.getScanNodes();
-    Path tablePath;
-    Fragment [] fragments = new Fragment[2];
-    TableStat [] stats = new TableStat[2];
-
-    // initialize variables from the child operators
-    for (int i = 0; i < 2; i++) {
-      // TODO - temporary tables should be stored in a temporary catalog for each query
-      if (scans[i].getTableId().startsWith(SubQueryId.PREFIX)) {
-        tablePath = subQuery.getStorageManager().getTablePath(scans[i].getTableId());
-      } else {
-        tablePath = catalog.getTableDesc(scans[i].getTableId()).getPath();
-      }
-
-      if (scans[i].isLocal()) { // it only requires a dummy fragment.
-        fragments[i] = new Fragment(scans[i].getTableId(), tablePath,
-            CatalogUtil.newTableMeta(scans[i].getInSchema(), StoreType.CSV),
-            0, 0, null);
-      } else {
-        fragments[i] = subQuery.getStorageManager().getSplits(scans[i].getTableId(),
-            catalog.getTableDesc(scans[i].getTableId()).getMeta(),
-            new Path(tablePath, "data")).get(0);
-      }
-
-      // Getting a table stat for each scan
-      stats[i] = subQuery.getChildQuery(scans[i]).getTableStat();
-    }
-
-    // Assigning either fragments or fetch urls to query units
-    QueryUnit [] tasks;
-    if (scans[0].isBroadcast() || scans[1].isBroadcast()) {
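-      // a broadcast join requires no shuffle, so a single task reads both tables directly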
-      tasks = new QueryUnit[1];
-      tasks[0] = new QueryUnit(QueryIdFactory.newQueryUnitId(subQuery.getId(), 0),
-          false, subQuery.getEventHandler());
-      tasks[0].setLogicalPlan(execBlock.getPlan());
-      tasks[0].setFragment(scans[0].getTableId(), fragments[0]);
-      tasks[0].setFragment(scans[1].getTableId(), fragments[1]);
-    } else {
-      // The hash map is modeled as follows:
-      // <Partition Id, <Table Name, Intermediate Data>>
-      Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries =
-          new HashMap<Integer, Map<String, List<IntermediateEntry>>>();
-
-      // Grouping IntermediateData by a partition key and a table name
-      for (ScanNode scan : scans) {
-        SubQuery childSubQuery = subQuery.getChildQuery(scan);
-        for (QueryUnit task : childSubQuery.getQueryUnits()) {
-          if (task.getIntermediateData() != null) {
-            for (IntermediateEntry intermEntry : task.getIntermediateData()) {
-              if (hashEntries.containsKey(intermEntry.getPartitionId())) {
-                Map<String, List<IntermediateEntry>> tbNameToInterm =
-                    hashEntries.get(intermEntry.getPartitionId());
-
-                if (tbNameToInterm.containsKey(scan.getTableId())) {
-                  tbNameToInterm.get(scan.getTableId()).add(intermEntry);
-                } else {
-                  tbNameToInterm.put(scan.getTableId(), TUtil.newList(intermEntry));
-                }
-              } else {
-                Map<String, List<IntermediateEntry>> tbNameToInterm =
-                    new HashMap<String, List<IntermediateEntry>>();
-                tbNameToInterm.put(scan.getTableId(), TUtil.newList(intermEntry));
-                hashEntries.put(intermEntry.getPartitionId(), tbNameToInterm);
-              }
-            }
-          }
-        }
-      }
-
-      LOG.info("Outer Intermediate Volume: " + stats[0].getNumBytes());
-      LOG.info("Inner Intermediate Volume: " + stats[1].getNumBytes());
-
-      // Getting the desired number of join tasks according to the volume
-      // of the larger table
-      int largerIdx = stats[0].getNumBytes() >= stats[1].getNumBytes() ? 0 : 1;
-      int desiredJoinTaskVolume = subQuery.getContext().getConf().
-          getIntVar(ConfVars.JOIN_TASK_VOLUME);
-
-      // calculate the number of tasks according to the data size
-      int mb = (int) Math.ceil((double)stats[largerIdx].getNumBytes() / 1048576);
-      LOG.info("Larger intermediate data is approximately " + mb + " MB");
-      // determine the number of tasks from the configured per-task volume
-      int maxTaskNum = (int) Math.ceil((double)mb / desiredJoinTaskVolume);
-      LOG.info("The calculated number of tasks is " + maxTaskNum);
-      LOG.info("The number of total partition keys is " + hashEntries.size());
-      // the number of join tasks cannot be larger than the number of
-      // distinct partition ids.
-      int joinTaskNum = Math.min(maxTaskNum, hashEntries.size());
-      LOG.info("The determined number of join tasks is " + joinTaskNum);
-      QueryUnit [] createdTasks = newEmptyJoinTask(subQuery, fragments, joinTaskNum);
-
-      // Assign partitions to tasks in a round robin manner.
-      int i = 0;
-      for (Entry<Integer, Map<String, List<IntermediateEntry>>> entry
-          : hashEntries.entrySet()) {
-        addJoinPartition(createdTasks[i++], subQuery, entry.getKey(), entry.getValue());
-        if (i >= joinTaskNum) {
-          i = 0;
-        }
-      }
-
-      List<QueryUnit> filteredTasks = new ArrayList<QueryUnit>();
-      for (QueryUnit task : createdTasks) {
-        // if there are at least two fetches, the join is possible.
-        if (task.getFetches().size() > 1) {
-          filteredTasks.add(task);
-        }
-      }
-
-      tasks = filteredTasks.toArray(new QueryUnit[filteredTasks.size()]);
-    }
-
-    return tasks;
-  }
-
-  private static QueryUnit [] newEmptyJoinTask(SubQuery subQuery, Fragment [] fragments, int taskNum) {
-    ExecutionBlock execBlock = subQuery.getBlock();
-    QueryUnit [] tasks = new QueryUnit[taskNum];
-    for (int i = 0; i < taskNum; i++) {
-      tasks[i] = new QueryUnit(
-          QueryIdFactory.newQueryUnitId(subQuery.getId(), i), execBlock.isLeafBlock(),
-          subQuery.getEventHandler());
-      tasks[i].setLogicalPlan(execBlock.getPlan());
-      for (Fragment fragment : fragments) {
-        tasks[i].setFragment2(fragment);
-      }
-    }
-
-    return tasks;
-  }
-
-  private static void addJoinPartition(QueryUnit task, SubQuery subQuery, int partitionId,
-                                       Map<String, List<IntermediateEntry>> groupedPartitions) {
-
-    for (ScanNode scanNode : subQuery.getBlock().getScanNodes()) {
-      Map<String, List<IntermediateEntry>> requests;
-      if (groupedPartitions.containsKey(scanNode.getTableId())) {
-        requests = mergeHashPartitionRequest(groupedPartitions.get(scanNode.getTableId()));
-      } else {
-        return;
-      }
-      Set<URI> fetchURIs = TUtil.newHashSet();
-      for (Entry<String, List<IntermediateEntry>> requestPerNode : requests.entrySet()) {
-        Collection<URI> uris = createHashFetchURL(requestPerNode.getKey(),
-            subQuery.getChildQuery(scanNode).getId(),
-            partitionId, PartitionType.HASH,
-            requestPerNode.getValue());
-        fetchURIs.addAll(uris);
-      }
-      task.addFetches(scanNode.getTableId(), fetchURIs);
-    }
-  }
-
-  /**
-   * This method merges partition requests that share the same pullserver address,
-   * which reduces the number of TCP connections.
-   *
-   * @return key: pullserver's address, value: a list of requests
-   */
-  private static Map<String, List<IntermediateEntry>> mergeHashPartitionRequest(
-      List<IntermediateEntry> partitions) {
-    Map<String, List<IntermediateEntry>> mergedPartitions =
-        new HashMap<String, List<IntermediateEntry>>();
-    for (IntermediateEntry partition : partitions) {
-      if (mergedPartitions.containsKey(partition.getPullAddress())) {
-        mergedPartitions.get(partition.getPullAddress()).add(partition);
-      } else {
-        mergedPartitions.put(partition.getPullAddress(), TUtil.newList(partition));
-      }
-    }
-
-    return mergedPartitions;
-  }
-
-  public static QueryUnit [] createNonLeafTask(SubQuery subQuery,
-                                               SubQuery childSubQuery,
-                                               int maxNum)
-      throws InternalException {
-    ExecutionBlock childExecBlock = childSubQuery.getBlock();
-    if (childExecBlock.getPartitionType() == PartitionType.HASH) {
-      return createHashPartitionedTasks(subQuery, childSubQuery, maxNum);
-    } else if (childExecBlock.getPartitionType() == PartitionType.RANGE) {
-      return createRangePartitionedTasks(subQuery, childSubQuery, maxNum);
-    } else {
-      throw new InternalException("Unsupported partition type: "
-          + childExecBlock.getPartitionType());
-    }
-  }
-
-  public static QueryUnit [] createRangePartitionedTasks(SubQuery subQuery,
-                                                         SubQuery childSubQuery,
-                                                         int maxNum)
-      throws InternalException {
-    ExecutionBlock execBlock = subQuery.getBlock();
-    TableStat stat = childSubQuery.getTableStat();
-    if (stat.getNumRows() == 0) {
-      return new QueryUnit[0];
-    }
-
-    ScanNode scan = execBlock.getScanNodes()[0];
-    Path tablePath;
-    tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableId());
-
-    StoreTableNode store = (StoreTableNode) childSubQuery.getBlock().getPlan();
-    SortNode sort = (SortNode) store.getSubNode();
-    SortSpec[] sortSpecs = sort.getSortKeys();
-    Schema sortSchema = PlannerUtil.sortSpecsToSchema(sort.getSortKeys());
-
-    // calculate the number of maximum query ranges
-    TupleRange mergedRange =
-        TupleUtil.columnStatToRange(sort.getOutSchema(),
-            sortSchema, stat.getColumnStats());
-    RangePartitionAlgorithm partitioner =
-        new UniformRangePartition(sortSchema, mergedRange);
-    BigDecimal card = partitioner.getTotalCardinality();
-
-    // if the range cardinality is less than the desired number of tasks,
-    // the number of tasks is set to the range cardinality.
-    int determinedTaskNum;
-    if (card.compareTo(new BigDecimal(maxNum)) < 0) {
-      LOG.info("The range cardinality (" + card
-          + ") is less than the desired number of tasks (" + maxNum + ")");
-      determinedTaskNum = card.intValue();
-    } else {
-      determinedTaskNum = maxNum;
-    }
-
-    LOG.info("Trying to divide " + mergedRange + " into " + determinedTaskNum +
-        " sub-ranges");
-    TupleRange [] ranges = partitioner.partition(determinedTaskNum);
-
-    Fragment dummyFragment = new Fragment(scan.getTableId(), tablePath,
-        CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV),
-        0, 0, null);
-
-    List<String> basicFetchURIs = new ArrayList<String>();
-
-    SubQuery child = childSubQuery.getContext().getSubQuery(
-        subQuery.getBlock().getChildBlock(scan).getId());
-    for (QueryUnit qu : child.getQueryUnits()) {
-      for (IntermediateEntry p : qu.getIntermediateData()) {
-        String uri = createBasicFetchUri(p.getPullHost(), p.getPullPort(),
-            childSubQuery.getId(), p.taskId, p.attemptId);
-        basicFetchURIs.add(uri);
-      }
-    }
-
-    boolean ascendingFirstKey = sortSpecs[0].isAscending();
-    SortedMap<TupleRange, Set<URI>> map;
-    if (ascendingFirstKey) {
-      map = new TreeMap<TupleRange, Set<URI>>();
-    } else {
-      map = new TreeMap<TupleRange, Set<URI>>(new TupleRange.DescendingTupleRangeComparator());
-    }
-
-    Set<URI> uris;
-    try {
-      for (int i = 0; i < ranges.length; i++) {
-        uris = new HashSet<URI>();
-        for (String uri: basicFetchURIs) {
-          String rangeParam = TupleUtil.rangeToQuery(sortSchema, ranges[i],
-              ascendingFirstKey, ascendingFirstKey ? i == (ranges.length - 1) : i == 0);
-          URI finalUri = URI.create(uri + "&" + rangeParam);
-          uris.add(finalUri);
-        }
-        map.put(ranges[i], uris);
-      }
-
-    } catch (UnsupportedEncodingException e) {
-      LOG.error(e);
-    }
-
-    QueryUnit [] tasks = createEmptyNonLeafTasks(subQuery, determinedTaskNum, dummyFragment);
-    assignPartitionByRoundRobin(map, scan.getTableId(), tasks);
-    return tasks;
-  }
-
-  public static QueryUnit [] assignPartitionByRoundRobin(Map<?, Set<URI>> partitions,
-                                               String tableName, QueryUnit [] tasks) {
-    int tid = 0;
-    for (Entry<?, Set<URI>> entry : partitions.entrySet()) {
-      for (URI uri : entry.getValue()) {
-        tasks[tid].addFetch(tableName, uri);
-      }
-
-      // advance and wrap so that tid always stays within the bounds of tasks;
-      // incrementing before the bounds check avoids an off-by-one overrun
-      tid++;
-      if (tid == tasks.length) {
-        tid = 0;
-      }
-    }
-
-    return tasks;
-  }
-
-  public static String createBasicFetchUri(String hostName, int port,
-                                           SubQueryId childSid,
-                                           int taskId, int attemptId) {
-    String scheme = "http://";
-    StringBuilder sb = new StringBuilder(scheme);
-    sb.append(hostName).append(":").append(port)
-        .append("/?").append("sid=").append(childSid.getId())
-        .append("&").append("ta=").append(taskId).append("_").append(attemptId)
-        .append("&").append("p=0")
-        .append("&").append("type=r");
-
-    return sb.toString();
-  }
-
-  public static QueryUnit [] createHashPartitionedTasks(SubQuery subQuery,
-                                                 SubQuery childSubQuery,
-                                                 int maxNum) {
-    ExecutionBlock execBlock = subQuery.getBlock();
-    TableStat stat = childSubQuery.getTableStat();
-    if (stat.getNumRows() == 0) {
-      return new QueryUnit[0];
-    }
-
-    ScanNode scan = execBlock.getScanNodes()[0];
-    Path tablePath;
-    tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableId());
-
-    List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
-    for (QueryUnit task : childSubQuery.getQueryUnits()) {
-      if (task.getIntermediateData() != null) {
-        partitions.addAll(task.getIntermediateData());
-      }
-    }
-
-    Fragment frag = new Fragment(scan.getTableId(), tablePath,
-        CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV),
-        0, 0, null);
-
-    Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions);
-    Map<String, List<IntermediateEntry>> hashedByHost;
-    Map<Integer, List<URI>> finalFetchURI = new HashMap<Integer, List<URI>>();
-
-    for (Entry<Integer, List<IntermediateEntry>> interm : hashed.entrySet()) {
-      hashedByHost = hashByHost(interm.getValue());
-      for (Entry<String, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
-        Collection<URI> uris = createHashFetchURL(e.getKey(), childSubQuery.getId(),
-            interm.getKey(),
-            childSubQuery.getBlock().getPartitionType(), e.getValue());
-
-        if (finalFetchURI.containsKey(interm.getKey())) {
-          finalFetchURI.get(interm.getKey()).addAll(uris);
-        } else {
-          finalFetchURI.put(interm.getKey(), TUtil.newList(uris));
-        }
-      }
-    }
-
-    GroupbyNode groupby = (GroupbyNode) childSubQuery.getBlock().getStoreTableNode().
-        getSubNode();
-    // the number of tasks cannot exceed the number of merged fetch uris.
-    int determinedTaskNum = Math.min(maxNum, finalFetchURI.size());
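-    // an aggregation without grouping keys must be computed by a single task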
-    if (groupby.getGroupingColumns().length == 0) {
-      determinedTaskNum = 1;
-    }
-
-    QueryUnit [] tasks = createEmptyNonLeafTasks(subQuery, determinedTaskNum, frag);
-
-    int tid = 0;
-    for (Entry<Integer, List<URI>> entry : finalFetchURI.entrySet()) {
-      for (URI uri : entry.getValue()) {
-        tasks[tid].addFetch(scan.getTableId(), uri);
-      }
-
-      tid++;
-
-      if (tid == tasks.length) {
-        tid = 0;
-      }
-    }
-
-    return tasks;
-  }
-
-  public static Collection<URI> createHashFetchURL(String hostAndPort, SubQueryId childSid,
-                                       int partitionId, PartitionType type,
-                                       List<IntermediateEntry> entries) {
-    String scheme = "http://";
-    StringBuilder urlPrefix = new StringBuilder(scheme);
-    urlPrefix.append(hostAndPort)
-        .append("/?").append("sid=").append(childSid.getId())
-        .append("&").append("p=").append(partitionId)
-        .append("&").append("type=");
-    if (type == PartitionType.HASH) {
-      urlPrefix.append("h");
-    } else if (type == PartitionType.RANGE) {
-      urlPrefix.append("r");
-    }
-    urlPrefix.append("&ta=");
-
-    // If the GET request URI is longer than about 2000 characters, the server
-    // may respond with HTTP status 414 (Request-URI Too Long).
-    // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
-    // The code below splits a long request into multiple requests.
-    List<String> taskIdsParams = new ArrayList<String>();
-    boolean first = true;
-    StringBuilder taskIdListBuilder = new StringBuilder();
-    for (IntermediateEntry entry: entries) {
-      StringBuilder taskAttemptId = new StringBuilder();
-
-      if (!first) { // separate consecutive task attempt ids with a comma
-        taskAttemptId.append(",");
-      } else {
-        first = false;
-      }
-
-      taskAttemptId.append(entry.getTaskId()).append("_").
-          append(entry.getAttemptId());
-      if (taskIdListBuilder.length() + taskAttemptId.length()
-          > HTTP_REQUEST_MAXIMUM_LENGTH) {
-        taskIdsParams.add(taskIdListBuilder.toString());
-        taskIdListBuilder = new StringBuilder(entry.getTaskId() + "_" + entry.getAttemptId());
-      } else {
-        taskIdListBuilder.append(taskAttemptId);
-      }
-    }
-
-    // flush the remaining task ids, if any
-    if (taskIdListBuilder.length() > 0) {
-      taskIdsParams.add(taskIdListBuilder.toString());
-    }
-
-    Collection<URI> fetchURLs = new ArrayList<URI>();
-    for (String param : taskIdsParams) {
-      fetchURLs.add(URI.create(urlPrefix + param));
-    }
-
-    return fetchURLs;
-  }
-
-  public static Map<Integer, List<IntermediateEntry>> hashByKey(
-      List<IntermediateEntry> entries) {
-    Map<Integer, List<IntermediateEntry>> hashed = new HashMap<Integer, List<IntermediateEntry>>();
-    for (IntermediateEntry entry : entries) {
-      if (hashed.containsKey(entry.getPartitionId())) {
-        hashed.get(entry.getPartitionId()).add(entry);
-      } else {
-        hashed.put(entry.getPartitionId(), TUtil.newList(entry));
-      }
-    }
-
-    return hashed;
-  }
-
-  public static QueryUnit [] createEmptyNonLeafTasks(SubQuery subQuery, int num,
-                                                     Fragment frag) {
-    LogicalNode plan = subQuery.getBlock().getPlan();
-    QueryUnit [] tasks = new QueryUnit[num];
-    for (int i = 0; i < num; i++) {
-      tasks[i] = new QueryUnit(QueryIdFactory.newQueryUnitId(subQuery.getId(), i),
-          false, subQuery.getEventHandler());
-      tasks[i].setFragment2(frag);
-      tasks[i].setLogicalPlan(plan);
-    }
-    return tasks;
-  }
-
-  public static Map<String, List<IntermediateEntry>> hashByHost(
-      List<IntermediateEntry> entries) {
-    Map<String, List<IntermediateEntry>> hashed = new HashMap<String, List<IntermediateEntry>>();
-
-    String hostName;
-    for (IntermediateEntry entry : entries) {
-      hostName = entry.getPullHost() + ":" + entry.getPullPort();
-      if (hashed.containsKey(hostName)) {
-        hashed.get(hostName).add(entry);
-      } else {
-        hashed.put(hostName, TUtil.newList(entry));
-      }
-    }
-
-    return hashed;
-  }
-
-  public static SubQuery setPartitionNumberForTwoPhase(SubQuery subQuery, final int n) {
-    ExecutionBlock execBlock = subQuery.getBlock();
-    Column[] keys = null;
-    // if the parent block is a join,
-    // set the partition number for the current execution block
-    // TODO: union handling is required when a join has unions as its children
-    ExecutionBlock parentBlock = execBlock.getParentBlock();
-    if (parentBlock != null) {
-      if (parentBlock.getStoreTableNode().getSubNode().getType() == ExprType.JOIN) {
-        execBlock.getStoreTableNode().setPartitions(execBlock.getPartitionType(),
-            execBlock.getStoreTableNode().getPartitionKeys(), n);
-        keys = execBlock.getStoreTableNode().getPartitionKeys();
-      }
-    }
-
-    StoreTableNode store = execBlock.getStoreTableNode();
-    // set the partition number for group by and sort
-    if (execBlock.getPartitionType() == PartitionType.HASH) {
-      if (store.getSubNode().getType() == ExprType.GROUP_BY) {
-        GroupbyNode groupby = (GroupbyNode)store.getSubNode();
-        keys = groupby.getGroupingColumns();
-      }
-    } else if (execBlock.getPartitionType() == PartitionType.RANGE) {
-      if (store.getSubNode().getType() == ExprType.SORT) {
-        SortNode sort = (SortNode)store.getSubNode();
-        keys = new Column[sort.getSortKeys().length];
-        for (int i = 0; i < keys.length; i++) {
-          keys[i] = sort.getSortKeys()[i].getSortKey();
-        }
-      }
-    }
-    if (keys != null) {
-      if (keys.length == 0) {
-        store.setPartitions(execBlock.getPartitionType(), new Column[]{}, 1);
-      } else {
-        store.setPartitions(execBlock.getPartitionType(), keys, n);
-      }
-    } else {
-      store.setListPartition();
-    }
-    return subQuery;
-  }
-}

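A note on the ta= parameter chunking in createHashFetchURL above: the joining comma has to count against the length budget, yet must never begin a new chunk. The following standalone sketch shows the same splitting idea under those constraints; FetchUriChunker and its parameter names are hypothetical, and only the 414-avoidance technique mirrors the deleted code.

import java.net.URI;
import java.util.ArrayList;
import java.util.List;

public class FetchUriChunker {

  /**
   * Joins ids with commas into as few URIs as possible, starting a new URI
   * whenever the joined parameter would exceed maxParamLen. This avoids
   * HTTP 414 (Request-URI Too Long) responses from the pull server.
   */
  public static List<URI> chunk(String urlPrefix, List<String> ids,
                                int maxParamLen) {
    List<String> params = new ArrayList<String>();
    StringBuilder current = new StringBuilder();

    for (String id : ids) {
      // the joining comma counts against the budget, but never starts a chunk
      int extra = current.length() == 0 ? id.length() : id.length() + 1;
      if (current.length() > 0 && current.length() + extra > maxParamLen) {
        params.add(current.toString());
        current = new StringBuilder(id);
      } else {
        if (current.length() > 0) {
          current.append(',');
        }
        current.append(id);
      }
    }
    if (current.length() > 0) {
      params.add(current.toString()); // flush the last chunk
    }

    List<URI> fetchURIs = new ArrayList<URI>();
    for (String param : params) {
      fetchURIs.add(URI.create(urlPrefix + param));
    }
    return fetchURIs;
  }
}

With urlPrefix ending in "&ta=" and maxParamLen set near HTTP_REQUEST_MAXIMUM_LENGTH, chunk() would yield the same kind of URI list that createHashFetchURL returns.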
