tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [5/8] tajo git commit: TAJO-1176: Implements queryable virtual tables for catalog information
Date Wed, 31 Dec 2014 10:51:26 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
index 9575c13..51f65ee 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
@@ -22,6 +22,7 @@
 package org.apache.tajo.catalog.store;
 
 import com.google.common.collect.Maps;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TajoConstants;
@@ -29,7 +30,17 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.catalog.exception.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueProto;
+import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
 import java.util.*;
@@ -84,6 +95,22 @@ public class MemStore implements CatalogStore {
   public Collection<String> getAllTablespaceNames() throws CatalogException {
     return tablespaces.keySet();
   }
+  
+  @Override
+  public List<TablespaceProto> getTablespaces() throws CatalogException {
+    List<TablespaceProto> tablespaceList = TUtil.newList();
+    int tablespaceId = 0;
+    // Ids are generated here: a counter from 0, advanced once per key in keySet() iteration order.
+    for (String spaceName: tablespaces.keySet()) {
+      TablespaceProto.Builder builder = TablespaceProto.newBuilder();
+      builder.setSpaceName(spaceName);
+      builder.setUri(tablespaces.get(spaceName));
+      builder.setId(tablespaceId++);
+      tablespaceList.add(builder.build());
+    }
+    
+    return tablespaceList;
+  }
 
   @Override
   public TablespaceProto getTablespace(String spaceName) throws CatalogException {
@@ -139,6 +166,24 @@ public class MemStore implements CatalogStore {
   public Collection<String> getAllDatabaseNames() throws CatalogException {
     return databases.keySet();
   }
+  
+  @Override
+  public List<DatabaseProto> getAllDatabases() throws CatalogException {
+    List<DatabaseProto> databaseList = new ArrayList<DatabaseProto>();
+    int dbId = 0;
+    // Ids are generated here: a counter from 0, advanced once per key in keySet() iteration order.
+    for (String databaseName: databases.keySet()) {
+      DatabaseProto.Builder builder = DatabaseProto.newBuilder();
+      
+      builder.setId(dbId++);
+      builder.setName(databaseName);
+      builder.setSpaceId(0); // every database is reported with space id 0
+      
+      databaseList.add(builder.build());
+    }
+    
+    return databaseList;
+  }
 
   /**
    * Get a database namespace from a Map instance.
@@ -303,6 +348,118 @@ public class MemStore implements CatalogStore {
     Map<String, CatalogProtos.TableDescProto> database = checkAndGetDatabaseNS(databases, databaseName);
     return new ArrayList<String>(database.keySet());
   }
+  
+  @Override
+  public List<TableDescriptorProto> getAllTables() throws CatalogException {
+    List<TableDescriptorProto> tableList = new ArrayList<CatalogProtos.TableDescriptorProto>();
+    int dbId = 0, tableId = 0;
+    // dbId advances once per database; tableId advances once per table and is never reset.
+    for (String databaseName: databases.keySet()) {
+      Map<String, TableDescProto> tables = databases.get(databaseName);
+      List<String> tableNameList = TUtil.newList(tables.keySet());
+      Collections.sort(tableNameList);
+      // Tables of one database are visited in sorted-name order.
+      for (String tableName: tableNameList) {
+        TableDescProto tableDesc = tables.get(tableName);
+        TableDescriptorProto.Builder builder = TableDescriptorProto.newBuilder();
+        
+        builder.setDbId(dbId);
+        builder.setTid(tableId);
+        builder.setName(tableName);
+        builder.setPath(tableDesc.getPath());
+        builder.setTableType(tableDesc.getIsExternal()?"EXTERNAL":"BASE");
+        builder.setStoreType(tableDesc.getMeta().getStoreType().toString());
+        
+        tableList.add(builder.build());
+        tableId++;
+      }
+      dbId++;
+    }
+    
+    return tableList;
+  }
+  
+  @Override
+  public List<TableOptionProto> getAllTableOptions() throws CatalogException {
+    List<TableOptionProto> optionList = new ArrayList<CatalogProtos.TableOptionProto>();
+    int tid = 0;
+    // Emits one TableOptionProto per key/value option of every table, tagged with that table's id.
+    for (String databaseName: databases.keySet()) {
+      Map<String, TableDescProto> tables = databases.get(databaseName);
+      List<String> tableNameList = TUtil.newList(tables.keySet());
+      Collections.sort(tableNameList);
+      
+      for (String tableName: tableNameList) {
+        TableDescProto table = tables.get(tableName); 
+        List<KeyValueProto> keyValueList = table.getMeta().getParams().getKeyvalList();
+        
+        for (KeyValueProto keyValue: keyValueList) {
+          TableOptionProto.Builder builder = TableOptionProto.newBuilder();
+          
+          builder.setTid(tid);
+          builder.setKeyval(keyValue);
+          
+          optionList.add(builder.build());
+        }
+        tid++; // once per table (not per database), so tid lines up with getAllTables()
+      }
+    }
+    
+    return optionList;
+  }
+  
+  @Override
+  public List<TableStatsProto> getAllTableStats() throws CatalogException {
+    List<TableStatsProto> statList = new ArrayList<CatalogProtos.TableStatsProto>();
+    int tid = 0;
+    // Emits one TableStatsProto (row and byte counts) per table, tagged with that table's id.
+    for (String databaseName: databases.keySet()) {
+      Map<String, TableDescProto> tables = databases.get(databaseName);
+      List<String> tableNameList = TUtil.newList(tables.keySet());
+      Collections.sort(tableNameList);
+      
+      for (String tableName: tableNameList) {
+        TableDescProto table = tables.get(tableName);
+        TableStatsProto.Builder builder = TableStatsProto.newBuilder();
+        
+        builder.setTid(tid);
+        builder.setNumRows(table.getStats().getNumRows());
+        builder.setNumBytes(table.getStats().getNumBytes());
+        
+        statList.add(builder.build());
+        tid++; // once per table (not per database), so tid lines up with getAllTables()
+      }
+    }
+    
+    return statList;
+  }
+  
+  @Override
+  public List<ColumnProto> getAllColumns() throws CatalogException {
+    List<ColumnProto> columnList = new ArrayList<CatalogProtos.ColumnProto>();
+    int tid = 0;
+    // Emits one ColumnProto (name and data type) per schema field, tagged with the owning table's id.
+    for (String databaseName: databases.keySet()) {
+      Map<String, TableDescProto> tables = databases.get(databaseName);
+      List<String> tableNameList = TUtil.newList(tables.keySet());
+      Collections.sort(tableNameList);
+      
+      for (String tableName: tableNameList) {
+        TableDescProto tableDesc = tables.get(tableName);
+        
+        for (ColumnProto column: tableDesc.getSchema().getFieldsList()) {
+          ColumnProto.Builder builder = ColumnProto.newBuilder();
+          builder.setTid(tid);
+          builder.setName(column.getName());
+          builder.setDataType(column.getDataType());
+          columnList.add(builder.build());
+        }
+        tid++; // once per table (not per database), so tid lines up with getAllTables()
+      }
+    }
+    
+    return columnList;
+  }
 
   @Override
   public void addPartitionMethod(CatalogProtos.PartitionMethodProto partitionMethodProto) throws CatalogException {
@@ -370,6 +527,11 @@ public class MemStore implements CatalogStore {
   public void dropPartitions(String tableName) throws CatalogException {
     throw new RuntimeException("not supported!");
   }
+  
+  @Override
+  public List<TablePartitionProto> getAllPartitions() throws CatalogException {
+    throw new UnsupportedOperationException(); // always thrown: no partition listing is implemented here
+  }
 
   /* (non-Javadoc)
    * @see CatalogStore#createIndex(nta.catalog.proto.CatalogProtos.IndexDescProto)
@@ -455,6 +617,33 @@ public class MemStore implements CatalogStore {
 
     return protos.toArray(new IndexDescProto[protos.size()]);
   }
+  
+  @Override
+  public List<IndexProto> getAllIndexes() throws CatalogException {
+    List<IndexProto> indexList = new ArrayList<CatalogProtos.IndexProto>();
+    Set<String> databaseNames = indexes.keySet(); // not named "databases": that would shadow the field
+    // One IndexProto per index; optional flags that are unset are reported as false.
+    for (String databaseName: databaseNames) {
+      Map<String, IndexDescProto> indexMap = indexes.get(databaseName);
+      
+      for (Map.Entry<String, IndexDescProto> entry: indexMap.entrySet()) {
+        IndexDescProto indexDesc = entry.getValue();
+        IndexProto.Builder builder = IndexProto.newBuilder();
+        
+        builder.setColumnName(indexDesc.getColumn().getName());
+        builder.setDataType(indexDesc.getColumn().getDataType().getType().toString());
+        builder.setIndexName(entry.getKey());
+        builder.setIndexType(indexDesc.getIndexMethod().toString());
+        builder.setIsAscending(indexDesc.hasIsAscending() && indexDesc.getIsAscending());
+        builder.setIsClustered(indexDesc.hasIsClustered() && indexDesc.getIsClustered());
+        builder.setIsUnique(indexDesc.hasIsUnique() && indexDesc.getIsUnique());
+        
+        indexList.add(builder.build());
+      }
+    }
+    
+    return indexList;
+  }
 
   @Override
   public void addFunction(FunctionDesc func) throws CatalogException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index a044d64..43c6f7d 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -19,8 +19,10 @@
 package org.apache.tajo.catalog;
 
 import com.google.common.collect.Sets;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary;
 import org.apache.tajo.catalog.exception.CatalogException;
 import org.apache.tajo.catalog.exception.NoSuchFunctionException;
 import org.apache.tajo.catalog.store.PostgreSQLStore;
@@ -53,7 +55,6 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
 import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceType;
 import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.SetLocation;
 import static org.junit.Assert.*;
-import static org.junit.Assert.assertEquals;
 
 public class TestCatalog {
 	static final String FieldName1="f1";
@@ -211,6 +212,7 @@ public class TestCatalog {
   @Test
   public void testCreateAndDropManyDatabases() throws Exception {
     List<String> createdDatabases = new ArrayList<String>();
+    InfoSchemaMetadataDictionary dictionary = new InfoSchemaMetadataDictionary();
     String namePrefix = "database_";
     final int NUM = 10;
     for (int i = 0; i < NUM; i++) {
@@ -223,10 +225,11 @@ public class TestCatalog {
 
     Collection<String> allDatabaseNames = catalog.getAllDatabaseNames();
     for (String databaseName : allDatabaseNames) {
-      assertTrue(databaseName.equals(DEFAULT_DATABASE_NAME) || createdDatabases.contains(databaseName));
+      assertTrue(databaseName.equals(DEFAULT_DATABASE_NAME) || createdDatabases.contains(databaseName) ||
+          dictionary.isSystemDatabase(databaseName));
     }
-    // additional one is 'default' database.
-    assertEquals(NUM + 1, allDatabaseNames.size());
+    // additional ones are 'default' and 'system' databases.
+    assertEquals(NUM + 2, allDatabaseNames.size());
 
     Collections.shuffle(createdDatabases);
     for (String tobeDropped : createdDatabases) {
@@ -351,8 +354,8 @@ public class TestCatalog {
       }
     }
 
-    // Finally, only default database will remain. So, its result is 1.
-    assertEquals(1, catalog.getAllDatabaseNames().size());
+    // Finally, only the default and system databases will remain. So, its result is 2.
+    assertEquals(2, catalog.getAllDatabaseNames().size());
   }
 	
 	@Test

http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java
new file mode 100644
index 0000000..d6ea459
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultFileScanner.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.engine.planner.physical.SeqScanExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class NonForwardQueryResultFileScanner implements NonForwardQueryResultScanner {
+  private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100; // count passed to each getNonForwardSplit() call
+  // Scans a table's fragments batch by batch; scanExec is null whenever no batch is currently open.
+  private QueryId queryId;
+  private String sessionId;
+  private SeqScanExec scanExec;
+  private TableDesc tableDesc;
+  private RowStoreEncoder rowEncoder;
+  private int maxRow; // upper bound on the rows handed out over this scanner's lifetime
+  private int currentNumRows; // rows handed out so far, summed over all getNextRows() calls
+  private TaskAttemptContext taskContext;
+  private TajoConf tajoConf;
+  private ScanNode scanNode;
+  // Passed to getNonForwardSplit() as the start position; advanced by the size of each batch obtained.
+  private int currentFragmentIndex = 0;
+
+  public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode,
+      TableDesc tableDesc, int maxRow) throws IOException {
+    this.tajoConf = tajoConf;
+    this.sessionId = sessionId;
+    this.queryId = queryId;
+    this.scanNode = scanNode;
+    this.tableDesc = tableDesc;
+    this.maxRow = maxRow;
+    this.rowEncoder = RowStoreUtil.createEncoder(tableDesc.getLogicalSchema());
+  }
+
+  public void init() throws IOException {
+    initSeqScanExec();
+  }
+  /** Requests the next batch of fragments and opens it; leaves scanExec untouched when none are returned. */
+  private void initSeqScanExec() throws IOException {
+    List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType())
+        .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN);
+    
+    if (fragments != null && !fragments.isEmpty()) {
+      FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[] {}));
+      this.taskContext = new TaskAttemptContext(
+          new QueryContext(tajoConf), null, 
+          new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0), 
+          fragmentProtos, null);
+      try {
+        // scanNode must be cloned because SeqScanExec changes the target in the case of
+        // a partitioned table.
+        scanExec = new SeqScanExec(taskContext, (ScanNode) scanNode.clone(), fragmentProtos);
+      } catch (CloneNotSupportedException e) {
+        throw new IOException(e.getMessage(), e);
+      }
+      scanExec.init();
+      currentFragmentIndex += fragments.size();
+    }
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  public void setScanExec(SeqScanExec scanExec) {
+    this.scanExec = scanExec;
+  }
+
+  public TableDesc getTableDesc() {
+    return tableDesc;
+  }
+
+  public void close() throws Exception {
+    if (scanExec != null) {
+      scanExec.close();
+      scanExec = null;
+    }
+  }
+  /** Returns encoded rows until fetchRowNum is reached, the data runs out, or maxRow rows were returned in total. */
+  public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
+    List<ByteString> rows = new ArrayList<ByteString>();
+    if (scanExec == null) {
+      return rows;
+    }
+    int rowCount = 0;
+    while (true) {
+      Tuple tuple = scanExec.next();
+      if (tuple == null) { // current batch exhausted: close it and try the next batch of fragments
+        scanExec.close();
+        scanExec = null;
+        initSeqScanExec();
+        if (scanExec != null) {
+          tuple = scanExec.next();
+        }
+        if (tuple == null) {
+          if (scanExec != null) {
+            scanExec.close();
+            scanExec = null;
+          }
+          break;
+        }
+      }
+      rows.add(ByteString.copyFrom((rowEncoder.toBytes(tuple))));
+      rowCount++;
+      currentNumRows++;
+      if (currentNumRows >= maxRow) { // tested first so that a later call cannot overshoot maxRow
+        scanExec.close();
+        scanExec = null;
+        break;
+      }
+      if (rowCount >= fetchRowNum) {
+        break;
+      }
+    }
+    return rows;
+  }
+
+  @Override
+  public Schema getLogicalSchema() {
+    return tableDesc.getLogicalSchema();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
index aced80c..7e7d705 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
@@ -18,149 +18,29 @@
 
 package org.apache.tajo.master;
 
-import com.google.protobuf.ByteString;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.TaskId;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.engine.planner.physical.SeqScanExec;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.worker.TaskAttemptContext;
-
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
-public class NonForwardQueryResultScanner {
-  private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100;
-
-  private QueryId queryId;
-  private String sessionId;
-  private SeqScanExec scanExec;
-  private TableDesc tableDesc;
-  private RowStoreEncoder rowEncoder;
-  private int maxRow;
-  private int currentNumRows;
-  private TaskAttemptContext taskContext;
-  private TajoConf tajoConf;
-  private ScanNode scanNode;
-
-  private int currentFragmentIndex = 0;
-
-  public NonForwardQueryResultScanner(TajoConf tajoConf, String sessionId,
-                                      QueryId queryId,
-                                      ScanNode scanNode,
-                                      TableDesc tableDesc,
-                                      int maxRow) throws IOException {
-    this.tajoConf = tajoConf;
-    this.sessionId = sessionId;
-    this.queryId = queryId;
-    this.scanNode = scanNode;
-    this.tableDesc = tableDesc;
-    this.maxRow = maxRow;
-
-    this.rowEncoder =  RowStoreUtil.createEncoder(tableDesc.getLogicalSchema());
-  }
-
-  public void init() throws IOException {
-    initSeqScanExec();
-  }
-
-  private void initSeqScanExec() throws IOException {
-    List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType())
-        .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN);
-
-    if (fragments != null && !fragments.isEmpty()) {
-      FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[]{}));
-      this.taskContext = new TaskAttemptContext(
-          new QueryContext(tajoConf), null,
-          new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0),
-          fragmentProtos, null);
-
-      try {
-        // scanNode must be clone cause SeqScanExec change target in the case of a partitioned table.
-        scanExec = new SeqScanExec(taskContext, (ScanNode)scanNode.clone(), fragmentProtos);
-      } catch (CloneNotSupportedException e) {
-        throw new IOException(e.getMessage(), e);
-      }
-      scanExec.init();
-      currentFragmentIndex += fragments.size();
-    }
-  }
-
-  public QueryId getQueryId() {
-    return queryId;
-  }
-
-  public String getSessionId() {
-    return sessionId;
-  }
-
-  public void setScanExec(SeqScanExec scanExec) {
-    this.scanExec = scanExec;
-  }
+import org.apache.tajo.QueryId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
 
