tajo-commits mailing list archives

From hyun...@apache.org
Subject [12/13] tajo git commit: TAJO-1288: Refactoring org.apache.tajo.master package.
Date Thu, 08 Jan 2015 15:36:20 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java
deleted file mode 100644
index c6466f5..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java
+++ /dev/null
@@ -1,616 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Stack;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.TaskId;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.codegen.CompilationError;
-import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
-import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.global.ExecutionBlock;
-import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
-import org.apache.tajo.engine.planner.global.GlobalPlanner;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.physical.PhysicalExec;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.PlanningException;
-import org.apache.tajo.plan.expr.EvalNode;
-import org.apache.tajo.plan.logical.IndexScanNode;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.TUtil;
-import org.apache.tajo.worker.TaskAttemptContext;
-
-import com.google.protobuf.ByteString;
-
-public class NonForwardQueryResultSystemScanner implements NonForwardQueryResultScanner {
-  
-  private final Log LOG = LogFactory.getLog(getClass());
-  
-  private MasterContext masterContext;
-  private LogicalPlan logicalPlan;
-  private final QueryId queryId;
-  private final String sessionId;
-  private TaskAttemptContext taskContext;
-  private int currentRow;
-  private long maxRow;
-  private TableDesc tableDesc;
-  private Schema outSchema;
-  private RowStoreEncoder encoder;
-  private PhysicalExec physicalExec;
-  
-  public NonForwardQueryResultSystemScanner(MasterContext context, LogicalPlan plan, QueryId queryId, 
-      String sessionId, int maxRow) {
-    masterContext = context;
-    logicalPlan = plan;
-    this.queryId = queryId;
-    this.sessionId = sessionId;
-    this.maxRow = maxRow;
-    
-  }
-  
-  @Override
-  public void init() throws IOException {
-    QueryContext queryContext = new QueryContext(masterContext.getConf());
-    currentRow = 0;
-    
-    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, logicalPlan);
-    GlobalPlanner globalPlanner = new GlobalPlanner(masterContext.getConf(), masterContext.getCatalog());
-    try {
-      globalPlanner.build(masterPlan);
-    } catch (PlanningException e) {
-      throw new RuntimeException(e);
-    }
-    
-    ExecutionBlockCursor cursor = new ExecutionBlockCursor(masterPlan);
-    ExecutionBlock leafBlock = null;
-    while (cursor.hasNext()) {
-      ExecutionBlock block = cursor.nextBlock();
-      if (masterPlan.isLeaf(block)) {
-        leafBlock = block;
-        break;
-      }
-    }
-    
-    taskContext = new TaskAttemptContext(queryContext, null,
-        new TaskAttemptId(new TaskId(leafBlock.getId(), 0), 0),
-        null, null);
-    physicalExec = new SimplePhysicalPlannerImpl(masterContext.getConf())
-      .createPlan(taskContext, leafBlock.getPlan());
-    
-    tableDesc = new TableDesc("table_"+System.currentTimeMillis(), physicalExec.getSchema(), 
-        new TableMeta(StoreType.SYSTEM, new KeyValueSet()), null);
-    outSchema = physicalExec.getSchema();
-    encoder = RowStoreUtil.createEncoder(getLogicalSchema());
-    
-    physicalExec.init();
-  }
-
-  @Override
-  public void close() throws Exception {
-    tableDesc = null;
-    outSchema = null;
-    encoder = null;
-    if (physicalExec != null) {
-      try {
-        physicalExec.close();
-      } catch (Exception ignored) {}
-    }
-    physicalExec = null;
-    currentRow = -1;
-  }
-  
-  private List<Tuple> getTablespaces(Schema outSchema) {
-    List<TablespaceProto> tablespaces = masterContext.getCatalog().getAllTablespaces();
-    List<Tuple> tuples = new ArrayList<Tuple>(tablespaces.size());
-    List<Column> columns = outSchema.getColumns();
-    Tuple aTuple;
-    
-    for (TablespaceProto tablespace: tablespaces) {
-      aTuple = new VTuple(outSchema.size());
-      
-      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
-        Column column = columns.get(fieldId);
-        if ("space_id".equalsIgnoreCase(column.getSimpleName())) {
-          if (tablespace.hasId()) {
-            aTuple.put(fieldId, DatumFactory.createInt4(tablespace.getId()));
-          } else {
-            aTuple.put(fieldId, DatumFactory.createNullDatum());
-          }
-        } else if ("space_name".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(tablespace.getSpaceName()));
-        } else if ("space_handler".equalsIgnoreCase(column.getSimpleName())) {
-          if (tablespace.hasHandler()) {
-            aTuple.put(fieldId, DatumFactory.createText(tablespace.getHandler()));
-          } else {
-            aTuple.put(fieldId, DatumFactory.createNullDatum());
-          }
-        } else if ("space_uri".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(tablespace.getUri()));
-        }
-      }
-      tuples.add(aTuple);
-    }
-    
-    return tuples;    
-  }
-  
-  private List<Tuple> getDatabases(Schema outSchema) {
-    List<DatabaseProto> databases = masterContext.getCatalog().getAllDatabases();
-    List<Tuple> tuples = new ArrayList<Tuple>(databases.size());
-    List<Column> columns = outSchema.getColumns();
-    Tuple aTuple;
-    
-    for (DatabaseProto database: databases) {
-      aTuple = new VTuple(outSchema.size());
-      
-      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
-        Column column = columns.get(fieldId);
-        if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(database.getId()));
-        } else if ("db_name".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(database.getName()));
-        } else if ("space_id".equalsIgnoreCase(column.getSimpleName())) {
-          if (database.hasSpaceId()) {
-            aTuple.put(fieldId, DatumFactory.createInt4(database.getSpaceId()));
-          } else {
-            aTuple.put(fieldId, DatumFactory.createNullDatum());
-          }
-        }
-      }
-      
-      tuples.add(aTuple);
-    }
-    
-    return tuples;
-  }
-  
-  private List<Tuple> getTables(Schema outSchema) {
-    List<TableDescriptorProto> tables = masterContext.getCatalog().getAllTables();
-    List<Tuple> tuples = new ArrayList<Tuple>(tables.size());
-    List<Column> columns = outSchema.getColumns();
-    Tuple aTuple;
-    
-    for (TableDescriptorProto table: tables) {
-      aTuple = new VTuple(outSchema.size());
-      
-      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
-        Column column = columns.get(fieldId);
-        if ("tid".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(table.getTid()));
-        } else if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(table.getDbId()));
-        } else if ("table_name".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(table.getName()));
-        } else if ("table_type".equalsIgnoreCase(column.getSimpleName())) {
-          if (table.hasTableType()) {
-            aTuple.put(fieldId, DatumFactory.createText(table.getTableType()));
-          } else {
-            aTuple.put(fieldId, DatumFactory.createNullDatum());
-          }
-        } else if ("path".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(table.getPath()));
-        } else if ("store_type".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(table.getStoreType()));
-        }
-      }
-      
-      tuples.add(aTuple);
-    }
-    
-    return tuples;
-  }
-  
-  private List<Tuple> getColumns(Schema outSchema) {
-    List<ColumnProto> columnsList = masterContext.getCatalog().getAllColumns();
-    List<Tuple> tuples = new ArrayList<Tuple>(columnsList.size());
-    List<Column> columns = outSchema.getColumns();
-    Tuple aTuple;
-    int columnId = 1, prevtid = -1, tid = 0;
-    
-    for (ColumnProto column: columnsList) {
-      aTuple = new VTuple(outSchema.size());
-      
-      tid = column.getTid();
-      if (prevtid != tid) {
-        columnId = 1;
-        prevtid = tid;
-      }
-      
-      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
-        Column colObj = columns.get(fieldId);
-        
-        if ("tid".equalsIgnoreCase(colObj.getSimpleName())) {
-          if (column.hasTid()) {
-            aTuple.put(fieldId, DatumFactory.createInt4(tid));
-          } else {
-            aTuple.put(fieldId, DatumFactory.createNullDatum());
-          }
-        } else if ("column_name".equalsIgnoreCase(colObj.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(column.getName()));
-        } else if ("ordinal_position".equalsIgnoreCase(colObj.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(columnId));
-        } else if ("data_type".equalsIgnoreCase(colObj.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(column.getDataType().getType().toString()));
-        } else if ("type_length".equalsIgnoreCase(colObj.getSimpleName())) {
-          DataType dataType = column.getDataType();
-          if (dataType.hasLength()) {
-            aTuple.put(fieldId, DatumFactory.createInt4(dataType.getLength()));
-          } else {
-            aTuple.put(fieldId, DatumFactory.createNullDatum());
-          }
-        }
-      }
-      
-      columnId++;
-      tuples.add(aTuple);
-    }
-    
-    return tuples;
-  }
-  
-  private List<Tuple> getIndexes(Schema outSchema) {
-    List<IndexProto> indexList = masterContext.getCatalog().getAllIndexes();
-    List<Tuple> tuples = new ArrayList<Tuple>(indexList.size());
-    List<Column> columns = outSchema.getColumns();
-    Tuple aTuple;
-    
-    for (IndexProto index: indexList) {
-      aTuple = new VTuple(outSchema.size());
-      
-      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
-        Column column = columns.get(fieldId);
-        
-        if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(index.getDbId()));
-        } else if ("tid".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(index.getTId()));
-        } else if ("index_name".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(index.getIndexName()));
-        } else if ("column_name".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(index.getColumnName()));
-        } else if ("data_type".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(index.getDataType()));
-        } else if ("index_type".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(index.getIndexType()));
-        } else if ("is_unique".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createBool(index.getIsUnique()));
-        } else if ("is_clustered".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createBool(index.getIsClustered()));
-        } else if ("is_ascending".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createBool(index.getIsAscending()));
-        }
-      }
-      
-      tuples.add(aTuple);
-    }
-    
-    return tuples;
-  }
-  
-  private List<Tuple> getAllTableOptions(Schema outSchema) {
-    List<TableOptionProto> optionList = masterContext.getCatalog().getAllTableOptions();
-    List<Tuple> tuples = new ArrayList<Tuple>(optionList.size());
-    List<Column> columns = outSchema.getColumns();
-    Tuple aTuple;
-    
-    for (TableOptionProto option: optionList) {
-      aTuple = new VTuple(outSchema.size());
-      
-      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
-        Column column = columns.get(fieldId);
-        
-        if ("tid".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(option.getTid()));
-        } else if ("key_".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(option.getKeyval().getKey()));
-        } else if ("value_".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(option.getKeyval().getValue()));
-        }
-      }
-      
-      tuples.add(aTuple);
-    }
-    
-    return tuples;
-  }
-  
-  private List<Tuple> getAllTableStats(Schema outSchema) {
-    List<TableStatsProto> statList = masterContext.getCatalog().getAllTableStats();
-    List<Tuple> tuples = new ArrayList<Tuple>(statList.size());
-    List<Column> columns = outSchema.getColumns();
-    Tuple aTuple;
-    
-    for (TableStatsProto stat: statList) {
-      aTuple = new VTuple(outSchema.size());
-      
-      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
-        Column column = columns.get(fieldId);
-        
-        if ("tid".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(stat.getTid()));
-        } else if ("num_rows".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt8(stat.getNumRows()));
-        } else if ("num_bytes".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt8(stat.getNumBytes()));
-        }
-      }
-      
-      tuples.add(aTuple);
-    }
-    
-    return tuples;
-  }
-  
-  private List<Tuple> getAllPartitions(Schema outSchema) {
-    List<TablePartitionProto> partitionList = masterContext.getCatalog().getAllPartitions();
-    List<Tuple> tuples = new ArrayList<Tuple>(partitionList.size());
-    List<Column> columns = outSchema.getColumns();
-    Tuple aTuple;
-    
-    for (TablePartitionProto partition: partitionList) {
-      aTuple = new VTuple(outSchema.size());
-      
-      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
-        Column column = columns.get(fieldId);
-        
-        if ("pid".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(partition.getPid()));
-        } else if ("tid".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(partition.getTid()));
-        } else if ("partition_name".equalsIgnoreCase(column.getSimpleName())) {
-          if (partition.hasPartitionName()) {
-            aTuple.put(fieldId, DatumFactory.createText(partition.getPartitionName()));
-          } else {
-            aTuple.put(fieldId, DatumFactory.createNullDatum());
-          }
-        } else if ("ordinal_position".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(partition.getOrdinalPosition()));
-        } else if ("path".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(partition.getPath()));
-        }
-      }
-      
-      tuples.add(aTuple);
-    }
-    
-    return tuples;
-  }
-  
-  private List<Tuple> fetchSystemTable(TableDesc tableDesc, Schema inSchema) {
-    List<Tuple> tuples = null;
-    String tableName = CatalogUtil.extractSimpleName(tableDesc.getName());
-
-    if ("tablespace".equalsIgnoreCase(tableName)) {
-      tuples = getTablespaces(inSchema);
-    } else if ("databases".equalsIgnoreCase(tableName)) {
-      tuples = getDatabases(inSchema);
-    } else if ("tables".equalsIgnoreCase(tableName)) {
-      tuples = getTables(inSchema);
-    } else if ("columns".equalsIgnoreCase(tableName)) {
-      tuples = getColumns(inSchema);
-    } else if ("indexes".equalsIgnoreCase(tableName)) {
-      tuples = getIndexes(inSchema);
-    } else if ("table_options".equalsIgnoreCase(tableName)) {
-      tuples = getAllTableOptions(inSchema);
-    } else if ("table_stats".equalsIgnoreCase(tableName)) {
-      tuples = getAllTableStats(inSchema);
-    } else if ("partitions".equalsIgnoreCase(tableName)) {
-      tuples = getAllPartitions(inSchema);
-    }
-    
-    return tuples;    
-  }
-
-  @Override
-  public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
-    List<ByteString> rows = new ArrayList<ByteString>();
-    int startRow = currentRow;
-    int endRow = startRow + fetchRowNum;
-    
-    if (physicalExec == null) {
-      return rows;
-    }
-    
-    while (currentRow < endRow) {
-      Tuple currentTuple = physicalExec.next();
-      
-      if (currentTuple == null) {
-        physicalExec.close();
-        physicalExec = null;
-        break;
-      }
-      
-      currentRow++;
-      rows.add(ByteString.copyFrom(encoder.toBytes(currentTuple)));
-      
-      if (currentRow >= maxRow) {
-        physicalExec.close();
-        physicalExec = null;
-        break;
-      }
-    }
-    
-    return rows;
-  }
-
-  @Override
-  public QueryId getQueryId() {
-    return queryId;
-  }
-
-  @Override
-  public String getSessionId() {
-    return sessionId;
-  }
-  
-  @Override
-  public TableDesc getTableDesc() {
-    return tableDesc;
-  }
-  
-  @Override
-  public Schema getLogicalSchema() {
-    return outSchema;
-  }
-  
-  class SimplePhysicalPlannerImpl extends PhysicalPlannerImpl {
-
-    public SimplePhysicalPlannerImpl(TajoConf conf) {
-      super(conf);
-    }
-
-    @Override
-    public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node)
-        throws IOException {
-      return new SystemPhysicalExec(ctx, scanNode);
-    }
-
-    @Override
-    public PhysicalExec createIndexScanExec(TaskAttemptContext ctx, IndexScanNode annotation) throws IOException {
-      return new SystemPhysicalExec(ctx, annotation);
-    }
-  }
-  
-  class SystemPhysicalExec extends PhysicalExec {
-    
-    private ScanNode scanNode;
-    private EvalNode qual;
-    private Projector projector;
-    private TableStats tableStats;
-    private final List<Tuple> cachedData;
-    private int currentRow;
-    private boolean isClosed;
-
-    public SystemPhysicalExec(TaskAttemptContext context, ScanNode scanNode) {
-      super(context, scanNode.getInSchema(), scanNode.getOutSchema());
-      this.scanNode = scanNode;
-      this.qual = this.scanNode.getQual();
-      cachedData = TUtil.newList();
-      currentRow = 0;
-      isClosed = false;
-      
-      projector = new Projector(context, inSchema, outSchema, scanNode.getTargets());
-    }
-
-    @Override
-    public Tuple next() throws IOException {
-      Tuple aTuple = null;
-      Tuple outTuple = new VTuple(outColumnNum);
-      
-      if (isClosed) {
-        return null;
-      }
-      
-      if (cachedData.size() == 0) {
-        rescan();
-      }
-      
-      if (!scanNode.hasQual()) {
-        if (currentRow < cachedData.size()) {
-          aTuple = cachedData.get(currentRow++);
-          projector.eval(aTuple, outTuple);
-          outTuple.setOffset(aTuple.getOffset());
-          return outTuple;
-        }
-        return null;
-      } else {
-        while (currentRow < cachedData.size()) {
-          aTuple = cachedData.get(currentRow++);
-          if (qual.eval(inSchema, aTuple).isTrue()) {
-            projector.eval(aTuple, outTuple);
-            return outTuple;
-          }
-        }
-        return null;
-      }
-    }
-
-    @Override
-    public void rescan() throws IOException {
-      cachedData.clear();
-      cachedData.addAll(fetchSystemTable(scanNode.getTableDesc(), inSchema));
-
-      tableStats = new TableStats();
-      tableStats.setNumRows(cachedData.size());
-    }
-
-    @Override
-    public void close() throws IOException {
-      scanNode = null;
-      qual = null;
-      projector = null;
-      cachedData.clear();
-      currentRow = -1;
-      isClosed = true;
-    }
-
-    @Override
-    public float getProgress() {
-      return 1.0f;
-    }
-
-    @Override
-    protected void compile() throws CompilationError {
-      if (scanNode.hasQual()) {
-        qual = context.getPrecompiledEval(inSchema, qual);
-      }
-    }
-
-    @Override
-    public TableStats getInputStats() {
-      return tableStats;
-    }
-    
-  }
-
-}
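
A minimal sketch (not part of this commit) of the pattern the removed scanner repeats in each of its get*() helpers: one VTuple per catalog proto entry, with fields filled by matching the output schema's simple column names and optional proto fields mapped to NULL datums. The class and method names below (TablespaceTupleBuilder, toTuples) are hypothetical; the catalog, datum, and storage calls are the same ones used in the deleted code above.

import java.util.ArrayList;
import java.util.List;

import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;

class TablespaceTupleBuilder {
  // Condensed form of getTablespaces(): build one tuple per catalog entry,
  // dispatching on the output schema's simple column names.
  static List<Tuple> toTuples(Schema outSchema, List<TablespaceProto> tablespaces) {
    List<Tuple> tuples = new ArrayList<Tuple>(tablespaces.size());
    List<Column> columns = outSchema.getColumns();

    for (TablespaceProto tablespace : tablespaces) {
      Tuple aTuple = new VTuple(outSchema.size());
      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
        String name = columns.get(fieldId).getSimpleName();
        if ("space_id".equalsIgnoreCase(name)) {
          // Optional proto fields become NULL datums when absent.
          aTuple.put(fieldId, tablespace.hasId()
              ? DatumFactory.createInt4(tablespace.getId())
              : DatumFactory.createNullDatum());
        } else if ("space_name".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createText(tablespace.getSpaceName()));
        } else if ("space_uri".equalsIgnoreCase(name)) {
          aTuple.put(fieldId, DatumFactory.createText(tablespace.getUri()));
        }
      }
      tuples.add(aTuple);
    }
    return tuples;
  }
}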

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java
new file mode 100644
index 0000000..f902081
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.ClientProtos.QueryInfoProto;
+import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.util.history.History;
+
+public class QueryInfo implements GsonObject, History {
+  private QueryId queryId;
+  @Expose
+  private QueryContext context;
+  @Expose
+  private String sql;
+  @Expose
+  private volatile TajoProtos.QueryState queryState;
+  @Expose
+  private volatile float progress;
+  @Expose
+  private volatile long startTime;
+  @Expose
+  private volatile  long finishTime;
+  @Expose
+  private String lastMessage;
+  @Expose
+  private String hostNameOfQM;
+  @Expose
+  private int queryMasterPort;
+  @Expose
+  private int queryMasterClientPort;
+  @Expose
+  private int queryMasterInfoPort;
+  @Expose
+  private String queryIdStr;
+  @Expose
+  private volatile TableDesc resultDesc;
+
+  private String jsonExpr;
+
+  public QueryInfo(QueryId queryId) {
+    this(queryId, null, null, null);
+  }
+
+  public QueryInfo(QueryId queryId, QueryContext queryContext, String sql, String jsonExpr) {
+    this.queryId = queryId;
+    this.queryIdStr = queryId.toString();
+    this.context = queryContext;
+    this.sql = sql;
+    this.jsonExpr = jsonExpr;
+
+    this.queryState = TajoProtos.QueryState.QUERY_MASTER_INIT;
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public QueryContext getQueryContext() {
+    return context;
+  }
+
+  public String getSql() {
+    return sql;
+  }
+
+  public String getQueryMasterHost() {
+    return hostNameOfQM;
+  }
+
+  public void setQueryMaster(String hostName) {
+    this.hostNameOfQM = hostName;
+  }
+
+  public int getQueryMasterInfoPort() {
+    return queryMasterInfoPort;
+  }
+
+  public void setQueryMasterInfoPort(int queryMasterInfoPort) {
+    this.queryMasterInfoPort = queryMasterInfoPort;
+  }
+
+  public void setQueryMasterPort(int port) {
+    this.queryMasterPort = port;
+  }
+
+  public int getQueryMasterPort() {
+    return queryMasterPort;
+  }
+
+  public void setQueryMasterclientPort(int port) {
+    queryMasterClientPort = port;
+  }
+
+  public int getQueryMasterClientPort() {
+    return queryMasterClientPort;
+  }
+
+  public TajoProtos.QueryState getQueryState() {
+    return queryState;
+  }
+
+  public void setQueryState(TajoProtos.QueryState queryState) {
+    this.queryState = queryState;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  public String getLastMessage() {
+    return lastMessage;
+  }
+
+  public void setLastMessage(String lastMessage) {
+    this.lastMessage = lastMessage;
+  }
+
+  public float getProgress() {
+    return progress;
+  }
+
+  public void setProgress(float progress) {
+    this.progress = progress;
+  }
+
+  public void setResultDesc(TableDesc result) {
+    this.resultDesc = result;
+  }
+
+  public boolean hasResultdesc() {
+    return resultDesc != null;
+  }
+
+  public TableDesc getResultDesc() {
+    return resultDesc;
+  }
+
+  @Override
+  public String toString() {
+    return queryId.toString() + ",state=" + queryState +",progress=" + progress + ", queryMaster="
+        + getQueryMasterHost();
+  }
+
+  public String getJsonExpr() {
+    return jsonExpr;
+  }
+
+  @Override
+  public String toJson() {
+    return CoreGsonHelper.toJson(this, QueryInfo.class);
+  }
+
+  @Override
+  public HistoryType getHistoryType() {
+    return HistoryType.QUERY_SUMMARY;
+  }
+
+  public static QueryInfo fromJson(String json) {
+    QueryInfo queryInfo = CoreGsonHelper.fromJson(json, QueryInfo.class);
+    queryInfo.queryId = TajoIdUtils.parseQueryId(queryInfo.queryIdStr);
+    return queryInfo;
+  }
+
+  public String getQueryIdStr() {
+    return queryIdStr;
+  }
+
+  public QueryInfoProto getProto() {
+    QueryInfoProto.Builder builder = QueryInfoProto.newBuilder();
+
+    builder.setQueryId(queryId.toString())
+        .setQueryState(queryState)
+        .setContextVars(context.getProto())
+        .setProgress(progress)
+        .setStartTime(startTime)
+        .setFinishTime(finishTime)
+        .setQueryMasterPort(queryMasterPort)
+        .setQueryMasterClientPort(queryMasterClientPort)
+        .setQueryMasterInfoPort(queryMasterInfoPort);
+
+    if (resultDesc != null) {
+      builder.setResultDesc(resultDesc.getProto());
+    }
+
+    if (sql != null) {
+      builder.setSql(sql);
+    }
+
+    if (lastMessage != null) {
+      builder.setLastMessage(lastMessage);
+    }
+
+    if (hostNameOfQM != null) {
+      builder.setHostNameOfQM(hostNameOfQM);
+    }
+
+    return builder.build();
+  }
+}
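
A small usage sketch (not part of this commit) of the JSON round-trip the new QueryInfo supports for query history: toJson() serializes the @Expose'd fields via CoreGsonHelper, and fromJson() rebuilds the QueryId from the serialized queryIdStr. The example class name and the query id literal are hypothetical.

import org.apache.tajo.QueryId;
import org.apache.tajo.master.QueryInfo;
import org.apache.tajo.util.TajoIdUtils;

public class QueryInfoJsonExample {
  public static void main(String[] args) {
    // Hypothetical query id string in Tajo's "q_<timestamp>_<seq>" form.
    QueryId queryId = TajoIdUtils.parseQueryId("q_1420732580000_0001");

    QueryInfo info = new QueryInfo(queryId);
    info.setProgress(0.5f);

    // Serialize the @Expose'd fields to JSON for the history store.
    String json = info.toJson();

    // fromJson() re-parses queryIdStr back into a QueryId instance.
    QueryInfo restored = QueryInfo.fromJson(json);
    System.out.println(restored.getQueryId() + " progress=" + restored.getProgress());
  }
}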

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
new file mode 100644
index 0000000..c9b8711
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.querymaster.QueryJobEvent;
+import org.apache.tajo.session.Session;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class QueryJobManager extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName());
+
+  // TajoMaster Context
+  private final TajoMaster.MasterContext masterContext;
+
+  private AsyncDispatcher dispatcher;
+
+  private SimpleFifoScheduler scheduler;
+
+  private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap();
+
+  private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap();
+
+  private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
+  private AtomicLong maxExecutionTime = new AtomicLong();
+  private AtomicLong avgExecutionTime = new AtomicLong();
+  private AtomicLong executedQuerySize = new AtomicLong();
+
+  public QueryJobManager(final TajoMaster.MasterContext masterContext) {
+    super(QueryJobManager.class.getName());
+    this.masterContext = masterContext;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    try {
+      this.dispatcher = new AsyncDispatcher();
+      addService(this.dispatcher);
+
+      this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
+
+      this.scheduler = new SimpleFifoScheduler(this);
+    } catch (Exception e) {
+      catchException(null, e);
+    }
+
+    super.init(conf);
+  }
+
+  @Override
+  public void stop() {
+    synchronized(runningQueries) {
+      for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
+        eachQueryInProgress.stop();
+      }
+    }
+    this.scheduler.stop();
+    super.stop();
+  }
+
+  @Override
+  public void start() {
+    this.scheduler.start();
+    super.start();
+  }
+
+  public EventHandler getEventHandler() {
+    return dispatcher.getEventHandler();
+  }
+
+  public Collection<QueryInProgress> getSubmittedQueries() {
+    synchronized (submittedQueries){
+      return Collections.unmodifiableCollection(submittedQueries.values());
+    }
+  }
+
+  public Collection<QueryInProgress> getRunningQueries() {
+    synchronized (runningQueries){
+      return Collections.unmodifiableCollection(runningQueries.values());
+    }
+  }
+
+  public synchronized Collection<QueryInfo> getFinishedQueries() {
+    try {
+      return this.masterContext.getHistoryReader().getQueries(null);
+    } catch (Throwable e) {
+      LOG.error(e);
+      return Lists.newArrayList();
+    }
+  }
+
+
+  public synchronized QueryInfo getFinishedQuery(QueryId queryId) {
+    try {
+      return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
+    } catch (Throwable e) {
+      LOG.error(e);
+      return null;
+    }
+  }
+
+  public QueryInfo createNewQueryJob(Session session, QueryContext queryContext, String sql,
+                                     String jsonExpr, LogicalRootNode plan)
+      throws Exception {
+    QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
+    QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql,
+        jsonExpr, plan);
+
+    synchronized (submittedQueries) {
+      queryInProgress.getQueryInfo().setQueryMaster("");
+      submittedQueries.put(queryInProgress.getQueryId(), queryInProgress);
+    }
+
+    scheduler.addQuery(queryInProgress);
+    return queryInProgress.getQueryInfo();
+  }
+
+  public QueryInfo startQueryJob(QueryId queryId) throws Exception {
+
+    QueryInProgress queryInProgress;
+
+    synchronized (submittedQueries) {
+      queryInProgress = submittedQueries.remove(queryId);
+    }
+
+    synchronized (runningQueries) {
+      runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
+    }
+
+    addService(queryInProgress);
+    queryInProgress.init(getConfig());
+    queryInProgress.start();
+
+    if (!queryInProgress.startQueryMaster()) {
+      stopQuery(queryId);
+    }
+
+    return queryInProgress.getQueryInfo();
+  }
+
+  public TajoMaster.MasterContext getMasterContext() {
+    return masterContext;
+  }
+
+  class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
+    @Override
+    public void handle(QueryJobEvent event) {
+      QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
+      if(queryInProgress == null) {
+        LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
+        return;
+      }
+
+      if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
+        stopQuery(event.getQueryInfo().getQueryId());
+      } else if (queryInProgress.isStarted()) {
+        queryInProgress.getEventHandler().handle(event);
+      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
+        scheduler.removeQuery(queryInProgress.getQueryId());
+        queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
+
+        stopQuery(queryInProgress.getQueryId());
+      }
+    }
+  }
+
+  public QueryInProgress getQueryInProgress(QueryId queryId) {
+    QueryInProgress queryInProgress;
+    synchronized (submittedQueries) {
+      queryInProgress = submittedQueries.get(queryId);
+    }
+
+    if (queryInProgress == null) {
+      synchronized (runningQueries) {
+        queryInProgress = runningQueries.get(queryId);
+      }
+    }
+    return queryInProgress;
+  }
+
+  public void stopQuery(QueryId queryId) {
+    LOG.info("Stop QueryInProgress:" + queryId);
+    QueryInProgress queryInProgress = getQueryInProgress(queryId);
+    if(queryInProgress != null) {
+      queryInProgress.stop();
+      synchronized(submittedQueries) {
+        submittedQueries.remove(queryId);
+      }
+
+      synchronized(runningQueries) {
+        runningQueries.remove(queryId);
+      }
+
+      QueryInfo queryInfo = queryInProgress.getQueryInfo();
+      long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime();
+      if (executionTime < minExecutionTime.get()) {
+        minExecutionTime.set(executionTime);
+      }
+
+      if (executionTime > maxExecutionTime.get()) {
+        maxExecutionTime.set(executionTime);
+      }
+
+      long totalExecutionTime = executedQuerySize.get() * avgExecutionTime.get();
+      if (totalExecutionTime > 0) {
+        avgExecutionTime.set((totalExecutionTime + executionTime) / (executedQuerySize.get() + 1));
+      } else {
+        avgExecutionTime.set(executionTime);
+      }
+      executedQuerySize.incrementAndGet();
+      removeService(queryInProgress);
+    } else {
+      LOG.warn("No QueryInProgress while query stopping: " + queryId);
+    }
+  }
+
+  public long getMinExecutionTime() {
+    if (getExecutedQuerySize() == 0) return 0;
+    return minExecutionTime.get();
+  }
+
+  public long getMaxExecutionTime() {
+    return maxExecutionTime.get();
+  }
+
+  public long getAvgExecutionTime() {
+    return avgExecutionTime.get();
+  }
+
+  public long getExecutedQuerySize() {
+    return executedQuerySize.get();
+  }
+
+  private void catchException(QueryId queryId, Exception e) {
+    LOG.error(e.getMessage(), e);
+    QueryInProgress queryInProgress = runningQueries.get(queryId);
+    queryInProgress.catchException(e);
+  }
+
+  public synchronized TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
+      TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
+    QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));
+    if(queryInProgress == null) {
+      return null;
+    }
+
+    QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat);
+    getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo));
+
+    return null;
+  }
+
+  private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
+    QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
+    WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
+
+    queryInfo.setQueryMaster(connectionInfo.getHost());
+    queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort());
+    queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort());
+    queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
+    queryInfo.setQueryState(queryHeartbeat.getState());
+    queryInfo.setProgress(queryHeartbeat.getQueryProgress());
+
+    if (queryHeartbeat.hasQueryFinishTime()) {
+      queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
+    }
+
+    if (queryHeartbeat.hasResultDesc()) {
+      queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc()));
+    }
+
+    return queryInfo;
+  }
+}
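
For reference, a standalone sketch (not part of this commit) of the min/max/average bookkeeping that stopQuery() performs when a query ends. It isolates the incremental-mean update avg' = (n * avg + x) / (n + 1) into a hypothetical ExecutionTimeStats class using the same AtomicLong fields; call record(executionTimeMillis) once per finished query.

import java.util.concurrent.atomic.AtomicLong;

public class ExecutionTimeStats {
  private final AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
  private final AtomicLong maxExecutionTime = new AtomicLong();
  private final AtomicLong avgExecutionTime = new AtomicLong();
  private final AtomicLong executedQuerySize = new AtomicLong();

  public void record(long executionTime) {
    if (executionTime < minExecutionTime.get()) {
      minExecutionTime.set(executionTime);
    }
    if (executionTime > maxExecutionTime.get()) {
      maxExecutionTime.set(executionTime);
    }

    // Incremental mean: recover the running total from (count * avg),
    // fold in the new sample, and divide by the new count.
    long totalExecutionTime = executedQuerySize.get() * avgExecutionTime.get();
    if (totalExecutionTime > 0) {
      avgExecutionTime.set((totalExecutionTime + executionTime) / (executedQuerySize.get() + 1));
    } else {
      avgExecutionTime.set(executionTime);
    }
    executedQuerySize.incrementAndGet();
  }

  public long getAvgExecutionTime() { return avgExecutionTime.get(); }
  public long getMaxExecutionTime() { return maxExecutionTime.get(); }
  public long getMinExecutionTime() { return executedQuerySize.get() == 0 ? 0 : minExecutionTime.get(); }
}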

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java b/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java
deleted file mode 100644
index b05572b..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/ScheduledFetches.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.tajo.worker.FetchImpl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class ScheduledFetches {
-  private List<Map<String, List<FetchImpl>>> fetches = new ArrayList<Map<String, List<FetchImpl>>>();
-
-  public void addFetch(Map<String, List<FetchImpl>> fetch) {
-    this.fetches.add(fetch);
-  }
-
-  public boolean hasNextFetch() {
-    return fetches.size() > 0;
-  }
-
-  public Map<String, List<FetchImpl>> getNextFetch() {
-    return hasNextFetch() ? fetches.get(0) : null;
-  }
-
-  public Map<String, List<FetchImpl>> popNextFetch() {
-    return hasNextFetch() ? fetches.remove(0) : null;
-  }
-
-  public int size() {
-    return fetches.size();
-  }
-}
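
A brief usage sketch (not part of this commit) of the class removed above: ScheduledFetches is a plain FIFO over per-task fetch maps, where getNextFetch() peeks at the head and popNextFetch() removes it. The example class name and the map key are hypothetical.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.tajo.master.ScheduledFetches;
import org.apache.tajo.worker.FetchImpl;

public class ScheduledFetchesExample {
  public static void main(String[] args) {
    ScheduledFetches fetches = new ScheduledFetches();

    // One scheduled entry: a map from a task identifier to its fetch list.
    Map<String, List<FetchImpl>> fetchesForOneTask = new HashMap<String, List<FetchImpl>>();
    fetchesForOneTask.put("t_000001", new ArrayList<FetchImpl>()); // hypothetical key
    fetches.addFetch(fetchesForOneTask);

    while (fetches.hasNextFetch()) {
      Map<String, List<FetchImpl>> next = fetches.popNextFetch(); // removes the head entry
      System.out.println("dispatching fetches for " + next.keySet());
    }
  }
}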

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 4649d99..7209080 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -28,7 +28,7 @@ import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.master.container.TajoContainer;
 import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.rm.TajoWorkerContainer;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index d021e43..c054599 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -40,14 +40,13 @@ import org.apache.tajo.catalog.LocalCatalogWrapper;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.function.FunctionLoader;
-import org.apache.tajo.master.ha.HAService;
-import org.apache.tajo.master.ha.HAServiceHDFSImpl;
-import org.apache.tajo.master.metrics.CatalogMetricsGaugeSet;
-import org.apache.tajo.master.metrics.WorkerResourceMetricsGaugeSet;
-import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.ha.HAService;
+import org.apache.tajo.ha.HAServiceHDFSImpl;
+import org.apache.tajo.metrics.CatalogMetricsGaugeSet;
+import org.apache.tajo.metrics.WorkerResourceMetricsGaugeSet;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.master.session.SessionManager;
+import org.apache.tajo.session.SessionManager;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rule.EvaluationContext;
 import org.apache.tajo.rule.EvaluationFailedException;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 249d335..93326be 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -44,15 +44,14 @@ import org.apache.tajo.ipc.ClientProtos.*;
 import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
 import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.master.querymaster.QueryInProgress;
-import org.apache.tajo.master.querymaster.QueryInfo;
-import org.apache.tajo.master.querymaster.QueryJobEvent;
-import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
+import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.querymaster.QueryJobEvent;
 import org.apache.tajo.master.rm.Worker;
 import org.apache.tajo.master.rm.WorkerResource;
-import org.apache.tajo.master.session.InvalidSessionException;
-import org.apache.tajo.master.session.NoSuchSessionVariableException;
-import org.apache.tajo.master.session.Session;
+import org.apache.tajo.session.InvalidSessionException;
+import org.apache.tajo.session.NoSuchSessionVariableException;
+import org.apache.tajo.session.Session;
 import org.apache.tajo.rpc.BlockingRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
index 1e3501c..a7df206 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -23,14 +23,12 @@ import com.google.protobuf.RpcController;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.querymaster.QueryJobManager;
 import org.apache.tajo.master.rm.Worker;
 import org.apache.tajo.master.rm.WorkerResource;
 import org.apache.tajo.rpc.AsyncRpcServer;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java
deleted file mode 100644
index 755df5a..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerContext.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
-
-public class TaskSchedulerContext {
-  private QueryMasterTask.QueryMasterTaskContext masterContext;
-  private boolean isLeafQuery;
-  private ExecutionBlockId blockId;
-  private int taskSize;
-  private int estimatedTaskNum;
-
-  public TaskSchedulerContext(QueryMasterTask.QueryMasterTaskContext masterContext, boolean isLeafQuery,
-                              ExecutionBlockId blockId) {
-    this.masterContext = masterContext;
-    this.isLeafQuery = isLeafQuery;
-    this.blockId = blockId;
-  }
-
-  public QueryMasterTask.QueryMasterTaskContext getMasterContext() {
-    return masterContext;
-  }
-
-  public boolean isLeafQuery() {
-    return isLeafQuery;
-  }
-
-  public ExecutionBlockId getBlockId() {
-    return blockId;
-  }
-
-  public int getTaskSize() {
-    return taskSize;
-  }
-
-  public int getEstimatedTaskNum() {
-    return estimatedTaskNum;
-  }
-
-  public void setTaskSize(int taskSize) {
-    this.taskSize = taskSize;
-  }
-
-  public void setEstimatedTaskNum(int estimatedTaskNum) {
-    this.estimatedTaskNum = estimatedTaskNum;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
deleted file mode 100644
index e5291e9..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.master.querymaster.Stage;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.Map;
-
-public class TaskSchedulerFactory {
-  private static Class<? extends AbstractTaskScheduler> CACHED_ALGORITHM_CLASS;
-  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
-  private static final Class<?>[] DEFAULT_PARAMS = { TaskSchedulerContext.class, Stage.class };
-
-  public static Class<? extends AbstractTaskScheduler> getTaskSchedulerClass(Configuration conf)
-      throws IOException {
-    if (CACHED_ALGORITHM_CLASS != null) {
-      return CACHED_ALGORITHM_CLASS;
-    } else {
-      CACHED_ALGORITHM_CLASS = conf.getClass("tajo.querymaster.task-scheduler", null, AbstractTaskScheduler.class);
-    }
-
-    if (CACHED_ALGORITHM_CLASS == null) {
-      throw new IOException("Task scheduler is null");
-    }
-    return CACHED_ALGORITHM_CLASS;
-  }
-
-  public static <T extends AbstractTaskScheduler> T get(Class<T> clazz, TaskSchedulerContext context,
-                                                        Stage stage) {
-    T result;
-    try {
-      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
-      if (constructor == null) {
-        constructor = clazz.getDeclaredConstructor(DEFAULT_PARAMS);
-        constructor.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(clazz, constructor);
-      }
-      result = constructor.newInstance(new Object[]{context, stage});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    return result;
-  }
-
-  public static AbstractTaskScheduler get(Configuration conf, TaskSchedulerContext context, Stage stage)
-      throws IOException {
-    return get(getTaskSchedulerClass(conf), context, stage);
-  }
-}
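
The removed factory caches reflective constructor lookups so repeated scheduler instantiation avoids getDeclaredConstructor() each time. Below is a generic sketch of that pattern (not part of this commit), with a hypothetical ConstructorCache class and ConcurrentHashMap standing in for Guava's Maps.newConcurrentMap(); TaskSchedulerFactory.get() applied the same idea with DEFAULT_PARAMS = {TaskSchedulerContext.class, Stage.class} and the (context, stage) arguments.

import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConstructorCache {
  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
      new ConcurrentHashMap<Class<?>, Constructor<?>>();

  @SuppressWarnings("unchecked")
  public static <T> T newInstance(Class<T> clazz, Class<?>[] paramTypes, Object... args) {
    try {
      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
      if (constructor == null) {
        // Look up and cache the constructor once per class.
        constructor = clazz.getDeclaredConstructor(paramTypes);
        constructor.setAccessible(true); // allow non-public constructors
        CONSTRUCTOR_CACHE.put(clazz, constructor);
      }
      return constructor.newInstance(args);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}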

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java
index e5a9a32..fcae53c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.master.querymaster.StageState;
+import org.apache.tajo.querymaster.StageState;
 
 public class QueryCompletedEvent extends QueryEvent {
   private final ExecutionBlockId executionBlockId;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
index 623576a..3a387fa 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
@@ -21,7 +21,7 @@ package org.apache.tajo.master.event;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.master.session.Session;
+import org.apache.tajo.session.Session;
 
 /**
  * This event is conveyed to QueryMaster.

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java
index 2d16fbe..f012e92 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.master.querymaster.StageState;
+import org.apache.tajo.querymaster.StageState;
 
 public class StageCompletedEvent extends QueryEvent {
   private final ExecutionBlockId executionBlockId;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java
index 91ef942..5a016fb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptToSchedulerEvent.java
@@ -21,7 +21,7 @@ package org.apache.tajo.master.event;
 import com.google.protobuf.RpcCallback;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.querymaster.TaskAttempt;
+import org.apache.tajo.querymaster.TaskAttempt;
 import org.apache.tajo.master.container.TajoContainerId;
 
 public class TaskAttemptToSchedulerEvent extends TaskSchedulerEvent {

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
index acbaa01..2030b55 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
@@ -34,7 +34,6 @@ import org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.session.Session;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
new file mode 100644
index 0000000..dc0c44a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.engine.planner.physical.SeqScanExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class NonForwardQueryResultFileScanner implements NonForwardQueryResultScanner {
+  private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100;
+  
+  private QueryId queryId;
+  private String sessionId;
+  private SeqScanExec scanExec;
+  private TableDesc tableDesc;
+  private RowStoreEncoder rowEncoder;
+  private int maxRow;
+  private int currentNumRows;
+  private TaskAttemptContext taskContext;
+  private TajoConf tajoConf;
+  private ScanNode scanNode;
+  
+  private int currentFragmentIndex = 0;
+
+  public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode,
+      TableDesc tableDesc, int maxRow) throws IOException {
+    this.tajoConf = tajoConf;
+    this.sessionId = sessionId;
+    this.queryId = queryId;
+    this.scanNode = scanNode;
+    this.tableDesc = tableDesc;
+    this.maxRow = maxRow;
+    this.rowEncoder = RowStoreUtil.createEncoder(tableDesc.getLogicalSchema());
+  }
+
+  public void init() throws IOException {
+    initSeqScanExec();
+  }
+
+  private void initSeqScanExec() throws IOException {
+    List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType())
+        .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN);
+    
+    if (fragments != null && !fragments.isEmpty()) {
+      FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[] {}));
+      this.taskContext = new TaskAttemptContext(
+          new QueryContext(tajoConf), null, 
+          new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0), 
+          fragmentProtos, null);
+      try {
+        // scanNode must be cloned because SeqScanExec changes its target in the
+        // case of a partitioned table.
+        scanExec = new SeqScanExec(taskContext, (ScanNode) scanNode.clone(), fragmentProtos);
+      } catch (CloneNotSupportedException e) {
+        throw new IOException(e.getMessage(), e);
+      }
+      scanExec.init();
+      currentFragmentIndex += fragments.size();
+    }
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  public void setScanExec(SeqScanExec scanExec) {
+    this.scanExec = scanExec;
+  }
+
+  public TableDesc getTableDesc() {
+    return tableDesc;
+  }
+
+  public void close() throws Exception {
+    if (scanExec != null) {
+      scanExec.close();
+      scanExec = null;
+    }
+  }
+
+  public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
+    List<ByteString> rows = new ArrayList<ByteString>();
+    if (scanExec == null) {
+      return rows;
+    }
+    int rowCount = 0;
+    while (true) {
+      Tuple tuple = scanExec.next();
+      if (tuple == null) {
+        scanExec.close();
+        scanExec = null;
+        initSeqScanExec();
+        if (scanExec != null) {
+          tuple = scanExec.next();
+        }
+        if (tuple == null) {
+          if (scanExec != null) {
+            scanExec.close();
+            scanExec = null;
+          }
+          break;
+        }
+      }
+      rows.add(ByteString.copyFrom((rowEncoder.toBytes(tuple))));
+      rowCount++;
+      currentNumRows++;
+      if (rowCount >= fetchRowNum) {
+        break;
+      }
+      if (currentNumRows >= maxRow) {
+        scanExec.close();
+        scanExec = null;
+        break;
+      }
+    }
+    return rows;
+  }
+
+  @Override
+  public Schema getLogicalSchema() {
+    return tableDesc.getLogicalSchema();
+  }
+}
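
A rough usage sketch of the scanner added above: the conf, sessionId, queryId, scanNode, tableDesc, and maxRow values are placeholders assumed to be supplied by the master's planning code, and row decoding is only indicated in a comment.

// Sketch only: conf, sessionId, queryId, scanNode, tableDesc, and maxRow are
// assumed to come from the caller; they are not constructed here.
// Note: close() is declared to throw Exception, so the enclosing method must handle it.
NonForwardQueryResultScanner scanner =
    new NonForwardQueryResultFileScanner(conf, sessionId, queryId, scanNode, tableDesc, maxRow);
scanner.init();
try {
  List<ByteString> batch = scanner.getNextRows(100);  // up to 100 encoded rows per call
  while (!batch.isEmpty()) {
    // each ByteString holds one row encoded against scanner.getLogicalSchema()
    batch = scanner.getNextRows(100);
  }
} finally {
  scanner.close();
}

An empty batch signals that either the table is exhausted or the maxRow limit passed to the constructor has been reached.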

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
new file mode 100644
index 0000000..86d2843
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.tajo.QueryId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+
+import com.google.protobuf.ByteString;
+
+public interface NonForwardQueryResultScanner {
+
+  public void close() throws Exception;
+
+  public Schema getLogicalSchema();
+
+  public List<ByteString> getNextRows(int fetchRowNum) throws IOException;
+
+  public QueryId getQueryId();
+  
+  public String getSessionId();
+  
+  public TableDesc getTableDesc();
+
+  public void init() throws IOException;
+
+}
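
To make the contract above concrete, here is a toy in-memory implementation; it is illustrative only and not part of this patch.

// Toy implementation of NonForwardQueryResultScanner, for illustration only.
import java.util.ArrayList;
import java.util.List;

import org.apache.tajo.QueryId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;

import com.google.protobuf.ByteString;

public class InMemoryResultScanner implements NonForwardQueryResultScanner {
  private final QueryId queryId;
  private final String sessionId;
  private final TableDesc tableDesc;
  private final List<ByteString> rows;  // rows already encoded for the client
  private int cursor = 0;

  public InMemoryResultScanner(QueryId queryId, String sessionId,
                               TableDesc tableDesc, List<ByteString> rows) {
    this.queryId = queryId;
    this.sessionId = sessionId;
    this.tableDesc = tableDesc;
    this.rows = rows;
  }

  @Override public void init() { }                       // nothing to open
  @Override public void close() { cursor = rows.size(); } // nothing to release
  @Override public Schema getLogicalSchema() { return tableDesc.getLogicalSchema(); }
  @Override public QueryId getQueryId() { return queryId; }
  @Override public String getSessionId() { return sessionId; }
  @Override public TableDesc getTableDesc() { return tableDesc; }

  @Override
  public List<ByteString> getNextRows(int fetchRowNum) {
    // Return at most fetchRowNum rows, advancing the cursor each call.
    List<ByteString> batch = new ArrayList<ByteString>();
    while (cursor < rows.size() && batch.size() < fetchRowNum) {
      batch.add(rows.get(cursor++));
    }
    return batch;
  }
}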

