tajo-commits mailing list archives

From hyun...@apache.org
Subject [11/51] [partial] TAJO-22: The package prefix should be org.apache.tajo. (DaeMyung Kang via hyunsik)
Date Tue, 02 Jul 2013 14:16:05 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/utils/TupleUtil.java b/tajo-core/tajo-core-backend/src/main/java/tajo/engine/utils/TupleUtil.java
deleted file mode 100644
index 8b95816..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/engine/utils/TupleUtil.java
+++ /dev/null
@@ -1,388 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.engine.utils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import tajo.catalog.Column;
-import tajo.catalog.Schema;
-import tajo.catalog.SortSpec;
-import tajo.catalog.statistics.ColumnStat;
-import tajo.datum.Datum;
-import tajo.datum.DatumFactory;
-import tajo.engine.planner.PlannerUtil;
-import tajo.storage.RowStoreUtil;
-import tajo.storage.Tuple;
-import tajo.storage.TupleRange;
-import tajo.storage.VTuple;
-import tajo.worker.dataserver.HttpUtil;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.util.List;
-import java.util.Map;
-
-public class TupleUtil {
-  /** class logger **/
-  private static final Log LOG = LogFactory.getLog(TupleUtil.class);
-
-  /**
-   * It computes the value cardinality of a tuple range.
-   *
-   * @param schema
-   * @param range
-   * @return
-   */
-  public static long computeCardinality(Schema schema, TupleRange range) {
-    Tuple start = range.getStart();
-    Tuple end = range.getEnd();
-    Column col;
-
-    long cardinality = 1;
-    long columnCard;
-    for (int i = 0; i < schema.getColumnNum(); i++) {
-      col = schema.getColumn(i);
-      switch (col.getDataType().getType()) {
-        case CHAR:
-          columnCard = end.get(i).asChar() - start.get(i).asChar();
-          break;
-        case BIT:
-          columnCard = end.get(i).asByte() - start.get(i).asByte();
-          break;
-        case INT2:
-          columnCard = end.get(i).asInt2() - start.get(i).asInt2();
-          break;
-        case INT4:
-          columnCard = end.get(i).asInt4() - start.get(i).asInt4();
-          break;
-        case INT8:
-          columnCard = end.get(i).asInt8() - start.get(i).asInt8();
-          break;
-        case FLOAT4:
-          columnCard = end.get(i).asInt4() - start.get(i).asInt4();
-          break;
-        case FLOAT8:
-          columnCard = end.get(i).asInt8() - start.get(i).asInt8();
-          break;
-        case TEXT:
-          columnCard = end.get(i).asChars().charAt(0) - start.get(i).asChars().charAt(0);
-          break;
-        default:
-          throw new UnsupportedOperationException(col.getDataType() + " is not supported yet");
-      }
-
-      if (columnCard > 0) {
-        cardinality *= columnCard + 1;
-      }
-    }
-
-    return cardinality;
-  }
-
-  public static TupleRange [] getPartitions(Schema schema, int partNum, TupleRange range) {
-    Tuple start = range.getStart();
-    Tuple end = range.getEnd();
-    Column col;
-    TupleRange [] partitioned = new TupleRange[partNum];
-
-    Datum[] term = new Datum[schema.getColumnNum()];
-    Datum[] prevValues = new Datum[schema.getColumnNum()];
-
-    // initialize term and previous values
-    for (int i = 0; i < schema.getColumnNum(); i++) {
-      col = schema.getColumn(i);
-      prevValues[i] = start.get(i);
-      switch (col.getDataType().getType()) {
-        case CHAR:
-          int sChar = start.get(i).asChar();
-          int eChar = end.get(i).asChar();
-          int rangeChar;
-          if ((eChar - sChar) > partNum) {
-            rangeChar = (eChar - sChar) / partNum;
-          } else {
-            rangeChar = 1;
-          }
-          term[i] = DatumFactory.createInt4(rangeChar);
-        case BIT:
-          byte sByte = start.get(i).asByte();
-          byte eByte = end.get(i).asByte();
-          int rangeByte;
-          if ((eByte - sByte) > partNum) {
-            rangeByte = (eByte - sByte) / partNum;
-          } else {
-            rangeByte = 1;
-          }
-          term[i] = DatumFactory.createBit((byte) rangeByte);
-          break;
-
-        case INT2:
-          short sShort = start.get(i).asInt2();
-          short eShort = end.get(i).asInt2();
-          int rangeShort;
-          if ((eShort - sShort) > partNum) {
-            rangeShort = (eShort - sShort) / partNum;
-          } else {
-            rangeShort = 1;
-          }
-          term[i] = DatumFactory.createInt2((short) rangeShort);
-          break;
-
-        case INT4:
-          int sInt = start.get(i).asInt4();
-          int eInt = end.get(i).asInt4();
-          int rangeInt;
-          if ((eInt - sInt) > partNum) {
-            rangeInt = (eInt - sInt) / partNum;
-          } else {
-            rangeInt = 1;
-          }
-          term[i] = DatumFactory.createInt4(rangeInt);
-          break;
-
-        case INT8:
-          long sLong = start.get(i).asInt8();
-          long eLong = end.get(i).asInt8();
-          long rangeLong;
-          if ((eLong - sLong) > partNum) {
-            rangeLong = ((eLong - sLong) / partNum);
-          } else {
-            rangeLong = 1;
-          }
-          term[i] = DatumFactory.createInt8(rangeLong);
-          break;
-
-        case FLOAT4:
-          float sFloat = start.get(i).asFloat4();
-          float eFloat = end.get(i).asFloat4();
-          float rangeFloat;
-          if ((eFloat - sFloat) > partNum) {
-            rangeFloat = ((eFloat - sFloat) / partNum);
-          } else {
-            rangeFloat = 1;
-          }
-          term[i] = DatumFactory.createFloat4(rangeFloat);
-          break;
-        case FLOAT8:
-          double sDouble = start.get(i).asFloat8();
-          double eDouble = end.get(i).asFloat8();
-          double rangeDouble;
-          if ((eDouble - sDouble) > partNum) {
-            rangeDouble = ((eDouble - sDouble) / partNum);
-          } else {
-            rangeDouble = 1;
-          }
-          term[i] = DatumFactory.createFloat8(rangeDouble);
-          break;
-        case TEXT:
-          char sChars = start.get(i).asChars().charAt(0);
-          char eChars = end.get(i).asChars().charAt(0);
-          int rangeString;
-          if ((eChars - sChars) > partNum) {
-            rangeString = ((eChars - sChars) / partNum);
-          } else {
-            rangeString = 1;
-          }
-          term[i] = DatumFactory.createText(((char) rangeString) + "");
-          break;
-        case INET4:
-          throw new UnsupportedOperationException();
-        case BLOB:
-          throw new UnsupportedOperationException();
-        default:
-          throw new UnsupportedOperationException();
-      }
-    }
-
-    for (int p = 0; p < partNum; p++) {
-      Tuple sTuple = new VTuple(schema.getColumnNum());
-      Tuple eTuple = new VTuple(schema.getColumnNum());
-      for (int i = 0; i < schema.getColumnNum(); i++) {
-        col = schema.getColumn(i);
-        sTuple.put(i, prevValues[i]);
-        switch (col.getDataType().getType()) {
-          case CHAR:
-            char endChar = (char) (prevValues[i].asChar() + term[i].asChar());
-            if (endChar > end.get(i).asByte()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              eTuple.put(i, DatumFactory.createChar(endChar));
-            }
-            prevValues[i] = DatumFactory.createChar(endChar);
-            break;
-          case BIT:
-            byte endByte = (byte) (prevValues[i].asByte() + term[i].asByte());
-            if (endByte > end.get(i).asByte()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              eTuple.put(i, DatumFactory.createBit(endByte));
-            }
-            prevValues[i] = DatumFactory.createBit(endByte);
-            break;
-          case INT2:
-            int endShort = (short) (prevValues[i].asInt2() + term[i].asInt2());
-            if (endShort > end.get(i).asInt2()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createInt2((short) endShort));
-            }
-            prevValues[i] = DatumFactory.createInt2((short) endShort);
-            break;
-          case INT4:
-            int endInt = (prevValues[i].asInt4() + term[i].asInt4());
-            if (endInt > end.get(i).asInt4()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createInt4(endInt));
-            }
-            prevValues[i] = DatumFactory.createInt4(endInt);
-            break;
-
-          case INT8:
-            long endLong = (prevValues[i].asInt8() + term[i].asInt8());
-            if (endLong > end.get(i).asInt8()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createInt8(endLong));
-            }
-            prevValues[i] = DatumFactory.createInt8(endLong);
-            break;
-
-          case FLOAT4:
-            float endFloat = (prevValues[i].asFloat4() + term[i].asFloat4());
-            if (endFloat > end.get(i).asFloat4()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createFloat4(endFloat));
-            }
-            prevValues[i] = DatumFactory.createFloat4(endFloat);
-            break;
-          case FLOAT8:
-            double endDouble = (prevValues[i].asFloat8() + term[i].asFloat8());
-            if (endDouble > end.get(i).asFloat8()) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createFloat8(endDouble));
-            }
-            prevValues[i] = DatumFactory.createFloat8(endDouble);
-            break;
-          case TEXT:
-            String endString = ((char)(prevValues[i].asChars().charAt(0) + term[i].asChars().charAt(0))) + "";
-            if (endString.charAt(0) > end.get(i).asChars().charAt(0)) {
-              eTuple.put(i, end.get(i));
-            } else {
-              // TODO - to consider overflow
-              eTuple.put(i, DatumFactory.createText(endString));
-            }
-            prevValues[i] = DatumFactory.createText(endString);
-            break;
-          case INET4:
-            throw new UnsupportedOperationException();
-          case BLOB:
-            throw new UnsupportedOperationException();
-          default:
-            throw new UnsupportedOperationException();
-        }
-      }
-      partitioned[p] = new TupleRange(schema, sTuple, eTuple);
-    }
-
-    return partitioned;
-  }
-
-  public static String rangeToQuery(Schema schema, TupleRange range,
-                                    boolean ascendingFirstKey, boolean last)
-      throws UnsupportedEncodingException {
-    StringBuilder sb = new StringBuilder();
-    byte [] firstKeyBytes = RowStoreUtil.RowStoreEncoder
-        .toBytes(schema, range.getStart());
-    byte [] endKeyBytes = RowStoreUtil.RowStoreEncoder
-        .toBytes(schema, range.getEnd());
-
-    String firstKeyBase64 = new String(Base64.encodeBase64(firstKeyBytes));
-    String lastKeyBase64 = new String(Base64.encodeBase64(endKeyBytes));
-
-    sb.append("start=")
-        .append(URLEncoder.encode(firstKeyBase64, "utf-8"))
-        .append("&")
-        .append("end=")
-        .append(URLEncoder.encode(lastKeyBase64, "utf-8"));
-
-    if (last) {
-      sb.append("&final=true");
-    }
-
-    return sb.toString();
-  }
-
-  public static String [] rangesToQueries(final SortSpec[] sortSpec,
-                                          final TupleRange[] ranges)
-      throws UnsupportedEncodingException {
-    Schema schema = PlannerUtil.sortSpecsToSchema(sortSpec);
-    boolean ascendingFirstKey = sortSpec[0].isAscending();
-    String [] params = new String[ranges.length];
-    for (int i = 0; i < ranges.length; i++) {
-      params[i] =
-        rangeToQuery(schema, ranges[i], ascendingFirstKey,
-            ascendingFirstKey ? i == (ranges.length - 1) : i == 0);
-    }
-    return params;
-  }
-
-  public static TupleRange queryToRange(Schema schema, String query) throws UnsupportedEncodingException {
-    Map<String,String> params = HttpUtil.getParamsFromQuery(query);
-    String startUrlDecoded = URLDecoder.decode(params.get("start"), "utf-8");
-    String endUrlDecoded = URLDecoder.decode(params.get("end"), "utf-8");
-    byte [] startBytes = Base64.decodeBase64(startUrlDecoded);
-    byte [] endBytes = Base64.decodeBase64(endUrlDecoded);
-    return new TupleRange(schema, RowStoreUtil.RowStoreDecoder
-        .toTuple(schema, startBytes), RowStoreUtil.RowStoreDecoder
-        .toTuple(schema, endBytes));
-  }
-
-  public static TupleRange columnStatToRange(Schema schema, Schema target, List<ColumnStat> colStats) {
-    Map<Column, ColumnStat> statSet = Maps.newHashMap();
-    for (ColumnStat stat : colStats) {
-      statSet.put(stat.getColumn(), stat);
-    }
-
-    for (Column col : target.getColumns()) {
-      Preconditions.checkState(statSet.containsKey(col),
-          "ERROR: Invalid Column Stats (column stats: " + colStats + ", there exists not target " + col);
-    }
-
-    Tuple startTuple = new VTuple(target.getColumnNum());
-    Tuple endTuple = new VTuple(target.getColumnNum());
-    int i = 0;
-    for (Column col : target.getColumns()) {
-      startTuple.put(i, statSet.get(col).getMinValue());
-      endTuple.put(i, statSet.get(col).getMaxValue());
-      i++;
-    }
-    return new TupleRange(target, startTuple, endTuple);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/ipc/protocolrecords/QueryUnitRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/ipc/protocolrecords/QueryUnitRequest.java b/tajo-core/tajo-core-backend/src/main/java/tajo/ipc/protocolrecords/QueryUnitRequest.java
deleted file mode 100644
index 6c75186..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/ipc/protocolrecords/QueryUnitRequest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package tajo.ipc.protocolrecords;
-
-import tajo.QueryUnitAttemptId;
-import tajo.common.ProtoObject;
-import tajo.engine.MasterWorkerProtos.Fetch;
-import tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
-import tajo.storage.Fragment;
-
-import java.net.URI;
-import java.util.List;
-
-public interface QueryUnitRequest extends ProtoObject<QueryUnitRequestProto> {
-
-	public QueryUnitAttemptId getId();
-	public List<Fragment> getFragments();
-	public String getOutputTableId();
-	public boolean isClusteredOutput();
-	public String getSerializedData();
-	public boolean isInterQuery();
-	public void setInterQuery();
-	public void addFetch(String name, URI uri);
-	public List<Fetch> getFetches();
-  public boolean shouldDie();
-  public void setShouldDie();
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/ClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/ClientService.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/ClientService.java
deleted file mode 100644
index 296dfef..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/ClientService.java
+++ /dev/null
@@ -1,433 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
-import org.apache.hadoop.yarn.service.AbstractService;
-import tajo.QueryId;
-import tajo.TajoProtos;
-import tajo.catalog.*;
-import tajo.catalog.exception.AlreadyExistsTableException;
-import tajo.catalog.exception.NoSuchTableException;
-import tajo.catalog.proto.CatalogProtos.TableDescProto;
-import tajo.catalog.statistics.TableStat;
-import tajo.client.ClientProtocol;
-import tajo.client.ClientProtocol.*;
-import tajo.conf.TajoConf;
-import tajo.conf.TajoConf.ConfVars;
-import tajo.master.TajoMaster.MasterContext;
-import tajo.master.event.QueryEvent;
-import tajo.master.event.QueryEventType;
-import tajo.rpc.ProtoBlockingRpcServer;
-import tajo.rpc.RemoteException;
-import tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-import tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
-import tajo.storage.StorageUtil;
-import tajo.util.TajoIdUtils;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collection;
-
-public class ClientService extends AbstractService {
-  private final static Log LOG = LogFactory.getLog(ClientService.class);
-  private final MasterContext context;
-  private final TajoConf conf;
-  private final CatalogService catalog;
-  private final ClientProtocolHandler clientHandler;
-  private ProtoBlockingRpcServer server;
-  private InetSocketAddress bindAddress;
-
-  private final BoolProto BOOL_TRUE =
-      BoolProto.newBuilder().setValue(true).build();
-  private final BoolProto BOOL_FALSE =
-      BoolProto.newBuilder().setValue(false).build();
-
-  public ClientService(MasterContext context) {
-    super(ClientService.class.getName());
-    this.context = context;
-    this.conf = context.getConf();
-    this.catalog = context.getCatalog();
-    this.clientHandler = new ClientProtocolHandler();
-  }
-
-  @Override
-  public void start() {
-
-    // start the rpc server
-    String confClientServiceAddr = conf.getVar(ConfVars.CLIENT_SERVICE_ADDRESS);
-    InetSocketAddress initIsa = NetUtils.createSocketAddr(confClientServiceAddr);
-    try {
-      server = new ProtoBlockingRpcServer(ClientProtocol.class, clientHandler, initIsa);
-    } catch (Exception e) {
-      LOG.error(e);
-    }
-    server.start();
-    bindAddress = server.getBindAddress();
-    this.conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS,
-        tajo.util.NetUtils.getIpPortString(bindAddress));
-    LOG.info("Instantiated ClientService at " + this.bindAddress);
-    super.start();
-  }
-
-  @Override
-  public void stop() {
-    server.shutdown();
-    super.stop();
-  }
-
-  public InetSocketAddress getBindAddress() {
-    return this.bindAddress;
-  }
-
-  public int getHttpPort() {
-    return 0;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // ClientService
-  /////////////////////////////////////////////////////////////////////////////
-
-  public class ClientProtocolHandler implements ClientProtocolService.BlockingInterface {
-    @Override
-    public BoolProto updateSessionVariables(RpcController controller,
-                                            UpdateSessionVariableRequest request)
-        throws ServiceException {
-      return null;
-    }
-
-    @Override
-    public SubmitQueryRespose submitQuery(RpcController controller,
-                                           QueryRequest request)
-        throws ServiceException {
-
-      QueryId queryId;
-
-      try {
-        queryId = context.getGlobalEngine().executeQuery(request.getQuery());
-      } catch (Exception e) {
-        SubmitQueryRespose.Builder build = SubmitQueryRespose.newBuilder();
-        build.setResultCode(ResultCode.ERROR);
-        if (e.getMessage() != null) {
-          build.setErrorMessage(e.getMessage());
-        } else {
-          build.setErrorMessage("Internal Error");
-        }
-        return build.build();
-      }
-
-      LOG.info("Query " + queryId + " is submitted");
-      SubmitQueryRespose.Builder build = SubmitQueryRespose.newBuilder();
-      build.setResultCode(ResultCode.OK);
-      build.setQueryId(queryId.getProto());
-
-      return build.build();
-    }
-
-    @Override
-    public UpdateQueryResponse updateQuery(RpcController controller,
-                                           QueryRequest request)
-        throws ServiceException {
-
-      UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder();
-      try {
-        context.getGlobalEngine().updateQuery(request.getQuery());
-        builder.setResultCode(ResultCode.OK);
-        return builder.build();
-      } catch (Exception e) {
-        builder.setResultCode(ResultCode.ERROR);
-        builder.setErrorMessage(e.getMessage());
-        return builder.build();
-      }
-    }
-
-    @Override
-    public GetQueryResultResponse getQueryResult(RpcController controller,
-                                                 GetQueryResultRequest request)
-        throws ServiceException {
-      QueryId queryId = new QueryId(request.getQueryId());
-      if (queryId.equals(TajoIdUtils.NullQueryId)) {
-
-      }
-      Query query = context.getQuery(queryId).getContext().getQuery();
-
-      GetQueryResultResponse.Builder builder
-          = GetQueryResultResponse.newBuilder();
-      switch (query.getState()) {
-        case QUERY_SUCCEEDED:
-          builder.setTableDesc((TableDescProto) query.getResultDesc().getProto());
-          break;
-        case QUERY_FAILED:
-        case QUERY_ERROR:
-          builder.setErrorMessage("Query " + queryId + " is failed");
-        default:
-          builder.setErrorMessage("Query " + queryId + " is still running");
-      }
-
-      return builder.build();
-    }
-
-    @Override
-    public GetQueryListResponse getQueryList(RpcController controller,
-                                             GetQueryListRequest request)
-        throws ServiceException {
-      return null;
-    }
-
-    @Override
-    public GetQueryStatusResponse getQueryStatus(RpcController controller,
-                                                 GetQueryStatusRequest request)
-        throws ServiceException {
-
-      GetQueryStatusResponse.Builder builder
-          = GetQueryStatusResponse.newBuilder();
-      QueryId queryId = new QueryId(request.getQueryId());
-      builder.setQueryId(request.getQueryId());
-
-      if (queryId.equals(TajoIdUtils.NullQueryId)) {
-        builder.setResultCode(ResultCode.OK);
-        builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
-      } else {
-        Query query = context.getQuery(queryId).getContext().getQuery();
-        if (query != null) {
-          builder.setResultCode(ResultCode.OK);
-          builder.setState(query.getState());
-          builder.setProgress(query.getProgress());
-          builder.setSubmitTime(query.getAppSubmitTime());
-          builder.setInitTime(query.getInitializationTime());
-          builder.setHasResult(!query.isCreateTableStmt());
-          if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
-            builder.setFinishTime(query.getFinishTime());
-          } else {
-            builder.setFinishTime(System.currentTimeMillis());
-          }
-        } else {
-          builder.setResultCode(ResultCode.ERROR);
-          builder.setErrorMessage("No such query: " + queryId.toString());
-        }
-      }
-
-      return builder.build();
-    }
-
-    @Override
-    public BoolProto killQuery(RpcController controller,
-                               ApplicationAttemptIdProto request)
-        throws ServiceException {
-      QueryId queryId = new QueryId(request);
-      QueryMaster query = context.getQuery(queryId);
-      query.handle(new QueryEvent(queryId, QueryEventType.KILL));
-
-      return BOOL_TRUE;
-    }
-
-    @Override
-    public GetClusterInfoResponse getClusterInfo(RpcController controller,
-                                                 GetClusterInfoRequest request)
-        throws ServiceException {
-      return null;
-    }
-
-    @Override
-    public BoolProto existTable(RpcController controller,
-                                StringProto tableNameProto)
-        throws ServiceException {
-      String tableName = tableNameProto.getValue();
-      if (catalog.existsTable(tableName)) {
-        return BOOL_TRUE;
-      } else {
-        return BOOL_FALSE;
-      }
-    }
-
-    @Override
-    public GetTableListResponse getTableList(RpcController controller,
-                                             GetTableListRequest request)
-        throws ServiceException {
-      Collection<String> tableNames = catalog.getAllTableNames();
-      GetTableListResponse.Builder builder = GetTableListResponse.newBuilder();
-      builder.addAllTables(tableNames);
-      return builder.build();
-    }
-
-    @Override
-    public TableResponse getTableDesc(RpcController controller,
-                                      GetTableDescRequest request)
-        throws ServiceException {
-      String name = request.getTableName();
-      if (catalog.existsTable(name)) {
-        return TableResponse.newBuilder()
-            .setTableDesc((TableDescProto) catalog.getTableDesc(name).getProto())
-            .build();
-      } else {
-        return null;
-      }
-    }
-
-    @Override
-    public TableResponse createTable(RpcController controller,
-                                     CreateTableRequest request)
-        throws ServiceException {
-      if (catalog.existsTable(request.getName())) {
-        throw new AlreadyExistsTableException(request.getName());
-      }
-
-      Path path = new Path(request.getPath());
-      LOG.info(path.toUri());
-
-      long totalSize = 0;
-      try {
-        totalSize = calculateSize(new Path(path, "data"));
-      } catch (IOException e) {
-        LOG.error("Cannot calculate the size of the relation", e);
-      }
-
-      TableMeta meta = new TableMetaImpl(request.getMeta());
-      TableStat stat = new TableStat();
-      stat.setNumBytes(totalSize);
-      meta.setStat(stat);
-
-      TableDesc desc = new TableDescImpl(request.getName(),meta, path);
-      try {
-        StorageUtil.writeTableMeta(conf, path, desc.getMeta());
-      } catch (IOException e) {
-        LOG.error("Cannot write the table meta file", e);
-      }
-      catalog.addTable(desc);
-      LOG.info("Table " + desc.getId() + " is created (" + meta.getStat().getNumBytes() + ")");
-
-      return TableResponse.newBuilder().
-          setTableDesc((TableDescProto) desc.getProto())
-          .build();
-    }
-
-    @Override
-    public BoolProto dropTable(RpcController controller,
-                               StringProto tableNameProto)
-        throws ServiceException {
-      String tableName = tableNameProto.getValue();
-      if (!catalog.existsTable(tableName)) {
-        throw new NoSuchTableException(tableName);
-      }
-
-      Path path = catalog.getTableDesc(tableName).getPath();
-      catalog.deleteTable(tableName);
-      try {
-        context.getStorageManager().delete(path);
-      } catch (IOException e) {
-        throw new RemoteException(e);
-      }
-      LOG.info("Table is dropped" + tableName);
-      return BOOL_TRUE;
-    }
-
-    @Override
-    public TableResponse attachTable(RpcController controller,
-                                     AttachTableRequest request)
-        throws ServiceException {
-
-      TableDesc desc;
-      if (catalog.existsTable(request.getName())) {
-        throw new AlreadyExistsTableException(request.getName());
-      }
-
-      Path path = new Path(request.getPath());
-
-      LOG.info(path.toUri());
-
-      TableMeta meta;
-      try {
-        meta = TableUtil.getTableMeta(conf, path);
-      } catch (IOException e) {
-        throw new RemoteException(e);
-      }
-
-      FileSystem fs;
-
-      // for legacy table structure
-      Path tablePath = new Path(path, "data");
-      try {
-        fs = path.getFileSystem(conf);
-        if (!fs.exists(tablePath)) {
-          tablePath = path;
-        }
-      } catch (IOException e) {
-        LOG.error(e);
-        return null;
-      }
-
-      if (meta.getStat() == null) {
-        long totalSize = 0;
-        try {
-          totalSize = calculateSize(tablePath);
-        } catch (IOException e) {
-          LOG.error("Cannot calculate the size of the relation", e);
-          return null;
-        }
-
-        meta = new TableMetaImpl(meta.getProto());
-        TableStat stat = new TableStat();
-        stat.setNumBytes(totalSize);
-        meta.setStat(stat);
-      }
-
-      desc = new TableDescImpl(request.getName(), meta, path);
-      catalog.addTable(desc);
-      LOG.info("Table " + desc.getId() + " is attached ("
-          + meta.getStat().getNumBytes() + ")");
-
-      return TableResponse.newBuilder().
-          setTableDesc((TableDescProto) desc.getProto())
-          .build();
-    }
-
-    @Override
-    public BoolProto detachTable(RpcController controller,
-                                 StringProto tableNameProto)
-        throws ServiceException {
-      String tableName = tableNameProto.getValue();
-      if (!catalog.existsTable(tableName)) {
-        throw new NoSuchTableException(tableName);
-      }
-
-      catalog.deleteTable(tableName);
-
-      LOG.info("Table " + tableName + " is detached");
-      return BOOL_TRUE;
-    }
-  }
-
-  private long calculateSize(Path path) throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    long totalSize = 0;
-    for (FileStatus status : fs.listStatus(path)) {
-      totalSize += status.getLen();
-    }
-
-    return totalSize;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlock.java
deleted file mode 100644
index 04d3bc7..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlock.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import com.google.common.base.Preconditions;
-import tajo.SubQueryId;
-import tajo.catalog.Schema;
-import tajo.engine.planner.logical.*;
-
-import java.util.*;
-
-/**
- * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
- * An ExecutionBlock is a basic execution unit that could be distributed across a number of nodes.
- * An ExecutionBlock class contains input information (e.g., child execution blocks or input
- * tables), and output information (e.g., partition type, partition key, and partition number).
- * In addition, it includes a logical plan to be executed in each node.
- */
-public class ExecutionBlock {
-
-  public static enum PartitionType {
-    /** for hash partitioning */
-    HASH,
-    LIST,
-    /** for map-side join */
-    BROADCAST,
-    /** for range partitioning */
-    RANGE
-  }
-
-  private SubQueryId subQueryId;
-  private LogicalNode plan = null;
-  private StoreTableNode store = null;
-  private List<ScanNode> scanlist = new ArrayList<ScanNode>();
-  private ExecutionBlock parent;
-  private Map<ScanNode, ExecutionBlock> childSubQueries = new HashMap<ScanNode, ExecutionBlock>();
-  private PartitionType outputType;
-  private boolean hasJoinPlan;
-  private boolean hasUnionPlan;
-
-  public ExecutionBlock(SubQueryId subQueryId) {
-    this.subQueryId = subQueryId;
-  }
-
-  public SubQueryId getId() {
-    return subQueryId;
-  }
-
-  public String getOutputName() {
-    return store.getTableName();
-  }
-
-  public void setPartitionType(PartitionType partitionType) {
-    this.outputType = partitionType;
-  }
-
-  public PartitionType getPartitionType() {
-    return outputType;
-  }
-
-  public void setPlan(LogicalNode plan) {
-    hasJoinPlan = false;
-    Preconditions.checkArgument(plan.getType() == ExprType.STORE
-        || plan.getType() == ExprType.CREATE_INDEX);
-
-    this.plan = plan;
-    if (plan instanceof StoreTableNode) {
-      store = (StoreTableNode) plan;
-    } else {
-      store = (StoreTableNode) ((IndexWriteNode)plan).getSubNode();
-    }
-
-    LogicalNode node = plan;
-    ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
-    s.add(node);
-    while (!s.isEmpty()) {
-      node = s.remove(s.size()-1);
-      if (node instanceof UnaryNode) {
-        UnaryNode unary = (UnaryNode) node;
-        s.add(s.size(), unary.getSubNode());
-      } else if (node instanceof BinaryNode) {
-        BinaryNode binary = (BinaryNode) node;
-        if (binary.getType() == ExprType.JOIN) {
-          hasJoinPlan = true;
-        } else if (binary.getType() == ExprType.UNION) {
-          hasUnionPlan = true;
-        }
-        s.add(s.size(), binary.getOuterNode());
-        s.add(s.size(), binary.getInnerNode());
-      } else if (node instanceof ScanNode) {
-        scanlist.add((ScanNode)node);
-      }
-    }
-  }
-
-
-  public LogicalNode getPlan() {
-    return plan;
-  }
-
-  public boolean hasParentBlock() {
-    return parent != null;
-  }
-
-  public ExecutionBlock getParentBlock() {
-    return parent;
-  }
-
-  public void setParentBlock(ExecutionBlock parent) {
-    this.parent = parent;
-  }
-
-  public boolean hasChildBlock() {
-    return childSubQueries.size() > 0;
-  }
-
-  public ExecutionBlock getChildBlock(ScanNode scanNode) {
-    return childSubQueries.get(scanNode);
-  }
-
-  public Collection<ExecutionBlock> getChildBlocks() {
-    return Collections.unmodifiableCollection(childSubQueries.values());
-  }
-
-  public Map<ScanNode, ExecutionBlock> getChildBlockMap() {
-    return childSubQueries;
-  }
-
-  public void addChildBlock(ScanNode scanNode, ExecutionBlock child) {
-    childSubQueries.put(scanNode, child);
-  }
-
-  public int getChildNum() {
-    return childSubQueries.size();
-  }
-
-  public void removeChildBlock(ScanNode scanNode) {
-    scanlist.remove(scanNode);
-    this.childSubQueries.remove(scanNode);
-  }
-
-  public void addChildBlocks(Map<ScanNode, ExecutionBlock> childBlocks) {
-    childSubQueries.putAll(childBlocks);
-  }
-
-  public boolean isLeafBlock() {
-    return childSubQueries.size() == 0;
-  }
-
-  public StoreTableNode getStoreTableNode() {
-    return store;
-  }
-
-  public ScanNode [] getScanNodes() {
-    return this.scanlist.toArray(new ScanNode[scanlist.size()]);
-  }
-
-  public Schema getOutputSchema() {
-    return store.getOutSchema();
-  }
-
-  public boolean hasJoin() {
-    return hasJoinPlan;
-  }
-
-  public boolean hasUnion() {
-    return hasUnionPlan;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlockCursor.java
deleted file mode 100644
index 89e2a5b..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/ExecutionBlockCursor.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import tajo.engine.planner.global.MasterPlan;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-
-/**
- * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
- * This class is a pointer to an ExecutionBlock that the query engine should execute.
- * For each call of nextBlock(), it retrieves a next ExecutionBlock in a postfix order.
- */
-public class ExecutionBlockCursor {
-  private ArrayList<ExecutionBlock> orderedBlocks = new ArrayList<ExecutionBlock>();
-  private int cursor = 0;
-
-  public ExecutionBlockCursor(MasterPlan plan) {
-    buildOrder(plan.getRoot());
-  }
-
-  private void buildOrder(ExecutionBlock current) {
-    if (current.hasChildBlock()) {
-      if (current.getChildNum() == 1) {
-        ExecutionBlock block = current.getChildBlocks().iterator().next();
-        buildOrder(block);
-      } else {
-        Iterator<ExecutionBlock> it = current.getChildBlocks().iterator();
-        ExecutionBlock outer = it.next();
-        ExecutionBlock inner = it.next();
-
-        // Switch between outer and inner
-        // if an inner has a child and an outer doesn't.
-        // It is for left-deep-first search.
-        if (!outer.hasChildBlock() && inner.hasChildBlock()) {
-          ExecutionBlock tmp = outer;
-          outer = inner;
-          inner = tmp;
-        }
-
-        buildOrder(outer);
-        buildOrder(inner);
-      }
-    }
-    orderedBlocks.add(current);
-  }
-
-  public boolean hasNext() {
-    return cursor < orderedBlocks.size();
-  }
-
-  public ExecutionBlock nextBlock() {
-    return orderedBlocks.get(cursor++);
-  }
-
-  public ExecutionBlock peek() {
-    return orderedBlocks.get(cursor);
-  }
-
-  public ExecutionBlock peek(int skip) {
-    return  orderedBlocks.get(cursor + skip);
-  }
-
-  public void reset() {
-    cursor = 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalEngine.java
deleted file mode 100644
index 2a52ef7..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalEngine.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.Records;
-import tajo.QueryConf;
-import tajo.QueryId;
-import tajo.catalog.CatalogUtil;
-import tajo.catalog.TableDesc;
-import tajo.catalog.TableMeta;
-import tajo.catalog.statistics.TableStat;
-import tajo.engine.exception.EmptyClusterException;
-import tajo.engine.exception.IllegalQueryStatusException;
-import tajo.engine.exception.NoSuchQueryIdException;
-import tajo.engine.exception.UnknownWorkerException;
-import tajo.engine.parser.QueryAnalyzer;
-import tajo.engine.parser.StatementType;
-import tajo.engine.planner.LogicalOptimizer;
-import tajo.engine.planner.LogicalPlanner;
-import tajo.engine.planner.PlanningContext;
-import tajo.engine.planner.global.GlobalOptimizer;
-import tajo.engine.planner.global.MasterPlan;
-import tajo.engine.planner.logical.CreateTableNode;
-import tajo.engine.planner.logical.ExprType;
-import tajo.engine.planner.logical.LogicalNode;
-import tajo.engine.planner.logical.LogicalRootNode;
-import tajo.engine.query.exception.TQLSyntaxError;
-import tajo.master.TajoMaster.MasterContext;
-import tajo.storage.StorageManager;
-import tajo.storage.StorageUtil;
-import tajo.util.TajoIdUtils;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.EnumSet;
-import java.util.Set;
-
-@SuppressWarnings("unchecked")
-public class GlobalEngine extends AbstractService {
-  /** Class Logger */
-  private final static Log LOG = LogFactory.getLog(GlobalEngine.class);
-
-  private final MasterContext context;
-  private final StorageManager sm;
-
-  private QueryAnalyzer analyzer;
-  private LogicalPlanner planner;
-  private GlobalPlanner globalPlanner;
-  private GlobalOptimizer globalOptimizer;
-
-  // Yarn
-  private final RecordFactory recordFactory =
-      RecordFactoryProvider.getRecordFactory(null);
-  protected YarnClient yarnClient;
-  protected InetSocketAddress rmAddress;
-
-  public GlobalEngine(final MasterContext context, final StorageManager sm)
-      throws IOException {
-    super(GlobalEngine.class.getName());
-    this.context = context;
-    this.sm = sm;
-  }
-
-  public void start() {
-    try  {
-      connectYarnClient();
-      analyzer = new QueryAnalyzer(context.getCatalog());
-      planner = new LogicalPlanner(context.getCatalog());
-
-      globalPlanner = new GlobalPlanner(context.getConf(), context.getCatalog(),
-          sm, context.getEventHandler());
-
-      globalOptimizer = new GlobalOptimizer();
-    } catch (Throwable t) {
-      t.printStackTrace();
-    }
-    super.start();
-  }
-
-  public void stop() {
-    super.stop();
-    yarnClient.stop();
-  }
-
-  private String createTable(LogicalRootNode root) throws IOException {
-    // create table queries are executed by the master
-    CreateTableNode createTable = (CreateTableNode) root.getSubNode();
-    TableMeta meta;
-    if (createTable.hasOptions()) {
-      meta = CatalogUtil.newTableMeta(createTable.getSchema(),
-          createTable.getStorageType(), createTable.getOptions());
-    } else {
-      meta = CatalogUtil.newTableMeta(createTable.getSchema(),
-          createTable.getStorageType());
-    }
-
-    FileSystem fs = createTable.getPath().getFileSystem(context.getConf());
-    if(fs.exists(createTable.getPath()) && fs.isFile(createTable.getPath())) {
-    	throw new IOException("ERROR: LOCATION must be a directory.");
-    }
-
-    long totalSize = 0;
-    try {
-      totalSize = sm.calculateSize(createTable.getPath());
-    } catch (IOException e) {
-      LOG.error("Cannot calculate the size of the relation", e);
-    }
-    TableStat stat = new TableStat();
-    stat.setNumBytes(totalSize);
-    meta.setStat(stat);
-
-    StorageUtil.writeTableMeta(context.getConf(), createTable.getPath(), meta);
-    TableDesc desc = CatalogUtil.newTableDesc(createTable.getTableName(), meta,
-        createTable.getPath());
-    context.getCatalog().addTable(desc);
-    return desc.getId();
-  }
-  
-  public QueryId executeQuery(String tql)
-      throws InterruptedException, IOException,
-      NoSuchQueryIdException, IllegalQueryStatusException,
-      UnknownWorkerException, EmptyClusterException {
-    long querySubmittionTime = context.getClock().getTime();
-    LOG.info("TQL: " + tql);
-    // parse the query
-    PlanningContext planningContext = analyzer.parse(tql);
-    LogicalRootNode plan = (LogicalRootNode) createLogicalPlan(planningContext);
-
-    if (plan.getSubNode().getType() == ExprType.CREATE_TABLE) {
-      createTable(plan);
-
-      return TajoIdUtils.NullQueryId;
-    } else {
-      ApplicationAttemptId appAttemptId = submitQuery();
-      QueryId queryId = TajoIdUtils.createQueryId(appAttemptId);
-      MasterPlan masterPlan = createGlobalPlan(queryId, plan);
-      QueryConf queryConf = new QueryConf(context.getConf());
-      queryConf.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
-
-      // the output table is given by user
-      if (planningContext.hasExplicitOutputTable()) {
-        queryConf.setOutputTable(planningContext.getExplicitOutputTable());
-      }
-      /*
-        Path warehousePath = new Path(queryConf.getVar(ConfVars.WAREHOUSE_PATH));
-        Path outputDir = new Path(warehousePath, planningContext.getExplicitOutputTable());
-        queryConf.setOutputDir(outputDir);
-      } else {
-        Path queryTmpPath = new Path(queryConf.getVar(ConfVars.QUERY_TMP_DIR));
-        Path outputDir = new Path(queryTmpPath, queryId.toString());
-        queryConf.setOutputDir(outputDir);
-      } */
-
-      QueryMaster query = new QueryMaster(context, appAttemptId,
-          context.getClock(), querySubmittionTime, masterPlan);
-      startQuery(queryId, queryConf, query);
-
-      return queryId;
-    }
-  }
-
-  private ApplicationAttemptId submitQuery() throws YarnRemoteException {
-    GetNewApplicationResponse newApp = getNewApplication();
-    // Get a new application id
-    ApplicationId appId = newApp.getApplicationId();
-    System.out.println("Get AppId: " + appId);
-    LOG.info("Setting up application submission context for ASM");
-    ApplicationSubmissionContext appContext = Records
-        .newRecord(ApplicationSubmissionContext.class);
-
-    // set the application id
-    appContext.setApplicationId(appId);
-    // set the application name
-    appContext.setApplicationName("Tajo");
-
-    // Set the priority for the application master
-    org.apache.hadoop.yarn.api.records.Priority
-        pri = Records.newRecord(org.apache.hadoop.yarn.api.records.Priority.class);
-    pri.setPriority(5);
-    appContext.setPriority(pri);
-
-    // Set the queue to which this application is to be submitted in the RM
-    appContext.setQueue("default");
-
-    // Set up the container launch context for the application master
-    ContainerLaunchContext amContainer = Records
-        .newRecord(ContainerLaunchContext.class);
-    appContext.setAMContainerSpec(amContainer);
-
-    // unmanaged AM
-    appContext.setUnmanagedAM(true);
-    LOG.info("Setting unmanaged AM");
-
-    // Submit the application to the applications manager
-    LOG.info("Submitting application to ASM");
-    yarnClient.submitApplication(appContext);
-
-    // Monitor the application to wait for launch state
-    ApplicationReport appReport = monitorApplication(appId,
-        EnumSet.of(YarnApplicationState.ACCEPTED));
-    ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
-    LOG.info("Launching application with id: " + attemptId);
-
-    return attemptId;
-  }
-
-  private LogicalNode createLogicalPlan(PlanningContext planningContext)
-      throws IOException {
-
-    LogicalNode plan = planner.createPlan(planningContext);
-    plan = LogicalOptimizer.optimize(planningContext, plan);
-    LogicalNode optimizedPlan = LogicalOptimizer.pushIndex(plan, sm);
-    LOG.info("LogicalPlan:\n" + plan);
-
-    return optimizedPlan;
-  }
-
-  private MasterPlan createGlobalPlan(QueryId id, LogicalRootNode rootNode)
-      throws IOException {
-    MasterPlan globalPlan = globalPlanner.build(id, rootNode);
-    return globalOptimizer.optimize(globalPlan);
-  }
-
-  private void startQuery(final QueryId queryId, final QueryConf queryConf,
-                          final QueryMaster query) {
-    context.getAllQueries().put(queryId, query);
-    query.init(queryConf);
-    query.start();
-  }
-
-  public boolean updateQuery(String tql) throws IOException {
-    LOG.info("TQL: " + tql);
-
-    PlanningContext planningContext = analyzer.parse(tql);
-    if (planningContext.getParseTree().getStatementType()
-        == StatementType.CREATE_TABLE) {
-      LogicalRootNode plan = (LogicalRootNode) createLogicalPlan(planningContext);
-      createTable(plan);
-      return true;
-    } else {
-      throw new TQLSyntaxError(tql, "updateQuery cannot handle such query");
-    }
-  }
-
-  private void connectYarnClient() {
-    this.yarnClient = new YarnClientImpl();
-    this.yarnClient.init(getConfig());
-    this.yarnClient.start();
-  }
-
-  private static InetSocketAddress getRmAddress(Configuration conf) {
-    return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
-  }
-
-  public GetNewApplicationResponse getNewApplication()
-      throws YarnRemoteException {
-    return yarnClient.getNewApplication();
-  }
-
-  /**
-   * Monitor the submitted application for completion. Kill application if time
-   * expires.
-   *
-   * @param appId
-   *          Application Id of application to be monitored
-   * @return true if application completed successfully
-   * @throws YarnRemoteException
-   */
-  private ApplicationReport monitorApplication(ApplicationId appId,
-                                               Set<YarnApplicationState> finalState) throws YarnRemoteException {
-
-    while (true) {
-
-      // Check app status every 1 second.
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        LOG.debug("Thread sleep in monitoring loop interrupted");
-      }
-
-      // Get application report for the appId we are interested in
-      ApplicationReport report = yarnClient.getApplicationReport(appId);
-
-      LOG.info("Got application report from ASM for" + ", appId="
-          + appId.getId() + ", appAttemptId="
-          + report.getCurrentApplicationAttemptId() + ", clientToken="
-          + report.getClientToken() + ", appDiagnostics="
-          + report.getDiagnostics() + ", appMasterHost=" + report.getHost()
-          + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
-          + report.getRpcPort() + ", appStartTime=" + report.getStartTime()
-          + ", yarnAppState=" + report.getYarnApplicationState().toString()
-          + ", distributedFinalState="
-          + report.getFinalApplicationStatus().toString() + ", appTrackingUrl="
-          + report.getTrackingUrl() + ", appUser=" + report.getUser());
-
-      YarnApplicationState state = report.getYarnApplicationState();
-      if (finalState.contains(state)) {
-        return report;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
deleted file mode 100644
index 2176b8e..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/GlobalPlanner.java
+++ /dev/null
@@ -1,721 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.event.EventHandler;
-import tajo.QueryId;
-import tajo.QueryIdFactory;
-import tajo.SubQueryId;
-import tajo.catalog.*;
-import tajo.catalog.proto.CatalogProtos.StoreType;
-import tajo.conf.TajoConf;
-import tajo.engine.parser.QueryBlock.FromTable;
-import tajo.engine.planner.PlannerUtil;
-import tajo.engine.planner.global.MasterPlan;
-import tajo.engine.planner.logical.*;
-import tajo.master.ExecutionBlock.PartitionType;
-import tajo.storage.StorageManager;
-import tajo.util.TajoIdUtils;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class GlobalPlanner {
-  private static Log LOG = LogFactory.getLog(GlobalPlanner.class);
-
-  private TajoConf conf;
-  private StorageManager sm;
-  private CatalogService catalog;
-  private QueryId queryId;
-  private EventHandler eventHandler;
-
-  public GlobalPlanner(final TajoConf conf, final CatalogService catalog,
-                       final StorageManager sm,
-                       final EventHandler eventHandler)
-      throws IOException {
-    this.conf = conf;
-    this.sm = sm;
-    this.catalog = catalog;
-    this.eventHandler = eventHandler;
-  }
-
-  /**
-   * Builds a master plan from the given logical plan.
-   * @param queryId
-   * @param rootNode
-   * @return
-   * @throws IOException
-   */
-  public MasterPlan build(QueryId queryId, LogicalRootNode rootNode)
-      throws IOException {
-    this.queryId = queryId;
-
-    String outputTableName = null;
-    if (rootNode.getSubNode().getType() == ExprType.STORE) {
-      // create table queries are executed by the master
-      StoreTableNode storeTableNode = (StoreTableNode) rootNode.getSubNode();
-      outputTableName = storeTableNode.getTableName();
-    }
-
-    // insert store at the subnode of the root
-    UnaryNode root = rootNode;
-    IndexWriteNode indexNode = null;
-    // TODO: check whether the type of the subnode is CREATE_INDEX
-    if (root.getSubNode().getType() == ExprType.CREATE_INDEX) {
-      indexNode = (IndexWriteNode) root.getSubNode();
-      root = (UnaryNode)root.getSubNode();
-      
-      StoreIndexNode store = new StoreIndexNode(
-          QueryIdFactory.newSubQueryId(this.queryId).toString());
-      store.setLocal(false);
-      PlannerUtil.insertNode(root, store);
-      
-    } else if (root.getSubNode().getType() != ExprType.STORE) {
-      SubQueryId subQueryId = QueryIdFactory.newSubQueryId(this.queryId);
-      outputTableName = subQueryId.toString();
-      insertStore(subQueryId.toString(),root).setLocal(false);
-    }
-    
-    // convert 2-phase plan
-    LogicalNode tp = convertTo2Phase(rootNode);
-
-    // make query graph
-    MasterPlan globalPlan = convertToGlobalPlan(indexNode, tp);
-    globalPlan.setOutputTableName(outputTableName);
-
-    return globalPlan;
-  }
-  
-  private StoreTableNode insertStore(String tableId, LogicalNode parent) {
-    StoreTableNode store = new StoreTableNode(tableId);
-    store.setLocal(true);
-    PlannerUtil.insertNode(parent, store);
-    return store;
-  }
-  
-  /**
-   * Transforms a logical plan to a two-phase plan. 
-   * A store node is inserted for every logical node except store and scan nodes.
-   */
-  private class GlobalPlanBuilder implements LogicalNodeVisitor {
-    @Override
-    public void visit(LogicalNode node) {
-      String tableId;
-      StoreTableNode store;
-      if (node.getType() == ExprType.GROUP_BY) {
-        // transform group by to two-phase plan 
-        GroupbyNode groupby = (GroupbyNode) node;
-        // insert a store for the child of first group by
-        if (groupby.getSubNode().getType() != ExprType.UNION &&
-            groupby.getSubNode().getType() != ExprType.STORE &&
-            groupby.getSubNode().getType() != ExprType.SCAN) {
-          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
-          insertStore(tableId, groupby);
-        }
-        tableId = QueryIdFactory.newSubQueryId(queryId).toString();
-        // insert (a store for the first group by) and (a second group by)
-        PlannerUtil.transformGroupbyTo2PWithStore((GroupbyNode)node, tableId);
-      } else if (node.getType() == ExprType.SORT) {
-        // transform sort to two-phase plan 
-        SortNode sort = (SortNode) node;
-        // insert a store for the child of first sort
-        if (sort.getSubNode().getType() != ExprType.UNION &&
-            sort.getSubNode().getType() != ExprType.STORE &&
-            sort.getSubNode().getType() != ExprType.SCAN) {
-          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
-          insertStore(tableId, sort);
-        }
-        tableId = QueryIdFactory.newSubQueryId(queryId).toString();
-        // insert (a store for the first sort) and (a second sort)
-        PlannerUtil.transformSortTo2PWithStore((SortNode)node, tableId);
-      } else if (node.getType() == ExprType.JOIN) {
-        // transform join to two-phase plan 
-        // the first phase of two-phase join can be any logical nodes
-        JoinNode join = (JoinNode) node;
-
-        /*
-        if (join.getOuterNode().getType() == ExprType.SCAN &&
-            join.getInnerNode().getType() == ExprType.SCAN) {
-          ScanNode outerScan = (ScanNode) join.getOuterNode();
-          ScanNode innerScan = (ScanNode) join.getInnerNode();
-
-
-          TableMeta outerMeta =
-              catalog.getTableDesc(outerScan.getTableId()).getMeta();
-          TableMeta innerMeta =
-              catalog.getTableDesc(innerScan.getTableId()).getMeta();
-          long threshold = conf.getLongVar(ConfVars.BROADCAST_JOIN_THRESHOLD);
-
-
-          // if the broadcast join is available
-          boolean outerSmall = false;
-          boolean innerSmall = false;
-          if (!outerScan.isLocal() && outerMeta.getStat() != null &&
-              outerMeta.getStat().getNumBytes() <= threshold) {
-            outerSmall = true;
-            LOG.info("The relation (" + outerScan.getTableId() +
-                ") is less than " + threshold);
-          }
-          if (!innerScan.isLocal() && innerMeta.getStat() != null &&
-              innerMeta.getStat().getNumBytes() <= threshold) {
-            innerSmall = true;
-            LOG.info("The relation (" + innerScan.getTableId() +
-                ") is less than " + threshold);
-          }
-
-          if (outerSmall && innerSmall) {
-            if (outerMeta.getStat().getNumBytes() <=
-                innerMeta.getStat().getNumBytes()) {
-              outerScan.setBroadcast();
-              LOG.info("The relation " + outerScan.getTableId()
-                  + " is broadcasted");
-            } else {
-              innerScan.setBroadcast();
-              LOG.info("The relation " + innerScan.getTableId()
-                  + " is broadcasted");
-            }
-          } else {
-            if (outerSmall) {
-              outerScan.setBroadcast();
-              LOG.info("The relation (" + outerScan.getTableId()
-                  + ") is broadcasted");
-            } else if (innerSmall) {
-              innerScan.setBroadcast();
-              LOG.info("The relation (" + innerScan.getTableId()
-                  + ") is broadcasted");
-            }
-          }
-
-          if (outerScan.isBroadcast() || innerScan.isBroadcast()) {
-            return;
-          }
-        } */
-
-        // insert stores for the first phase
-        if (join.getOuterNode().getType() != ExprType.UNION &&
-            join.getOuterNode().getType() != ExprType.STORE) {
-          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
-          store = new StoreTableNode(tableId);
-          store.setLocal(true);
-          PlannerUtil.insertOuterNode(node, store);
-        }
-        if (join.getInnerNode().getType() != ExprType.UNION &&
-            join.getInnerNode().getType() != ExprType.STORE) {
-          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
-          store = new StoreTableNode(tableId);
-          store.setLocal(true);
-          PlannerUtil.insertInnerNode(node, store);
-        }
-      } else if (node.getType() == ExprType.UNION) {
-        // not two-phase transform
-        UnionNode union = (UnionNode) node;
-        // insert stores
-        if (union.getOuterNode().getType() != ExprType.UNION &&
-            union.getOuterNode().getType() != ExprType.STORE) {
-          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
-          store = new StoreTableNode(tableId);
-          if(union.getOuterNode().getType() == ExprType.GROUP_BY) {
-            /* This case is for the cube-by operator.
-             * TODO: more complicated condition */
-            store.setLocal(true);
-          } else {
-            /* This case is for a union query */
-            store.setLocal(false);
-          }
-          PlannerUtil.insertOuterNode(node, store);
-        }
-        if (union.getInnerNode().getType() != ExprType.UNION &&
-            union.getInnerNode().getType() != ExprType.STORE) {
-          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
-          store = new StoreTableNode(tableId);
-          if(union.getInnerNode().getType() == ExprType.GROUP_BY) {
-            /* This case is for the cube-by operator.
-             * TODO: more complicated condition */
-            store.setLocal(true);
-          } else {
-            /* This case is for a union query */
-            store.setLocal(false);
-          }
-          PlannerUtil.insertInnerNode(node, store);
-        }
-      } else if (node instanceof UnaryNode) {
-        UnaryNode unary = (UnaryNode)node;
-        if (unary.getType() != ExprType.STORE &&
-            unary.getSubNode().getType() != ExprType.STORE) {
-          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
-          insertStore(tableId, unary);
-        }
-      }
-    }
-  }
-
-  /**
-   * Converts the logical plan to a two-phase plan by a post-order traversal.
-   * 
-   * @param logicalPlan
-   * @return
-   */
-  private LogicalNode convertTo2Phase(LogicalNode logicalPlan) {
-    LogicalRootNode root = (LogicalRootNode) logicalPlan;
-    root.postOrder(new GlobalPlanBuilder());
-    return logicalPlan;
-  }
-  
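-  /** Maps each store node in the two-phase plan to the ExecutionBlock built for it. */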
-  private Map<StoreTableNode, ExecutionBlock> convertMap =
-      new HashMap<StoreTableNode, ExecutionBlock>();
-  
-  /**
-   * Creates SubQueries by traversing the logical plan in post order.
-   * 
-   * @param node the node currently being visited
-   * @throws IOException
-   */
-  private void recursiveBuildSubQuery(LogicalNode node)
-      throws IOException {
-    ExecutionBlock subQuery;
-    StoreTableNode store;
-    if (node instanceof UnaryNode) {
-      recursiveBuildSubQuery(((UnaryNode) node).getSubNode());
-      
-      if (node.getType() == ExprType.STORE) {
-        store = (StoreTableNode) node;
-        SubQueryId id;
-        if (store.getTableName().startsWith(QueryId.PREFIX)) {
-          id = TajoIdUtils.newSubQueryId(store.getTableName());
-        } else {
-          id = QueryIdFactory.newSubQueryId(queryId);
-        }
-        subQuery = new ExecutionBlock(id);
-
-        switch (store.getSubNode().getType()) {
-        case BST_INDEX_SCAN:
-        case SCAN:  // store - scan
-          subQuery = makeScanSubQuery(subQuery);
-          subQuery.setPlan(node);
-          break;
-        case SELECTION:
-        case PROJECTION:
-        case LIMIT:
-          subQuery = makeUnarySubQuery(store, node, subQuery);
-          subQuery.setPlan(node);
-          break;
-        case GROUP_BY:
-          subQuery = makeGroupbySubQuery(store, node, subQuery);
-          subQuery.setPlan(node);
-          break;
-        case SORT:
-          subQuery = makeSortSubQuery(store, node, subQuery);
-          subQuery.setPlan(node);
-          break;
-        case JOIN:  // store - join
-          subQuery = makeJoinSubQuery(store, node, subQuery);
-          subQuery.setPlan(node);
-          break;
-        case UNION:
-          subQuery = makeUnionSubQuery(store, node, subQuery);
-          subQuery.setPlan(node);
-          break;
-        default:
-          subQuery = null;
-          break;
-        }
-
-        convertMap.put(store, subQuery);
-      }
-    } else if (node instanceof BinaryNode) {
-      recursiveBuildSubQuery(((BinaryNode) node).getOuterNode());
-      recursiveBuildSubQuery(((BinaryNode) node).getInnerNode());
-    } else if (node instanceof ScanNode) {
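-      // scan nodes are leaves; their block is created when the store node above them is visited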
-
-    } else {
-
-    }
-  }
-  
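-  /** A block that only scans (or index-scans) a stored table emits its output as a list partition. */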
-  private ExecutionBlock makeScanSubQuery(ExecutionBlock block) {
-    block.setPartitionType(PartitionType.LIST);
-    return block;
-  }
-  
-  /**
-   * Puts a unifiable node (selection, projection) into the same SubQuery as its child plan.
-   * 
-   * @param rootStore the store node of the SubQuery to be created
-   * @param plan logical plan
-   * @param unit the SubQuery to be created
-   * @return
-   * @throws IOException
-   */
-  private ExecutionBlock makeUnarySubQuery(StoreTableNode rootStore,
-                                     LogicalNode plan, ExecutionBlock unit) throws IOException {
-    ScanNode newScan;
-    ExecutionBlock prev;
-    UnaryNode unary = (UnaryNode) plan;
-    UnaryNode child = (UnaryNode) unary.getSubNode();
-    StoreTableNode prevStore = (StoreTableNode)child.getSubNode();
-
-    // add scan
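-    // (the new scan reads the table produced by the child's store node, linking the two blocks)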
-    newScan = GlobalPlannerUtils.newScanPlan(prevStore.getOutSchema(),
-        prevStore.getTableName(), sm.getTablePath(prevStore.getTableName()));
-    newScan.setLocal(true);
-    child.setSubNode(newScan);
-    prev = convertMap.get(prevStore);
-
-    if (prev != null) {
-      prev.setParentBlock(unit);
-      unit.addChildBlock(newScan, prev);
-      prev.setPartitionType(PartitionType.LIST);
-    }
-
-    unit.setPartitionType(PartitionType.LIST);
-
-    return unit;
-  }
-  
-  /**
-   * Creates a two-phase SubQuery.
-   * 
-   * @param rootStore the store node of the SubQuery to be created
-   * @param plan logical plan
-   * @param unit the SubQuery to be created
-   * @return
-   * @throws IOException
-   */
-  private ExecutionBlock makeGroupbySubQuery(StoreTableNode rootStore,
-                                       LogicalNode plan, ExecutionBlock unit) throws IOException {
-    UnaryNode unary = (UnaryNode) plan;
-    UnaryNode unaryChild;
-    StoreTableNode prevStore;
-    ScanNode newScan;
-    ExecutionBlock prev;
-    unaryChild = (UnaryNode) unary.getSubNode();  // groupby
-    ExprType curType = unaryChild.getType();
-    if (unaryChild.getSubNode().getType() == ExprType.STORE) {
-      // store - groupby - store
-      unaryChild = (UnaryNode) unaryChild.getSubNode(); // store
-      prevStore = (StoreTableNode) unaryChild;
-      newScan = GlobalPlannerUtils.newScanPlan(prevStore.getOutSchema(),
-          prevStore.getTableName(),
-          sm.getTablePath(prevStore.getTableName()));
-      newScan.setLocal(true);
-      ((UnaryNode) unary.getSubNode()).setSubNode(newScan);
-      prev = convertMap.get(prevStore);
-      if (prev != null) {
-        prev.setParentBlock(unit);
-        unit.addChildBlock(newScan, prev);
-      }
-
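-      // if the node beneath the child store is also a group-by, this block is the second phase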
-      if (unaryChild.getSubNode().getType() == curType) {
-        // the second phase
-        unit.setPartitionType(PartitionType.LIST);
-        if (prev != null) {
-          prev.setPartitionType(PartitionType.HASH);
-        }
-      } else {
-        // the first phase
-        unit.setPartitionType(PartitionType.HASH);
-        if (prev != null) {
-          prev.setPartitionType(PartitionType.LIST);
-        }
-      }
-    } else if (unaryChild.getSubNode().getType() == ExprType.SCAN) {
-      // the first phase
-      // store - groupby - scan
-      unit.setPartitionType(PartitionType.HASH);
-    } else if (unaryChild.getSubNode().getType() == ExprType.UNION) {
-      _handleUnionNode(rootStore, (UnionNode)unaryChild.getSubNode(), unit, 
-          null, PartitionType.LIST);
-    } else {
-      // error
-    }
-    return unit;
-  }
-  
-  /**
-   * Creates a SubQuery for a union node.
-   *
-   * @param rootStore the store node of the SubQuery to be created
-   * @param plan logical plan
-   * @param unit the SubQuery to be created
-   * @return
-   * @throws IOException
-   */
-  private ExecutionBlock makeUnionSubQuery(StoreTableNode rootStore,
-                                     LogicalNode plan, ExecutionBlock unit) throws IOException {
-    UnaryNode unary = (UnaryNode) plan;
-    StoreTableNode outerStore, innerStore;
-    ExecutionBlock prev;
-    UnionNode union = (UnionNode) unary.getSubNode();
-    unit.setPartitionType(PartitionType.LIST);
-    
-    if (union.getOuterNode().getType() == ExprType.STORE) {
-      outerStore = (StoreTableNode) union.getOuterNode();
-      TableMeta outerMeta = CatalogUtil.newTableMeta(outerStore.getOutSchema(),
-          StoreType.CSV);
-      insertOuterScan(union, outerStore.getTableName(), outerMeta);
-      prev = convertMap.get(outerStore);
-      if (prev != null) {
-        prev.getStoreTableNode().setTableName(rootStore.getTableName());
-        prev.setPartitionType(PartitionType.LIST);
-        prev.setParentBlock(unit);
-        unit.addChildBlock((ScanNode) union.getOuterNode(), prev);
-      }
-    } else if (union.getOuterNode().getType() == ExprType.UNION) {
-      _handleUnionNode(rootStore, union, unit, null, PartitionType.LIST);
-    }
-    
-    if (union.getInnerNode().getType() == ExprType.STORE) {
-      innerStore = (StoreTableNode) union.getInnerNode();
-      TableMeta innerMeta = CatalogUtil.newTableMeta(innerStore.getOutSchema(),
-          StoreType.CSV);
-      insertInnerScan(union, innerStore.getTableName(), innerMeta);
-      prev = convertMap.get(innerStore);
-      if (prev != null) {
-        prev.getStoreTableNode().setTableName(rootStore.getTableName());
-        prev.setPartitionType(PartitionType.LIST);
-        prev.setParentBlock(unit);
-        unit.addChildBlock((ScanNode) union.getInnerNode(), prev);
-      }
-    } else if (union.getInnerNode().getType() == ExprType.UNION) {
-      _handleUnionNode(rootStore, union, unit, null, PartitionType.LIST);
-    }
-
-    return unit;
-  }
-
-  private ExecutionBlock makeSortSubQuery(StoreTableNode rootStore,
-                                    LogicalNode plan, ExecutionBlock unit) throws IOException {
-
-    UnaryNode unary = (UnaryNode) plan;
-    UnaryNode unaryChild;
-    StoreTableNode prevStore;
-    ScanNode newScan;
-    ExecutionBlock prev;
-    unaryChild = (UnaryNode) unary.getSubNode();  // sort
-    ExprType curType = unaryChild.getType();
-    if (unaryChild.getSubNode().getType() == ExprType.STORE) {
-      // store - sort - store
-      unaryChild = (UnaryNode) unaryChild.getSubNode(); // store
-      prevStore = (StoreTableNode) unaryChild;
-      newScan = GlobalPlannerUtils.newScanPlan(prevStore.getOutSchema(),
-          prevStore.getTableName(), sm.getTablePath(prevStore.getTableName()));
-      newScan.setLocal(true);
-      ((UnaryNode) unary.getSubNode()).setSubNode(newScan);
-      prev = convertMap.get(prevStore);
-      if (prev != null) {
-        prev.setParentBlock(unit);
-        unit.addChildBlock(newScan, prev);
-        if (unaryChild.getSubNode().getType() == curType) {
-          // TODO - this is duplicated code
-          prev.setPartitionType(PartitionType.RANGE);
-        } else {
-          prev.setPartitionType(PartitionType.LIST);
-        }
-      }
-      if (unaryChild.getSubNode().getType() == curType) {
-        // the second phase
-        unit.setPartitionType(PartitionType.LIST);
-      } else {
-        // the first phase
-        unit.setPartitionType(PartitionType.HASH);
-      }
-    } else if (unaryChild.getSubNode().getType() == ExprType.SCAN) {
-      // the first phase
-      // store - sort - scan
-      unit.setPartitionType(PartitionType.RANGE);
-    } else if (unaryChild.getSubNode().getType() == ExprType.UNION) {
-      _handleUnionNode(rootStore, (UnionNode)unaryChild.getSubNode(), unit,
-          null, PartitionType.LIST);
-    } else {
-      // error
-    }
-    return unit;
-  }
-  
-  private ExecutionBlock makeJoinSubQuery(StoreTableNode rootStore,
-                                    LogicalNode plan, ExecutionBlock unit) throws IOException {
-    UnaryNode unary = (UnaryNode)plan;
-    StoreTableNode outerStore, innerStore;
-    ExecutionBlock prev;
-    JoinNode join = (JoinNode) unary.getSubNode();
-    Schema outerSchema = join.getOuterNode().getOutSchema();
-    Schema innerSchema = join.getInnerNode().getOutSchema();
-    unit.setPartitionType(PartitionType.LIST);
-
-    List<Column> outerCollist = new ArrayList<Column>();
-    List<Column> innerCollist = new ArrayList<Column>();
-    
-    // TODO: set partition for store nodes
-    if (join.hasJoinQual()) {
-      // getting repartition keys
-      List<Column[]> cols = PlannerUtil.getJoinKeyPairs(join.getJoinQual(), outerSchema, innerSchema);
-      for (Column [] pair : cols) {
-        outerCollist.add(pair[0]);
-        innerCollist.add(pair[1]);
-      }
-    } else {
-      // broadcast
-    }
-    
-    Column[] outerCols = new Column[outerCollist.size()];
-    Column[] innerCols = new Column[innerCollist.size()];
-    outerCols = outerCollist.toArray(outerCols);
-    innerCols = innerCollist.toArray(innerCols);
-    
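-    // store inputs of the join are hash-partitioned on their join keys into a fixed 32 partitions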
-    // outer
-    if (join.getOuterNode().getType() == ExprType.STORE) {
-      outerStore = (StoreTableNode) join.getOuterNode();
-      TableMeta outerMeta = CatalogUtil.newTableMeta(outerStore.getOutSchema(),
-          StoreType.CSV);
-      insertOuterScan(join, outerStore.getTableName(), outerMeta);
-      prev = convertMap.get(outerStore);
-      if (prev != null) {
-        prev.setPartitionType(PartitionType.HASH);
-        prev.setParentBlock(unit);
-        unit.addChildBlock((ScanNode) join.getOuterNode(), prev);
-      }
-      outerStore.setPartitions(PartitionType.HASH, outerCols, 32);
-    } else if (join.getOuterNode().getType() == ExprType.UNION) {
-      _handleUnionNode(rootStore, (UnionNode)join.getOuterNode(), unit, 
-          outerCols, PartitionType.HASH);
-    } else {
-
-    }
-    
-    // inner
-    if (join.getInnerNode().getType() == ExprType.STORE) {
-      innerStore = (StoreTableNode) join.getInnerNode();
-      TableMeta innerMeta = CatalogUtil.newTableMeta(innerStore.getOutSchema(),
-          StoreType.CSV);
-      insertInnerScan(join, innerStore.getTableName(), innerMeta);
-      prev = convertMap.get(innerStore);
-      if (prev != null) {
-        prev.setPartitionType(PartitionType.HASH);
-        prev.setParentBlock(unit);
-        unit.addChildBlock((ScanNode) join.getInnerNode(), prev);
-      }
-      innerStore.setPartitions(PartitionType.HASH, innerCols, 32);
-    } else if (join.getInnerNode().getType() == ExprType.UNION) {
-      _handleUnionNode(rootStore, (UnionNode)join.getInnerNode(), unit,
-          innerCols, PartitionType.HASH);
-    }
-    
-    return unit;
-  }
-  
-  /**
-   * Recursively sets up the child plans of a union.
-   * 
-   * @param rootStore the store node of the SubQuery to be created
-   * @param union logical plan rooted at the union node
-   * @param cur the SubQuery being built
-   * @param cols column array used to set the partition information
-   * @param prevOutputType partition type of the child SubQueries
-   * @throws IOException
-   */
-  private void _handleUnionNode(StoreTableNode rootStore, UnionNode union, 
-      ExecutionBlock cur, Column[] cols, PartitionType prevOutputType)
-          throws IOException {
-    StoreTableNode store;
-    TableMeta meta;
-    ExecutionBlock prev;
-    
-    if (union.getOuterNode().getType() == ExprType.STORE) {
-      store = (StoreTableNode) union.getOuterNode();
-      meta = CatalogUtil.newTableMeta(store.getOutSchema(), StoreType.CSV);
-      insertOuterScan(union, store.getTableName(), meta);
-      prev = convertMap.get(store);
-      if (prev != null) {
-        prev.getStoreTableNode().setTableName(rootStore.getTableName());
-        prev.setPartitionType(prevOutputType);
-        prev.setParentBlock(cur);
-        cur.addChildBlock((ScanNode) union.getOuterNode(), prev);
-      }
-      if (cols != null) {
-        store.setPartitions(PartitionType.LIST, cols, 32);
-      }
-    } else if (union.getOuterNode().getType() == ExprType.UNION) {
-      _handleUnionNode(rootStore, (UnionNode)union.getOuterNode(), cur, cols, 
-          prevOutputType);
-    }
-    
-    if (union.getInnerNode().getType() == ExprType.STORE) {
-      store = (StoreTableNode) union.getInnerNode();
-      meta = CatalogUtil.newTableMeta(store.getOutSchema(), StoreType.CSV);
-      insertInnerScan(union, store.getTableName(), meta);
-      prev = convertMap.get(store);
-      if (prev != null) {
-        prev.getStoreTableNode().setTableName(rootStore.getTableName());
-        prev.setPartitionType(prevOutputType);
-        prev.setParentBlock(cur);
-        cur.addChildBlock((ScanNode) union.getInnerNode(), prev);
-      }
-      if (cols != null) {
-        store.setPartitions(PartitionType.LIST, cols, 32);
-      }
-    } else if (union.getInnerNode().getType() == ExprType.UNION) {
-      _handleUnionNode(rootStore, (UnionNode)union.getInnerNode(), cur, cols, 
-          prevOutputType);
-    }
-  }
-  
-  private LogicalNode insertOuterScan(BinaryNode parent, String tableId,
-      TableMeta meta) throws IOException {
-    TableDesc desc = CatalogUtil.newTableDesc(tableId, meta, sm.getTablePath(tableId));
-    ScanNode scan = new ScanNode(new FromTable(desc));
-    scan.setLocal(true);
-    scan.setInSchema(meta.getSchema());
-    scan.setOutSchema(meta.getSchema());
-    parent.setOuter(scan);
-    return parent;
-  }
-  
-  private LogicalNode insertInnerScan(BinaryNode parent, String tableId, 
-      TableMeta meta) throws IOException {
-    TableDesc desc = CatalogUtil.newTableDesc(tableId, meta, sm.getTablePath(tableId));
-    ScanNode scan = new ScanNode(new FromTable(desc));
-    scan.setLocal(true);
-    scan.setInSchema(meta.getSchema());
-    scan.setOutSchema(meta.getSchema());
-    parent.setInner(scan);
-    return parent;
-  }
-  
-  private MasterPlan convertToGlobalPlan(IndexWriteNode index,
-                                         LogicalNode logicalPlan) throws IOException {
-    recursiveBuildSubQuery(logicalPlan);
-    ExecutionBlock root;
-    
-    if (index != null) {
-      SubQueryId id = QueryIdFactory.newSubQueryId(queryId);
-      ExecutionBlock unit = new ExecutionBlock(id);
-      root = makeScanSubQuery(unit);
-      root.setPlan(index);
-    } else {
-      root = convertMap.get(((LogicalRootNode)logicalPlan).getSubNode());
-      root.getStoreTableNode().setLocal(false);
-    }
-    return new MasterPlan(root);
-  }
-}