-  public TableDesc getTableDesc() {
-    return tableDesc;
-  }
+import com.google.protobuf.ByteString;
 
-  public void close() throws Exception {
-    if (scanExec != null) {
-      scanExec.close();
-      scanExec = null;
-    }
-  }
+public interface NonForwardQueryResultScanner {
 
-  public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
-    List<ByteString> rows = new ArrayList<ByteString>();
-    if (scanExec == null) {
-      return rows;
-    }
-    int rowCount = 0;
+  public void close() throws Exception;
 
-    while (true) {
-      Tuple tuple = scanExec.next();
-      if (tuple == null) {
-        scanExec.close();
-        scanExec = null;
+  public Schema getLogicalSchema();
 
-        initSeqScanExec();
-        if (scanExec != null) {
-          tuple = scanExec.next();
-        }
-        if (tuple == null) {
-          if (scanExec != null ) {
-            scanExec.close();
-            scanExec = null;
-          }
+  public List<ByteString> getNextRows(int fetchRowNum) throws IOException;
 
-          break;
-        }
-      }
-      rows.add(ByteString.copyFrom((rowEncoder.toBytes(tuple))));
-      rowCount++;
-      currentNumRows++;
-      if (rowCount >= fetchRowNum) {
-        break;
-      }
+  public QueryId getQueryId();
+  
+  public String getSessionId();
+  
+  public TableDesc getTableDesc();
 
-      if (currentNumRows >= maxRow) {
-        scanExec.close();
-        scanExec = null;
-        break;
-      }
-    }
+  public void init() throws IOException;
 
-    return rows;
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java
new file mode 100644
index 0000000..c6466f5
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java
@@ -0,0 +1,616 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.codegen.CompilationError;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.IndexScanNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import com.google.protobuf.ByteString;
+
+public class NonForwardQueryResultSystemScanner implements NonForwardQueryResultScanner {
+  
+  private final Log LOG = LogFactory.getLog(getClass());
+  
+  private MasterContext masterContext;
+  private LogicalPlan logicalPlan;
+  private final QueryId queryId;
+  private final String sessionId;
+  private TaskAttemptContext taskContext;
+  private int currentRow;
+  private long maxRow;
+  private TableDesc tableDesc;
+  private Schema outSchema;
+  private RowStoreEncoder encoder;
+  private PhysicalExec physicalExec;
+  
+  public NonForwardQueryResultSystemScanner(MasterContext context, LogicalPlan plan, QueryId queryId, 
+      String sessionId, int maxRow) {
+    masterContext = context;
+    logicalPlan = plan;
+    this.queryId = queryId;
+    this.sessionId = sessionId;
+    this.maxRow = maxRow;
+    
+  }
+  
+  @Override
+  public void init() throws IOException {
+    QueryContext queryContext = new QueryContext(masterContext.getConf());
+    currentRow = 0;
+    
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, logicalPlan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(masterContext.getConf(), masterContext.getCatalog());
+    try {
+      globalPlanner.build(masterPlan);
+    } catch (PlanningException e) {
+      throw new RuntimeException(e);
+    }
+    
+    ExecutionBlockCursor cursor = new ExecutionBlockCursor(masterPlan);
+    ExecutionBlock leafBlock = null;
+    while (cursor.hasNext()) {
+      ExecutionBlock block = cursor.nextBlock();
+      if (masterPlan.isLeaf(block)) {
+        leafBlock = block;
+        break;
+      }
+    }
+    
+    taskContext = new TaskAttemptContext(queryContext, null,
+        new TaskAttemptId(new TaskId(leafBlock.getId(), 0), 0),
+        null, null);
+    physicalExec = new SimplePhysicalPlannerImpl(masterContext.getConf())
+      .createPlan(taskContext, leafBlock.getPlan());
+    
+    tableDesc = new TableDesc("table_"+System.currentTimeMillis(), physicalExec.getSchema(), 
+        new TableMeta(StoreType.SYSTEM, new KeyValueSet()), null);
+    outSchema = physicalExec.getSchema();
+    encoder = RowStoreUtil.createEncoder(getLogicalSchema());
+    
+    physicalExec.init();
+  }
+
+  @Override
+  public void close() throws Exception {
+    tableDesc = null;
+    outSchema = null;
+    encoder = null;
+    if (physicalExec != null) {
+      try {
+        physicalExec.close();
+      } catch (Exception ignored) {}
+    }
+    physicalExec = null;
+    currentRow = -1;
+  }
+  
+  /**
+   * Builds one tuple per tablespace reported by the catalog, laid out according to
+   * {@code outSchema}. Schema columns are matched by simple name, ignoring case; a
+   * column name that is not handled here leaves its field unset.
+   *
+   * @param outSchema schema that fixes the width and field order of every tuple
+   * @return the tablespace tuples, in the order the catalog returned them
+   */
+  private List<Tuple> getTablespaces(Schema outSchema) {
+    List<TablespaceProto> spaces = masterContext.getCatalog().getAllTablespaces();
+    List<Tuple> rows = new ArrayList<Tuple>(spaces.size());
+    List<Column> schemaColumns = outSchema.getColumns();
+
+    for (TablespaceProto space : spaces) {
+      Tuple row = new VTuple(outSchema.size());
+
+      for (int idx = 0; idx < schemaColumns.size(); idx++) {
+        String name = schemaColumns.get(idx).getSimpleName();
+
+        if ("space_id".equalsIgnoreCase(name)) {
+          // Optional proto field: an unset id is reported as NULL.
+          row.put(idx, space.hasId()
+              ? DatumFactory.createInt4(space.getId())
+              : DatumFactory.createNullDatum());
+        } else if ("space_name".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createText(space.getSpaceName()));
+        } else if ("space_handler".equalsIgnoreCase(name)) {
+          // Optional proto field: an unset handler is reported as NULL.
+          row.put(idx, space.hasHandler()
+              ? DatumFactory.createText(space.getHandler())
+              : DatumFactory.createNullDatum());
+        } else if ("space_uri".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createText(space.getUri()));
+        }
+      }
+      rows.add(row);
+    }
+
+    return rows;
+  }
+  
+  /**
+   * Builds one tuple per database reported by the catalog, laid out according to
+   * {@code outSchema}. Schema columns are matched by simple name, ignoring case; a
+   * column name that is not handled here leaves its field unset.
+   *
+   * @param outSchema schema that fixes the width and field order of every tuple
+   * @return the database tuples, in the order the catalog returned them
+   */
+  private List<Tuple> getDatabases(Schema outSchema) {
+    List<DatabaseProto> dbProtos = masterContext.getCatalog().getAllDatabases();
+    List<Tuple> rows = new ArrayList<Tuple>(dbProtos.size());
+    List<Column> schemaColumns = outSchema.getColumns();
+
+    for (DatabaseProto db : dbProtos) {
+      Tuple row = new VTuple(outSchema.size());
+
+      for (int idx = 0; idx < schemaColumns.size(); idx++) {
+        String name = schemaColumns.get(idx).getSimpleName();
+
+        if ("db_id".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createInt4(db.getId()));
+        } else if ("db_name".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createText(db.getName()));
+        } else if ("space_id".equalsIgnoreCase(name)) {
+          // Optional proto field: an unset tablespace id is reported as NULL.
+          row.put(idx, db.hasSpaceId()
+              ? DatumFactory.createInt4(db.getSpaceId())
+              : DatumFactory.createNullDatum());
+        }
+      }
+
+      rows.add(row);
+    }
+
+    return rows;
+  }
+  
+  /**
+   * Builds one tuple per table descriptor reported by the catalog, laid out according
+   * to {@code outSchema}. Schema columns are matched by simple name, ignoring case; a
+   * column name that is not handled here leaves its field unset.
+   *
+   * @param outSchema schema that fixes the width and field order of every tuple
+   * @return the table tuples, in the order the catalog returned them
+   */
+  private List<Tuple> getTables(Schema outSchema) {
+    List<TableDescriptorProto> tableProtos = masterContext.getCatalog().getAllTables();
+    List<Tuple> rows = new ArrayList<Tuple>(tableProtos.size());
+    List<Column> schemaColumns = outSchema.getColumns();
+
+    for (TableDescriptorProto tableProto : tableProtos) {
+      Tuple row = new VTuple(outSchema.size());
+
+      for (int idx = 0; idx < schemaColumns.size(); idx++) {
+        String name = schemaColumns.get(idx).getSimpleName();
+
+        if ("tid".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createInt4(tableProto.getTid()));
+        } else if ("db_id".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createInt4(tableProto.getDbId()));
+        } else if ("table_name".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createText(tableProto.getName()));
+        } else if ("table_type".equalsIgnoreCase(name)) {
+          // Optional proto field: an unset table type is reported as NULL.
+          row.put(idx, tableProto.hasTableType()
+              ? DatumFactory.createText(tableProto.getTableType())
+              : DatumFactory.createNullDatum());
+        } else if ("path".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createText(tableProto.getPath()));
+        } else if ("store_type".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createText(tableProto.getStoreType()));
+        }
+      }
+
+      rows.add(row);
+    }
+
+    return rows;
+  }
+  
+  /**
+   * Builds one tuple per column reported by the catalog, laid out according to
+   * {@code outSchema}. Schema columns are matched by simple name, ignoring case; a
+   * column name that is not handled here leaves its field unset.
+   *
+   * <p>{@code ordinal_position} is not read from the catalog: it is a 1-based counter
+   * that restarts whenever the table id differs from the previous entry's.
+   * NOTE(review): this presumably relies on the catalog returning columns grouped by
+   * table and in declaration order - confirm against the catalog store.
+   *
+   * @param outSchema schema that fixes the width and field order of every tuple
+   * @return the column tuples, in the order the catalog returned them
+   */
+  private List<Tuple> getColumns(Schema outSchema) {
+    List<ColumnProto> columnProtos = masterContext.getCatalog().getAllColumns();
+    List<Tuple> rows = new ArrayList<Tuple>(columnProtos.size());
+    List<Column> schemaColumns = outSchema.getColumns();
+    int ordinal = 1;
+    int lastTid = -1;
+
+    for (ColumnProto proto : columnProtos) {
+      Tuple row = new VTuple(outSchema.size());
+
+      // Restart the position counter on every change of owning table.
+      int tid = proto.getTid();
+      if (tid != lastTid) {
+        ordinal = 1;
+        lastTid = tid;
+      }
+
+      for (int idx = 0; idx < schemaColumns.size(); idx++) {
+        String name = schemaColumns.get(idx).getSimpleName();
+
+        if ("tid".equalsIgnoreCase(name)) {
+          // Optional proto field: an unset table id is reported as NULL.
+          row.put(idx, proto.hasTid()
+              ? DatumFactory.createInt4(tid)
+              : DatumFactory.createNullDatum());
+        } else if ("column_name".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createText(proto.getName()));
+        } else if ("ordinal_position".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createInt4(ordinal));
+        } else if ("data_type".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createText(proto.getDataType().getType().toString()));
+        } else if ("type_length".equalsIgnoreCase(name)) {
+          // Only types that carry a length report one; all others yield NULL.
+          DataType dataType = proto.getDataType();
+          row.put(idx, dataType.hasLength()
+              ? DatumFactory.createInt4(dataType.getLength())
+              : DatumFactory.createNullDatum());
+        }
+      }
+
+      ordinal++;
+      rows.add(row);
+    }
+
+    return rows;
+  }
+  
+  /**
+   * Builds one tuple per index reported by the catalog, laid out according to
+   * {@code outSchema}. Schema columns are matched by simple name, ignoring case; a
+   * column name that is not handled here leaves its field unset.
+   *
+   * @param outSchema schema that fixes the width and field order of every tuple
+   * @return the index tuples, in the order the catalog returned them
+   */
+  private List<Tuple> getIndexes(Schema outSchema) {
+    List<IndexProto> indexProtos = masterContext.getCatalog().getAllIndexes();
+    List<Tuple> rows = new ArrayList<Tuple>(indexProtos.size());
+    List<Column> schemaColumns = outSchema.getColumns();
+
+    for (IndexProto indexProto : indexProtos) {
+      Tuple row = new VTuple(outSchema.size());
+
+      for (int idx = 0; idx < schemaColumns.size(); idx++) {
+        String name = schemaColumns.get(idx).getSimpleName();
+
+        if ("db_id".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createInt4(indexProto.getDbId()));
+        } else if ("tid".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createInt4(indexProto.getTId()));
+        } else if ("index_name".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createText(indexProto.getIndexName()));
+        } else if ("column_name".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createText(indexProto.getColumnName()));
+        } else if ("data_type".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createText(indexProto.getDataType()));
+        } else if ("index_type".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createText(indexProto.getIndexType()));
+        } else if ("is_unique".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createBool(indexProto.getIsUnique()));
+        } else if ("is_clustered".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createBool(indexProto.getIsClustered()));
+        } else if ("is_ascending".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createBool(indexProto.getIsAscending()));
+        }
+      }
+
+      rows.add(row);
+    }
+
+    return rows;
+  }
+  
+  /**
+   * Builds one tuple per table option (key/value pair) reported by the catalog, laid
+   * out according to {@code outSchema}. Schema columns are matched by simple name,
+   * ignoring case; a column name that is not handled here leaves its field unset.
+   *
+   * @param outSchema schema that fixes the width and field order of every tuple
+   * @return the option tuples, in the order the catalog returned them
+   */
+  private List<Tuple> getAllTableOptions(Schema outSchema) {
+    List<TableOptionProto> optionProtos = masterContext.getCatalog().getAllTableOptions();
+    List<Tuple> rows = new ArrayList<Tuple>(optionProtos.size());
+    List<Column> schemaColumns = outSchema.getColumns();
+
+    for (TableOptionProto optionProto : optionProtos) {
+      Tuple row = new VTuple(outSchema.size());
+
+      for (int idx = 0; idx < schemaColumns.size(); idx++) {
+        String name = schemaColumns.get(idx).getSimpleName();
+
+        if ("tid".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createInt4(optionProto.getTid()));
+        } else if ("key_".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createText(optionProto.getKeyval().getKey()));
+        } else if ("value_".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createText(optionProto.getKeyval().getValue()));
+        }
+      }
+
+      rows.add(row);
+    }
+
+    return rows;
+  }
+  
+  /**
+   * Builds one tuple per table-statistics record reported by the catalog, laid out
+   * according to {@code outSchema}. Schema columns are matched by simple name, ignoring
+   * case; a column name that is not handled here leaves its field unset.
+   *
+   * @param outSchema schema that fixes the width and field order of every tuple
+   * @return the statistics tuples, in the order the catalog returned them
+   */
+  private List<Tuple> getAllTableStats(Schema outSchema) {
+    List<TableStatsProto> statProtos = masterContext.getCatalog().getAllTableStats();
+    List<Tuple> rows = new ArrayList<Tuple>(statProtos.size());
+    List<Column> schemaColumns = outSchema.getColumns();
+
+    for (TableStatsProto statProto : statProtos) {
+      Tuple row = new VTuple(outSchema.size());
+
+      for (int idx = 0; idx < schemaColumns.size(); idx++) {
+        String name = schemaColumns.get(idx).getSimpleName();
+
+        if ("tid".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createInt4(statProto.getTid()));
+        } else if ("num_rows".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createInt8(statProto.getNumRows()));
+        } else if ("num_bytes".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createInt8(statProto.getNumBytes()));
+        }
+      }
+
+      rows.add(row);
+    }
+
+    return rows;
+  }
+  
+  /**
+   * Builds one tuple per table partition reported by the catalog, laid out according
+   * to {@code outSchema}. Schema columns are matched by simple name, ignoring case; a
+   * column name that is not handled here leaves its field unset.
+   *
+   * @param outSchema schema that fixes the width and field order of every tuple
+   * @return the partition tuples, in the order the catalog returned them
+   */
+  private List<Tuple> getAllPartitions(Schema outSchema) {
+    List<TablePartitionProto> partitionProtos = masterContext.getCatalog().getAllPartitions();
+    List<Tuple> rows = new ArrayList<Tuple>(partitionProtos.size());
+    List<Column> schemaColumns = outSchema.getColumns();
+
+    for (TablePartitionProto partitionProto : partitionProtos) {
+      Tuple row = new VTuple(outSchema.size());
+
+      for (int idx = 0; idx < schemaColumns.size(); idx++) {
+        String name = schemaColumns.get(idx).getSimpleName();
+
+        if ("pid".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createInt4(partitionProto.getPid()));
+        } else if ("tid".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createInt4(partitionProto.getTid()));
+        } else if ("partition_name".equalsIgnoreCase(name)) {
+          // Optional proto field: an unset partition name is reported as NULL.
+          row.put(idx, partitionProto.hasPartitionName()
+              ? DatumFactory.createText(partitionProto.getPartitionName())
+              : DatumFactory.createNullDatum());
+        } else if ("ordinal_position".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createInt4(partitionProto.getOrdinalPosition()));
+        } else if ("path".equalsIgnoreCase(name)) {
+          row.put(idx, DatumFactory.createText(partitionProto.getPath()));
+        }
+      }
+
+      rows.add(row);
+    }
+
+    return rows;
+  }
+  
+  /**
+   * Materializes the rows of the system table described by {@code tableDesc}.
+   *
+   * <p>The simple (unqualified) table name selects the loader, compared without regard
+   * to case. A name with no matching loader yields an empty list rather than
+   * {@code null}, so callers can pass the result straight to
+   * {@link java.util.Collection#addAll} without a null check.
+   *
+   * @param tableDesc descriptor whose name identifies the system table
+   * @param inSchema schema that fixes the width and field order of the produced tuples
+   * @return the table's rows; never {@code null}
+   */
+  private List<Tuple> fetchSystemTable(TableDesc tableDesc, Schema inSchema) {
+    String tableName = CatalogUtil.extractSimpleName(tableDesc.getName());
+
+    if ("tablespace".equalsIgnoreCase(tableName)) {
+      return getTablespaces(inSchema);
+    } else if ("databases".equalsIgnoreCase(tableName)) {
+      return getDatabases(inSchema);
+    } else if ("tables".equalsIgnoreCase(tableName)) {
+      return getTables(inSchema);
+    } else if ("columns".equalsIgnoreCase(tableName)) {
+      return getColumns(inSchema);
+    } else if ("indexes".equalsIgnoreCase(tableName)) {
+      return getIndexes(inSchema);
+    } else if ("table_options".equalsIgnoreCase(tableName)) {
+      return getAllTableOptions(inSchema);
+    } else if ("table_stats".equalsIgnoreCase(tableName)) {
+      return getAllTableStats(inSchema);
+    } else if ("partitions".equalsIgnoreCase(tableName)) {
+      return getAllPartitions(inSchema);
+    }
+
+    // No loader for this table: report "no rows" instead of null.
+    return new ArrayList<Tuple>();
+  }
+
+  /**
+   * Pulls up to {@code fetchRowNum} further rows from the physical executor and returns
+   * them serialized with this scanner's row encoder.
+   *
+   * <p>The executor is closed and dropped as soon as it is exhausted or the total
+   * number of rows served reaches {@code maxRow}; later calls then return an empty list.
+   *
+   * @param fetchRowNum maximum number of rows to return from this call
+   * @return the encoded rows, possibly fewer than requested and possibly empty
+   * @throws IOException if the executor fails while producing or releasing rows
+   */
+  @Override
+  public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
+    List<ByteString> rows = new ArrayList<ByteString>();
+
+    if (physicalExec == null) {
+      return rows;
+    }
+
+    // Bound the batch by the rows collected in this call. The previous bound,
+    // currentRow + fetchRowNum, overflows int for large fetch sizes and would make
+    // the loop skip rows that are still pending.
+    while (rows.size() < fetchRowNum) {
+      Tuple currentTuple = physicalExec.next();
+
+      if (currentTuple == null) {
+        physicalExec.close();
+        physicalExec = null;
+        break;
+      }
+
+      currentRow++;
+      rows.add(ByteString.copyFrom(encoder.toBytes(currentTuple)));
+
+      if (currentRow >= maxRow) {
+        physicalExec.close();
+        physicalExec = null;
+        break;
+      }
+    }
+
+    return rows;
+  }
+
+  /** Returns the id of the query this scanner serves. */
+  @Override
+  public QueryId getQueryId() {
+    return this.queryId;
+  }
+
+  /** Returns the id of the session this scanner was created for. */
+  @Override
+  public String getSessionId() {
+    return this.sessionId;
+  }
+  
+  /** Returns the descriptor of the result table; {@code null} once the scanner is closed. */
+  @Override
+  public TableDesc getTableDesc() {
+    return this.tableDesc;
+  }
+  
+  /** Returns the schema of the rows this scanner emits; {@code null} once the scanner is closed. */
+  @Override
+  public Schema getLogicalSchema() {
+    return this.outSchema;
+  }
+  
+  /**
+   * Physical planner variant whose scan and index-scan nodes are both executed by
+   * {@link SystemPhysicalExec}; everything else is inherited from
+   * {@link PhysicalPlannerImpl}.
+   */
+  class SimplePhysicalPlannerImpl extends PhysicalPlannerImpl {
+
+    public SimplePhysicalPlannerImpl(final TajoConf conf) {
+      super(conf);
+    }
+
+    /** Plans a scan as a {@link SystemPhysicalExec}; the node stack is not consulted. */
+    @Override
+    public PhysicalExec createScanPlan(final TaskAttemptContext ctx, final ScanNode scanNode,
+                                       final Stack<LogicalNode> node)
+        throws IOException {
+      return new SystemPhysicalExec(ctx, scanNode);
+    }
+
+    /** Plans an index scan exactly like a plain scan, as a {@link SystemPhysicalExec}. */
+    @Override
+    public PhysicalExec createIndexScanExec(final TaskAttemptContext ctx, final IndexScanNode annotation)
+        throws IOException {
+      return new SystemPhysicalExec(ctx, annotation);
+    }
+  }
+  
+  /**
+   * Leaf executor that serves a system table from an in-memory snapshot.
+   *
+   * <p>The snapshot is loaded through {@code fetchSystemTable} on the first call to
+   * {@link #next()} (or on an explicit {@link #rescan()}). Rows are then filtered by the
+   * scan node's qualifier, if any, and projected onto the scan targets.
+   */
+  class SystemPhysicalExec extends PhysicalExec {
+
+    private ScanNode scanNode;
+    private EvalNode qual;
+    private Projector projector;
+    private TableStats tableStats;
+    private final List<Tuple> cachedData;
+    private int currentRow;
+    private boolean isClosed;
+    /** Set once the snapshot has been loaded, even when the table turned out to be empty. */
+    private boolean isLoaded;
+
+    public SystemPhysicalExec(TaskAttemptContext context, ScanNode scanNode) {
+      super(context, scanNode.getInSchema(), scanNode.getOutSchema());
+      this.scanNode = scanNode;
+      this.qual = this.scanNode.getQual();
+      cachedData = TUtil.newList();
+      currentRow = 0;
+      isClosed = false;
+      isLoaded = false;
+
+      projector = new Projector(context, inSchema, outSchema, scanNode.getTargets());
+    }
+
+    /**
+     * Returns the next projected row, or {@code null} when the snapshot is exhausted or
+     * this executor has been closed.
+     */
+    @Override
+    public Tuple next() throws IOException {
+      if (isClosed) {
+        return null;
+      }
+
+      // Load lazily, and only once: an empty table must not trigger a catalog
+      // round trip on every call.
+      if (!isLoaded) {
+        rescan();
+      }
+
+      if (!scanNode.hasQual()) {
+        if (currentRow < cachedData.size()) {
+          Tuple aTuple = cachedData.get(currentRow++);
+          Tuple outTuple = new VTuple(outColumnNum);
+          projector.eval(aTuple, outTuple);
+          outTuple.setOffset(aTuple.getOffset());
+          return outTuple;
+        }
+        return null;
+      }
+
+      // Skip rows until one satisfies the qualifier.
+      while (currentRow < cachedData.size()) {
+        Tuple aTuple = cachedData.get(currentRow++);
+        if (qual.eval(inSchema, aTuple).isTrue()) {
+          Tuple outTuple = new VTuple(outColumnNum);
+          projector.eval(aTuple, outTuple);
+          return outTuple;
+        }
+      }
+      return null;
+    }
+
+    /**
+     * Reloads the snapshot from the catalog and rewinds the cursor to the first row, so
+     * that a subsequent {@link #next()} starts over.
+     */
+    @Override
+    public void rescan() throws IOException {
+      cachedData.clear();
+
+      // fetchSystemTable can return null for a table name it does not recognise.
+      List<Tuple> fetched = fetchSystemTable(scanNode.getTableDesc(), inSchema);
+      if (fetched != null) {
+        cachedData.addAll(fetched);
+      }
+      currentRow = 0;
+      isLoaded = true;
+
+      tableStats = new TableStats();
+      tableStats.setNumRows(cachedData.size());
+    }
+
+    /** Drops the snapshot and all plan references; {@link #next()} returns {@code null} afterwards. */
+    @Override
+    public void close() throws IOException {
+      scanNode = null;
+      qual = null;
+      projector = null;
+      cachedData.clear();
+      currentRow = -1;
+      isClosed = true;
+    }
+
+    /** Always reports completion (1.0). */
+    @Override
+    public float getProgress() {
+      return 1.0f;
+    }
+
+    /** Replaces the qualifier, when there is one, with the precompiled form held by the task context. */
+    @Override
+    protected void compile() throws CompilationError {
+      if (scanNode.hasQual()) {
+        qual = context.getPrecompiledEval(inSchema, qual);
+      }
+    }
+
+    /** Returns the row count of the last loaded snapshot, or {@code null} before the first load. */
+    @Override
+    public TableStats getInputStats() {
+      return tableStats;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index ee99353..c413b65 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -525,7 +525,7 @@ public class TajoMasterClientService extends AbstractService {
 
         List<ByteString> rows = queryResultScanner.getNextRows(request.getFetchRowNum());
         SerializedResultSet.Builder resultSetBuilder = SerializedResultSet.newBuilder();
-        resultSetBuilder.setSchema(queryResultScanner.getTableDesc().getLogicalSchema().getProto());
+        resultSetBuilder.setSchema(queryResultScanner.getLogicalSchema().getProto());
         resultSetBuilder.addAllSerializedTuples(rows);
 
         builder.setResultSet(resultSetBuilder.build());

http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 10701f9..2242445 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -41,7 +41,9 @@ import org.apache.tajo.engine.planner.physical.StoreTableExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
+import org.apache.tajo.master.NonForwardQueryResultFileScanner;
 import org.apache.tajo.master.NonForwardQueryResultScanner;
+import org.apache.tajo.master.NonForwardQueryResultSystemScanner;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.exec.prehook.CreateTableHook;
 import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
@@ -104,6 +106,8 @@ public class QueryExecutor {
     } else if (plan.isExplain()) { // explain query
       execExplain(plan, response);
 
+    } else if (PlannerUtil.checkIfQueryTargetIsVirtualTable(plan)) {
+      execQueryOnVirtualTable(queryContext, session, sql, plan, response);
 
       // Simple query indicates a form of 'select * from tb_name [LIMIT X];'.
     } else if (PlannerUtil.checkIfSimpleQuery(plan)) {
@@ -183,6 +187,27 @@ public class QueryExecutor {
     response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
   }
 
+  /**
+   * Answers a query whose target is a virtual table by creating a
+   * {@link NonForwardQueryResultSystemScanner}, registering it with the session and
+   * describing the result in {@code response}.
+   *
+   * @param queryContext context of the query (not used by this method)
+   * @param session session that will own the result scanner
+   * @param query original SQL text (not used by this method)
+   * @param plan logical plan; a LIMIT node in its root block caps the number of rows served
+   * @param response builder that receives the query id, row cap, table descriptor and result code
+   * @throws Exception if the scanner cannot be initialized
+   */
+  public void execQueryOnVirtualTable(QueryContext queryContext, Session session, String query, LogicalPlan plan,
+                              SubmitQueryResponse.Builder response) throws Exception {
+    int maxRow = Integer.MAX_VALUE;
+    if (plan.getRootBlock().hasNode(NodeType.LIMIT)) {
+      LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT);
+      // Clamp before narrowing: a LIMIT above Integer.MAX_VALUE would otherwise wrap
+      // around and could end up as a tiny or negative row cap.
+      maxRow = (int) Math.min(limitNode.getFetchFirstNum(), Integer.MAX_VALUE);
+    }
+    QueryId queryId = QueryIdFactory.newQueryId(context.getResourceManager().getSeedQueryId());
+
+    NonForwardQueryResultScanner queryResultScanner =
+            new NonForwardQueryResultSystemScanner(context, plan, queryId, session.getSessionId(), maxRow);
+
+    queryResultScanner.init();
+    session.addNonForwardQueryResultScanner(queryResultScanner);
+
+    response.setQueryId(queryId.getProto());
+    response.setMaxRowNum(maxRow);
+    response.setTableDesc(queryResultScanner.getTableDesc().getProto());
+    response.setResultCode(ClientProtos.ResultCode.OK);
+  }
+
   public void execSimpleQuery(QueryContext queryContext, Session session, String query, LogicalPlan plan,
                               SubmitQueryResponse.Builder response) throws Exception {
     ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
@@ -202,7 +227,7 @@ public class QueryExecutor {
     QueryId queryId = QueryIdFactory.newQueryId(context.getResourceManager().getSeedQueryId());
 
     NonForwardQueryResultScanner queryResultScanner =
-        new NonForwardQueryResultScanner(context.getConf(), session.getSessionId(), queryId, scanNode, desc, maxRow);
+        new NonForwardQueryResultFileScanner(context.getConf(), session.getSessionId(), queryId, scanNode, desc, maxRow);
 
     queryResultScanner.init();
     session.addNonForwardQueryResultScanner(queryResultScanner);

http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
new file mode 100644
index 0000000..bdd6dfc
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import static org.junit.Assert.*;
+import static org.hamcrest.CoreMatchers.*;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.benchmark.TPCH;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LimitNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.util.KeyValueSet;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Tests {@link NonForwardQueryResultSystemScanner} against a local testing cluster that
+ * is loaded with the TPC-H tables listed in {@link #setupTestingCluster()}.
+ */
+public class TestNonForwardQueryResultSystemScanner {
+
+  /**
+   * Matcher that accepts an iterable as soon as one of its elements satisfies the
+   * wrapped matcher; the mismatches of the rejected elements are collected for the
+   * failure description.
+   */
+  private static class CollectionMatcher<T> extends TypeSafeDiagnosingMatcher<Iterable<? extends T>> {
+
+    private final Matcher<? extends T> matcher;
+
+    public CollectionMatcher(Matcher<? extends T> matcher) {
+      this.matcher = matcher;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText("a collection containing ").appendDescriptionOf(this.matcher);
+    }
+
+    @Override
+    protected boolean matchesSafely(Iterable<? extends T> item, Description mismatchDescription) {
+      boolean isFirst = true;
+      Iterator<? extends T> iterator = item.iterator();
+
+      while (iterator.hasNext()) {
+        T obj = iterator.next();
+        if (this.matcher.matches(obj)) {
+          return true;
+        }
+
+        // Separate the per-element mismatch descriptions with commas.
+        if (!isFirst) {
+          mismatchDescription.appendText(", ");
+        }
+
+        this.matcher.describeMismatch(obj, mismatchDescription);
+        isFirst = false;
+      }
+      return false;
+    }
+
+  }
+
+  /** Local replacement for the statically imported {@code hasItem}, backed by {@link CollectionMatcher}. */
+  private <T> Matcher<Iterable<? extends T>> hasItem(Matcher<? extends T> matcher) {
+    return new CollectionMatcher<T>(matcher);
+  }
+
+  private static LocalTajoTestingUtility testUtil;
+  private static TajoTestingCluster testingCluster;
+  private static TajoConf conf;
+  private static MasterContext masterContext;
+
+  private static SQLAnalyzer analyzer;
+  private static LogicalPlanner logicalPlanner;
+  private static LogicalOptimizer logicalOptimizer;
+
+  /**
+   * Starts the testing cluster with nine TPC-H tables. Each table's data file is looked
+   * up under {@code src/test/tpch} first and, if absent, under
+   * {@code <user.dir>/tajo-core/src/test/tpch}.
+   */
+  private static void setupTestingCluster() throws Exception {
+    testUtil = new LocalTajoTestingUtility();
+    String[] names, paths;
+    Schema[] schemas;
+
+    TPCH tpch = new TPCH();
+    tpch.loadSchemas();
+    tpch.loadQueries();
+
+    names = new String[] {"customer", "lineitem", "nation", "orders", "part", "partsupp",
+        "region", "supplier", "empty_orders"};
+    schemas = new Schema[names.length];
+    for (int i = 0; i < names.length; i++) {
+      schemas[i] = tpch.getSchema(names[i]);
+    }
+
+    File file;
+    paths = new String[names.length];
+    for (int i = 0; i < names.length; i++) {
+      file = new File("src/test/tpch/" + names[i] + ".tbl");
+      if(!file.exists()) {
+        file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + names[i]
+            + ".tbl");
+      }
+      paths[i] = file.getAbsolutePath();
+    }
+
+    KeyValueSet opt = new KeyValueSet();
+    opt.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    testUtil.setup(names, paths, schemas, opt);
+
+    testingCluster = testUtil.getTestingCluster();
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    setupTestingCluster();
+
+    conf = testingCluster.getConfiguration();
+    masterContext = testingCluster.getMaster().getContext();
+
+    GlobalEngine globalEngine = masterContext.getGlobalEngine();
+    analyzer = globalEngine.getAnalyzer();
+    logicalPlanner = globalEngine.getLogicalPlanner();
+    logicalOptimizer = globalEngine.getLogicalOptimizer();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    // NOTE(review): the pause presumably lets the cluster settle before shutdown - confirm.
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the test runner, then shut down anyway.
+      Thread.currentThread().interrupt();
+    }
+
+    testUtil.shutdown();
+  }
+
+  /** Creates an uninitialized scanner for {@code sql} with a fresh query id and a random session id. */
+  private NonForwardQueryResultScanner getScanner(String sql) throws Exception {
+    QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
+    String sessionId = UUID.randomUUID().toString();
+
+    return getScanner(sql, queryId, sessionId);
+  }
+
+  /**
+   * Parses, plans and optimizes {@code sql} and wraps the plan in an uninitialized
+   * system scanner. A LIMIT in the root block becomes the scanner's row cap.
+   */
+  private NonForwardQueryResultScanner getScanner(String sql, QueryId queryId, String sessionId) throws Exception {
+    QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf);
+
+    Expr expr = analyzer.parse(sql);
+    LogicalPlan logicalPlan = logicalPlanner.createPlan(queryContext, expr);
+    logicalOptimizer.optimize(logicalPlan);
+
+    int maxRow = Integer.MAX_VALUE;
+    if (logicalPlan.getRootBlock().hasNode(NodeType.LIMIT)) {
+      LimitNode limitNode = logicalPlan.getRootBlock().getNode(NodeType.LIMIT);
+      // Clamp before narrowing so that an oversized LIMIT cannot wrap around.
+      maxRow = (int) Math.min(limitNode.getFetchFirstNum(), Integer.MAX_VALUE);
+    }
+
+    NonForwardQueryResultScanner queryResultScanner =
+        new NonForwardQueryResultSystemScanner(masterContext, logicalPlan, queryId,
+            sessionId, maxRow);
+
+    return queryResultScanner;
+  }
+
+  /** After init() the scanner must expose the ids it was given and a two-column schema. */
+  @Test
+  public void testInit() throws Exception {
+    QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
+    String sessionId = UUID.randomUUID().toString();
+    NonForwardQueryResultScanner queryResultScanner =
+        getScanner("SELECT SPACE_ID, SPACE_URI FROM INFORMATION_SCHEMA.TABLESPACE",
+            queryId, sessionId);
+
+    queryResultScanner.init();
+
+    assertThat(queryResultScanner.getQueryId(), is(notNullValue()));
+    assertThat(queryResultScanner.getLogicalSchema(), is(notNullValue()));
+    assertThat(queryResultScanner.getSessionId(), is(notNullValue()));
+    assertThat(queryResultScanner.getTableDesc(), is(notNullValue()));
+
+    assertThat(queryResultScanner.getQueryId(), is(queryId));
+    assertThat(queryResultScanner.getSessionId(), is(sessionId));
+
+    assertThat(queryResultScanner.getLogicalSchema().size(), is(2));
+    assertThat(queryResultScanner.getLogicalSchema().getColumn("space_id"), is(notNullValue()));
+  }
+
+  /** Decodes every serialized row with {@code decoder}, preserving order. */
+  private List<Tuple> getTupleList(RowStoreDecoder decoder, List<ByteString> bytes) {
+    List<Tuple> tuples = new ArrayList<Tuple>(bytes.size());
+
+    for (ByteString byteString: bytes) {
+      Tuple aTuple = decoder.toTuple(byteString.toByteArray());
+      tuples.add(aTuple);
+    }
+
+    return tuples;
+  }
+
+  /**
+   * Returns a matcher that applies {@code matcher} to field {@code fieldId} of a tuple.
+   * Only TEXT, INT4 and INT8 fields are unwrapped; any other datum type is treated as a
+   * mismatch.
+   */
+  private <T> Matcher<Tuple> getTupleMatcher(final int fieldId, final Matcher<T> matcher) {
+    return new TypeSafeDiagnosingMatcher<Tuple>() {
+
+      @Override
+      public void describeTo(Description description) {
+        description.appendDescriptionOf(matcher);
+      }
+
+      @Override
+      protected boolean matchesSafely(Tuple item, Description mismatchDescription) {
+        Datum datum = item.get(fieldId);
+        Object itemValue = null;
+
+        if (datum.type() == Type.TEXT) {
+          itemValue = datum.asChars();
+        } else if (datum.type() == Type.INT4) {
+          itemValue = datum.asInt4();
+        } else if (datum.type() == Type.INT8) {
+          itemValue = datum.asInt8();
+        }
+
+        if (itemValue != null && matcher.matches(itemValue)) {
+          return true;
+        }
+
+        matcher.describeMismatch(itemValue, mismatchDescription);
+        return false;
+      }
+    };
+  }
+
+  /** COUNT(*) over INFORMATION_SCHEMA.TABLES must yield a single row holding 9, one per loaded table. */
+  @Test
+  public void testGetNextRowsForAggregateFunction() throws Exception {
+    NonForwardQueryResultScanner queryResultScanner =
+        getScanner("SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES");
+
+    queryResultScanner.init();
+
+    List<ByteString> rowBytes = queryResultScanner.getNextRows(100);
+
+    assertThat(rowBytes.size(), is(1));
+
+    RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema());
+    List<Tuple> tuples = getTupleList(decoder, rowBytes);
+
+    assertThat(tuples.size(), is(1));
+    assertThat(tuples, hasItem(getTupleMatcher(0, is(9L))));
+  }
+
+  /** A plain projection over INFORMATION_SCHEMA.TABLES must list all nine loaded tables. */
+  @Test
+  public void testGetNextRowsForTable() throws Exception {
+    NonForwardQueryResultScanner queryResultScanner =
+        getScanner("SELECT TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES");
+
+    queryResultScanner.init();
+
+    List<ByteString> rowBytes = queryResultScanner.getNextRows(100);
+
+    assertThat(rowBytes.size(), is(9));
+
+    RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema());
+    List<Tuple> tuples = getTupleList(decoder, rowBytes);
+
+    assertThat(tuples.size(), is(9));
+    assertThat(tuples, hasItem(getTupleMatcher(0, is("lineitem"))));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index eebee6f..9002f28 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@ -37,6 +37,7 @@ import org.apache.tajo.algebra.WindowSpec;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.plan.LogicalPlan.QueryBlock;
@@ -1314,7 +1315,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   }
 
   private void updatePhysicalInfo(TableDesc desc) {
-    if (desc.getPath() != null) {
+    if (desc.getPath() != null && desc.getMeta().getStoreType() != StoreType.SYSTEM) {
       try {
         Path path = new Path(desc.getPath());
         FileSystem fs = path.getFileSystem(new Configuration());

http://git-wip-us.apache.org/repos/asf/tajo/blob/021a6f0b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index d813432..0fbd359 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -139,6 +139,27 @@ public class PlannerUtil {
         (simpleOperator && noComplexComputation && isOneQueryBlock &&
             noOrderBy && noGroupBy && noWhere && noJoin && singleRelation);
   }
+  
+  /**
+   * Returns true when the plan is not a DDL plan, its root block has a scan node, and every
+   * scan node found under the root reads a table whose store type is {@link StoreType#SYSTEM}.
+   * This check is temporary: it will be removed once tajo storage supports catalog access.
+   */
+  public static boolean checkIfQueryTargetIsVirtualTable(LogicalPlan plan) {
+    LogicalRootNode root = plan.getRootBlock().getRoot();
+    boolean rootBlockHasScan = plan.getRootBlock().hasNode(NodeType.SCAN);
+    LogicalNode[] scans = findAllNodes(root, NodeType.SCAN);
+
+    // Stays true only if at least one scan exists and all of them are SYSTEM tables.
+    boolean allSystemTables = scans.length > 0;
+    for (LogicalNode scan : scans) {
+      StoreType storeType = ((ScanNode) scan).getTableDesc().getMeta().getStoreType();
+      allSystemTables = allSystemTables & (storeType == StoreType.SYSTEM);
+    }
+
+    // A DDL plan is never treated as a virtual-table query.
+    return !checkIfDDLPlan(root) && rootBlockHasScan && allSystemTables;
+  }
 
   /**
    * Checks whether the query has 'from clause' or not.


Mime
View raw message