tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [1/4] TAJO-475: Table partition catalog recap. (Min Zhou and hyunsik)
Date Mon, 27 Jan 2014 08:29:36 GMT
Updated Branches:
  refs/heads/master accd0e512 -> eb563addd


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
index df49503..9f7d23d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/SchemaUtil.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.utils;
 
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
 
 public class SchemaUtil {
   public static Schema merge(Schema left, Schema right) {
@@ -51,4 +52,12 @@ public class SchemaUtil {
     
     return common;
   }
+
+  public static Schema getQualifiedLogicalSchema(TableDesc tableDesc, String tableName) {
+    Schema logicalSchema = new Schema(tableDesc.getLogicalSchema());
+    if (tableName != null) {
+      logicalSchema.setQualifier(tableName);
+    }
+    return logicalSchema;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index a969326..6c56964 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -32,7 +32,7 @@ import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.exception.AlreadyExistsTableException;
 import org.apache.tajo.catalog.exception.NoSuchTableException;
-import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.exception.IllegalQueryStatusException;
@@ -277,12 +277,12 @@ public class GlobalEngine extends AbstractService {
       createTable.setPath(tablePath);
     }
 
-    return createTableOnPath(createTable.getTableName(), createTable.getSchema(), meta,
-        createTable.getPath(), !createTable.isExternal(), createTable.getPartitions());
+    return createTableOnPath(createTable.getTableName(), createTable.getTableSchema(), meta,
+        createTable.getPath(), !createTable.isExternal(), createTable.getPartitionMethod());
   }
 
   public TableDesc createTableOnPath(String tableName, Schema schema, TableMeta meta,
-                                     Path path, boolean isCreated, PartitionDesc partitionDesc)
+                                     Path path, boolean isCreated, PartitionMethodDesc partitionDesc)
       throws IOException {
     if (catalog.existsTable(tableName)) {
       throw new AlreadyExistsTableException(tableName);
@@ -312,7 +312,7 @@ public class GlobalEngine extends AbstractService {
     TableDesc desc = CatalogUtil.newTableDesc(tableName, schema, meta, path);
     desc.setStats(stats);
     if (partitionDesc != null) {
-      desc.setPartitions(partitionDesc);
+      desc.setPartitionMethod(partitionDesc);
     }
     catalog.addTable(desc);
 
@@ -388,8 +388,8 @@ public class GlobalEngine extends AbstractService {
       String tableName = createTableNode.getTableName();
       queryContext.setOutputTable(tableName);
       queryContext.setOutputPath(new Path(TajoConf.getWarehouseDir(context.getConf()), tableName));
-      if(createTableNode.getPartitions() != null) {
-        queryContext.setPartitions(createTableNode.getPartitions());
+      if(createTableNode.getPartitionMethod() != null) {
+        queryContext.setPartitionMethod(createTableNode.getPartitionMethod());
       }
       queryContext.setCreateTable();
     }
@@ -410,7 +410,6 @@ public class GlobalEngine extends AbstractService {
 
       // Set QueryContext settings, such as output table name and output path.
       // It also remove data files if overwrite is true.
-      String outputTableName;
       Path outputPath;
       if (insertNode.hasTargetTable()) { // INSERT INTO [TB_NAME]
         queryContext.setOutputTable(insertNode.getTableName());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 35e6dd9..6e2ee8b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -32,7 +32,7 @@ import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
@@ -405,9 +405,9 @@ public class TajoMasterClientService extends AbstractService {
 
         Schema schema = new Schema(request.getSchema());
         TableMeta meta = new TableMeta(request.getMeta());
-        PartitionDesc partitionDesc = null;
-        if (request.hasPartitions()) {
-          partitionDesc = new PartitionDesc(request.getPartitions());
+        PartitionMethodDesc partitionDesc = null;
+        if (request.hasPartition()) {
+          partitionDesc = new PartitionMethodDesc(request.getPartition());
         }
 
         TableDesc desc;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 7e4540b..f4a6da7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -18,8 +18,8 @@
 
 package org.apache.tajo.master.querymaster;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.ContentSummary;
@@ -35,9 +35,9 @@ import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.InsertNode;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
@@ -47,6 +47,7 @@ import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
 import java.util.*;
@@ -293,61 +294,78 @@ public class Query implements EventHandler<QueryEvent> {
     }
   }
 
-  public static class SubQueryCompletedTransition implements
-      MultipleArcTransition<Query, QueryEvent, QueryState> {
+  public static class SubQueryCompletedTransition implements MultipleArcTransition<Query,
QueryEvent, QueryState> {
+
+    private boolean hasNext(Query query) {
+      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+      ExecutionBlock nextBlock = cursor.peek();
+      return !query.getPlan().isTerminal(nextBlock);
+    }
+
+    private QueryState executeNextBlock(Query query) {
+      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+      ExecutionBlock nextBlock = cursor.nextBlock();
+      SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock, query.sm);
+      nextSubQuery.setPriority(query.priority--);
+      query.addSubQuery(nextSubQuery);
+      nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(), SubQueryEventType.SQ_INIT));
+
+      LOG.info("Scheduling SubQuery:" + nextSubQuery.getId());
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
+        LOG.debug("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+      }
+
+      return query.checkQueryForCompleted();
+    }
+
+    private QueryState finalizeQuery(Query query, SubQueryCompletedEvent event) {
+      MasterPlan masterPlan = query.getPlan();
+
+      if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
+        ExecutionBlock terminal = query.getPlan().getTerminalBlock();
+        DataChannel finalChannel = masterPlan.getChannel(event.getExecutionBlockId(), terminal.getId());
+        Path finalOutputDir = commitOutputData(query);
+
+        QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
+        try {
+          hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(),
+              finalOutputDir);
+        } catch (Exception e) {
+          query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
+          return QueryState.QUERY_FAILED;
+        } finally {
+          query.setFinishTime();
+        }
+        query.finished(QueryState.QUERY_SUCCEEDED);
+        query.eventHandler.handle(new QueryFinishEvent(query.getId()));
+      }
+
+      return QueryState.QUERY_SUCCEEDED;
+    }
 
     @Override
     public QueryState transition(Query query, QueryEvent event) {
       // increase the count for completed subqueries
       query.completedSubQueryCount++;
+
       SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
-      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
-      MasterPlan masterPlan = query.getPlan();
+
       // if the subquery is succeeded
       if (castEvent.getFinalState() == SubQueryState.SUCCEEDED) {
-        ExecutionBlock nextBlock = cursor.nextBlock();
-        if (!query.getPlan().isTerminal(nextBlock)) {
-          SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock,
query.sm);
-          nextSubQuery.setPriority(query.priority--);
-          query.addSubQuery(nextSubQuery);
-          nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(), SubQueryEventType.SQ_INIT));
-          LOG.info("Scheduling SubQuery:" + nextSubQuery.getId());
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
-            LOG.debug("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
-          }
-          return query.checkQueryForCompleted();
-
-        } else { // Finish a query
-          if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
-            DataChannel finalChannel = masterPlan.getChannel(castEvent.getExecutionBlockId(),
nextBlock.getId());
-            Path finalOutputDir = commitOutputData(query);
-            TableDesc finalTableDesc = buildOrUpdateResultTableDesc(query, castEvent.getExecutionBlockId(),
-                finalOutputDir);
-
-            QueryContext queryContext = query.context.getQueryContext();
-            CatalogService catalog = query.context.getQueryMasterContext().getWorkerContext().getCatalog();
-
-            if (queryContext.hasOutputTable()) { // TRUE only if a query command is 'CREATE
TABLE' OR 'INSERT INTO'
-              if (queryContext.isOutputOverwrite()) { // TRUE only if a query is 'INSERT
OVERWRITE INTO'
-                catalog.deleteTable(finalTableDesc.getName());
-              }
-              catalog.addTable(finalTableDesc);
-            }
-            query.setResultDesc(finalTableDesc);
-            query.finished(QueryState.QUERY_SUCCEEDED);
-            query.eventHandler.handle(new QueryFinishEvent(query.getId()));
-          }
-
-          return QueryState.QUERY_SUCCEEDED;
+        if (hasNext(query)) { // if there is next block
+          return executeNextBlock(query);
+        } else {
+          return finalizeQuery(query, castEvent);
         }
-      } else if (castEvent.getFinalState() == SubQueryState.ERROR) {
-        query.setFinishTime();
-        return QueryState.QUERY_ERROR;
       } else {
-        // if at least one subquery is failed, the query is also failed.
         query.setFinishTime();
-        return QueryState.QUERY_FAILED;
+
+        if (castEvent.getFinalState() == SubQueryState.ERROR) {
+          return QueryState.QUERY_ERROR;
+        } else {
+          return QueryState.QUERY_FAILED;
+        }
       }
     }
 
@@ -374,63 +392,152 @@ public class Query implements EventHandler<QueryEvent> {
       return finalOutputDir;
     }
 
-    /**
-     * It builds a table desc and update the table desc if necessary.
-     */
-    public TableDesc buildOrUpdateResultTableDesc(Query query, ExecutionBlockId finalExecBlockId,
-                                                  Path finalOutputDir) {
-      // Determine the output table name
-      SubQuery subQuery = query.getSubQuery(finalExecBlockId);
+    private static interface QueryHook {
+      boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
Path finalOutputDir);
+      void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query
query,
+                   ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception;
+    }
 
-      String outputTableName;
-      PartitionDesc partitionDesc = null;
-      QueryContext queryContext = query.context.getQueryContext();
-      if (subQuery.getBlock().getPlan().getType() == NodeType.CREATE_TABLE) {
-        CreateTableNode createTableNode = (CreateTableNode) subQuery.getBlock().getPlan();
-        outputTableName = createTableNode.getTableName();
-        if (createTableNode.hasPartition()) {
-          partitionDesc = createTableNode.getPartitions();
-        }
-      } else {
-        if (queryContext.hasOutputTable()) { // CREATE TABLE or INSERT STATEMENT
-          outputTableName = queryContext.getOutputTable();
-        } else { // SELECT STATEMENT
-          outputTableName = query.getId().toString();
-        }
-        if(queryContext.hasPartitions()) {
-          partitionDesc = queryContext.getPartitions();
+    private class QueryHookExecutor {
+      private List<QueryHook> hookList = TUtil.newList();
+      private QueryMaster.QueryMasterContext context;
+
+      public QueryHookExecutor(QueryMaster.QueryMasterContext context) {
+        this.context = context;
+        hookList.add(new MaterializedResultHook());
+        hookList.add(new CreateTableHook());
+        hookList.add(new InsertTableHook());
+      }
+
+      public void execute(QueryContext queryContext, Query query,
+                          ExecutionBlockId finalExecBlockId,
+                          Path finalOutputDir) throws Exception {
+        for (QueryHook hook : hookList) {
+          if (hook.isEligible(queryContext, query, finalExecBlockId, finalOutputDir)) {
+            hook.execute(context, queryContext, query, finalExecBlockId, finalOutputDir);
+          }
         }
       }
+    }
 
-      TableMeta meta = subQuery.getTableMeta();
-      TableStats stats = subQuery.getTableStat();
-      try {
-        FileSystem fs = finalOutputDir.getFileSystem(query.systemConf);
-        ContentSummary directorySummary = fs.getContentSummary(finalOutputDir);
-        stats.setNumBytes(directorySummary.getLength());
-      } catch (IOException e) {
-        LOG.error(e.getMessage(), e);
+    private class MaterializedResultHook implements QueryHook {
+
+      @Override
+      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId
finalExecBlockId,
+                                Path finalOutputDir) {
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        NodeType type = lastStage.getBlock().getPlan().getType();
+        return type != NodeType.CREATE_TABLE && type != NodeType.INSERT;
       }
-      TableDesc outputTableDesc = new TableDesc(outputTableName, subQuery.getSchema(), meta,
finalOutputDir);
-      outputTableDesc.setStats(stats);
-      TableDesc finalTableDesc = outputTableDesc;
-
-      // If a query has a target table, a TableDesc is updated.
-      if (queryContext.hasOutputTable()) { // CREATE TABLE or INSERT STATEMENT
-        if (queryContext.isOutputOverwrite()) {
-          CatalogService catalog = query.context.getQueryMasterContext().getWorkerContext().getCatalog();
-          Preconditions.checkNotNull(catalog, "CatalogService is NULL");
-          TableDesc updatingTable = catalog.getTableDesc(outputTableDesc.getName());
-          updatingTable.setStats(stats);
-          finalTableDesc = updatingTable;
+
+      @Override
+      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
Query query,
+                          ExecutionBlockId finalExecBlockId,
+                          Path finalOutputDir) throws Exception {
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        TableMeta meta = lastStage.getTableMeta();
+        TableStats stats = lastStage.getTableStat();
+
+        TableDesc resultTableDesc =
+            new TableDesc(
+                query.getId().toString(),
+                lastStage.getSchema(),
+                meta,
+                finalOutputDir);
+
+        stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
+        resultTableDesc.setStats(stats);
+        query.setResultDesc(resultTableDesc);
+      }
+    }
+
+    private class CreateTableHook implements QueryHook {
+
+      @Override
+      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId
finalExecBlockId,
+                                Path finalOutputDir) {
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        return lastStage.getBlock().getPlan().getType() == NodeType.CREATE_TABLE;
+      }
+
+      @Override
+      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
Query query,
+                          ExecutionBlockId finalExecBlockId,
+                          Path finalOutputDir) throws Exception {
+        CatalogService catalog = context.getWorkerContext().getCatalog();
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        TableMeta meta = lastStage.getTableMeta();
+        TableStats stats = lastStage.getTableStat();
+
+        CreateTableNode createTableNode = (CreateTableNode) lastStage.getBlock().getPlan();
+
+        TableDesc tableDescTobeCreated =
+            new TableDesc(
+                createTableNode.getTableName(),
+                createTableNode.getTableSchema(),
+                meta,
+                finalOutputDir);
+
+        if (createTableNode.hasPartition()) {
+          tableDescTobeCreated.setPartitionMethod(createTableNode.getPartitionMethod());
         }
+
+        stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
+        tableDescTobeCreated.setStats(stats);
+        query.setResultDesc(tableDescTobeCreated);
+
+        catalog.addTable(tableDescTobeCreated);
       }
+    }
+
+    private class InsertTableHook implements QueryHook {
 
-      if (partitionDesc != null) {
-        finalTableDesc.setPartitions(partitionDesc);
+      @Override
+      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId
finalExecBlockId,
+                                Path finalOutputDir) {
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        return lastStage.getBlock().getPlan().getType() == NodeType.INSERT;
       }
 
-      return finalTableDesc;
+      @Override
+      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
Query query,
+                          ExecutionBlockId finalExecBlockId,
+                          Path finalOutputDir)
+          throws Exception {
+
+        CatalogService catalog = context.getWorkerContext().getCatalog();
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        TableMeta meta = lastStage.getTableMeta();
+        TableStats stats = lastStage.getTableStat();
+
+        InsertNode insertNode = (InsertNode) lastStage.getBlock().getPlan();
+
+        TableDesc finalTable;
+        if (insertNode.hasTargetTable()) {
+          String tableName = insertNode.getTableName();
+          finalTable = catalog.getTableDesc(tableName);
+        } else {
+          String tableName = query.getId().toString();
+          finalTable = new TableDesc(tableName, lastStage.getSchema(), meta, finalOutputDir);
+        }
+
+        long volume = getTableVolume(query.systemConf, finalOutputDir);
+        stats.setNumBytes(volume);
+        finalTable.setStats(stats);
+
+        if (insertNode.hasTargetTable()) {
+          catalog.deleteTable(insertNode.getTableName());
+          catalog.addTable(finalTable);
+        }
+
+        query.setResultDesc(finalTable);
+      }
+    }
+
+    private long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException {
+      FileSystem fs = tablePath.getFileSystem(systemConf);
+      ContentSummary directorySummary = fs.getContentSummary(tablePath);
+      return directorySummary.getLength();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestDDLBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestDDLBuilder.java
b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestDDLBuilder.java
index 7f9d15c..390c348 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestDDLBuilder.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestDDLBuilder.java
@@ -21,6 +21,7 @@ package org.apache.tajo.client;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.util.FileUtil;
@@ -43,6 +44,16 @@ public class TestDDLBuilder {
 
     TableDesc desc = new TableDesc("table1", schema, meta, new Path("/table1"));
 
-    assertEquals(FileUtil.readTextFile(new File("src/test/resources/results/testBuildDDL.result")),
DDLBuilder.buildDDL(desc));
+    Schema expressionSchema = new Schema();
+    expressionSchema.addColumn("key", TajoDataTypes.Type.INT4);
+    PartitionMethodDesc partitionMethod = new PartitionMethodDesc(
+        "table1",
+        CatalogProtos.PartitionType.COLUMN,
+        "key",
+        expressionSchema);
+    desc.setPartitionMethod(partitionMethod);
+
+    assertEquals(FileUtil.readTextFile(new File("src/test/resources/results/testBuildDDL.result")),
+        DDLBuilder.buildDDL(desc));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
index 174f204..13db0d0 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -415,7 +415,7 @@ public class TestTajoClient {
 
     assertFalse(client.existTable(tableName));
 
-    String sql = "create table " + tableName + " (score int4)";
+    String sql = "create table " + tableName + " (deptname text, score int4)";
     sql += "PARTITION BY COLUMN (deptname text)";
 
     client.updateQuery(sql);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestHiveConverter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestHiveConverter.java
b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestHiveConverter.java
index 2e4221d..e090c89 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestHiveConverter.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestHiveConverter.java
@@ -23,7 +23,6 @@ import org.antlr.v4.runtime.CommonTokenStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.engine.parser.SQLParser.Boolean_value_expressionContext;
 import org.apache.tajo.engine.parser.SQLParser.SqlContext;
 import org.apache.tajo.util.FileUtil;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
index e81326e..8d7cb94 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
@@ -252,8 +252,8 @@ public class TestSQLAnalyzer {
     assertEquals(OpType.CreateTable, expr.getType());
     CreateTable createTable = (CreateTable) expr;
     assertTrue(createTable.hasPartition());
-    assertEquals(CreateTable.PartitionType.HASH, createTable.getPartition().getPartitionType());
-    CreateTable.HashPartition hashPartition = createTable.getPartition();
+    assertEquals(CreateTable.PartitionType.HASH, createTable.getPartitionMethod().getPartitionType());
+    CreateTable.HashPartition hashPartition = createTable.getPartitionMethod();
     assertEquals("col1", hashPartition.getColumns()[0].getCanonicalName());
     assertTrue(hashPartition.hasQuantifier());
   }
@@ -265,8 +265,8 @@ public class TestSQLAnalyzer {
     assertEquals(OpType.CreateTable, expr.getType());
     CreateTable createTable = (CreateTable) expr;
     assertTrue(createTable.hasPartition());
-    assertEquals(CreateTable.PartitionType.HASH, createTable.getPartition().getPartitionType());
-    CreateTable.HashPartition hashPartition = createTable.getPartition();
+    assertEquals(CreateTable.PartitionType.HASH, createTable.getPartitionMethod().getPartitionType());
+    CreateTable.HashPartition hashPartition = createTable.getPartitionMethod();
     assertEquals("col1", hashPartition.getColumns()[0].getCanonicalName());
     assertTrue(hashPartition.hasSpecifiers());
     assertEquals(3, hashPartition.getSpecifiers().size());
@@ -279,8 +279,8 @@ public class TestSQLAnalyzer {
     assertEquals(OpType.CreateTable, expr.getType());
     CreateTable createTable = (CreateTable) expr;
     assertTrue(createTable.hasPartition());
-    assertEquals(CreateTable.PartitionType.RANGE, createTable.getPartition().getPartitionType());
-    CreateTable.RangePartition rangePartition = createTable.getPartition();
+    assertEquals(CreateTable.PartitionType.RANGE, createTable.getPartitionMethod().getPartitionType());
+    CreateTable.RangePartition rangePartition = createTable.getPartitionMethod();
     assertEquals("col1", rangePartition.getColumns()[0].getCanonicalName());
     assertEquals(3, rangePartition.getSpecifiers().size());
   }
@@ -292,8 +292,8 @@ public class TestSQLAnalyzer {
     assertEquals(OpType.CreateTable, expr.getType());
     CreateTable createTable = (CreateTable) expr;
     assertTrue(createTable.hasPartition());
-    assertEquals(CreateTable.PartitionType.LIST, createTable.getPartition().getPartitionType());
-    CreateTable.ListPartition listPartition = createTable.getPartition();
+    assertEquals(CreateTable.PartitionType.LIST, createTable.getPartitionMethod().getPartitionType());
+    CreateTable.ListPartition listPartition = createTable.getPartitionMethod();
     assertEquals("col1", listPartition.getColumns()[0].getCanonicalName());
     assertEquals(2, listPartition.getSpecifiers().size());
     Iterator<CreateTable.ListPartitionSpecifier> iterator = listPartition.getSpecifiers().iterator();
@@ -317,8 +317,8 @@ public class TestSQLAnalyzer {
     assertEquals(OpType.CreateTable, expr.getType());
     CreateTable createTable = (CreateTable) expr;
     assertTrue(createTable.hasPartition());
-    assertEquals(CreateTable.PartitionType.COLUMN, createTable.getPartition().getPartitionType());
-    CreateTable.ColumnPartition columnPartition = createTable.getPartition();
+    assertEquals(CreateTable.PartitionType.COLUMN, createTable.getPartitionMethod().getPartitionType());
+    CreateTable.ColumnPartition columnPartition = createTable.getPartitionMethod();
     assertEquals(3, columnPartition.getColumns().length);
     assertEquals("col3", columnPartition.getColumns()[0].getColumnName());
     assertEquals("col4", columnPartition.getColumns()[1].getColumnName());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 9eb0276..81f57d4 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -628,7 +628,7 @@ public class TestLogicalPlanner {
     assertEquals(NodeType.CREATE_TABLE, root.getChild().getType());
     CreateTableNode createTable = root.getChild();
 
-    Schema def = createTable.getSchema();
+    Schema def = createTable.getTableSchema();
     assertEquals("name", def.getColumn(0).getColumnName());
     assertEquals(Type.TEXT, def.getColumn(0).getDataType().getType());
     assertEquals("age", def.getColumn(1).getColumnName());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
index b117f64..aacf588 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestCTASQuery.java
@@ -26,7 +26,7 @@ import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TpchTestBase;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -67,10 +67,11 @@ public class TestCTASQuery {
     CatalogService catalog = cluster.getMaster().getCatalog();
     TableDesc desc = catalog.getTableDesc(tableName);
     assertTrue(catalog.existsTable(tableName));
-    assertTrue(desc.getSchema().containsByQualifiedName("testCtasWithoutTableDefinition.col1"));
-    PartitionDesc partitionDesc = desc.getPartitions();
-    assertEquals(partitionDesc.getPartitionsType(), CatalogProtos.PartitionsType.COLUMN);
-    assertEquals("key", partitionDesc.getColumns().get(0).getColumnName());
+
+    assertTrue(desc.getSchema().contains("testCtasWithoutTableDefinition.col1"));
+    PartitionMethodDesc partitionDesc = desc.getPartitionMethod();
+    assertEquals(partitionDesc.getPartitionType(), CatalogProtos.PartitionType.COLUMN);
+    assertEquals("key", partitionDesc.getExpressionSchema().getColumns().get(0).getColumnName());
 
     FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
     Path path = desc.getPath();
@@ -99,8 +100,7 @@ public class TestCTASQuery {
     assertEquals(2, i);
   }
 
-  //@Test
-  // TODO- to be enabled
+  @Test
   public final void testCtasWithColumnedPartition() throws Exception {
     String tableName ="testCtasWithColumnedPartition";
     tpch.execute(
@@ -112,9 +112,9 @@ public class TestCTASQuery {
     CatalogService catalog = cluster.getMaster().getCatalog();
     TableDesc desc = catalog.getTableDesc(tableName);
     assertTrue(catalog.existsTable(tableName));
-    PartitionDesc partitionDesc = desc.getPartitions();
-    assertEquals(partitionDesc.getPartitionsType(), CatalogProtos.PartitionsType.COLUMN);
-    assertEquals("key", partitionDesc.getColumns().get(0).getColumnName());
+    PartitionMethodDesc partitionDesc = desc.getPartitionMethod();
+    assertEquals(partitionDesc.getPartitionType(), CatalogProtos.PartitionType.COLUMN);
+    assertEquals("key", partitionDesc.getExpressionSchema().getColumns().get(0).getColumnName());
 
     FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
     Path path = desc.getPath();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index ca5c071..df20e76 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -51,6 +51,38 @@ public class TestTablePartitions {
   }
 
   @Test
+  public final void testCreateColumnPartitionedTable() throws Exception {
+    String tableName ="testCreateColumnPartitionedTable";
+    ResultSet res = tpch.execute(
+        "create table " + tableName +" (col1 int4, col2 int4) partition by column(key float8)
");
+    res.close();
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+    assertEquals(2, catalog.getTableDesc(tableName).getSchema().getColumnNum());
+    assertEquals(3, catalog.getTableDesc(tableName).getLogicalSchema().getColumnNum());
+
+    res = tpch.execute("insert overwrite into " + tableName + " select l_orderkey, l_partkey,
l_quantity from lineitem");
+    res.close();
+  }
+
+  @Test
+  public final void testCreateColumnPartitionedTableWithSelectedColumns() throws Exception
{
+    String tableName ="testCreateColumnPartitionedTableWithSelectedColumns";
+    ResultSet res = tpch.execute(
+        "create table " + tableName +" (col1 int4, col2 int4, null_col int4) partition by
column(key float8) ");
+    res.close();
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+    assertEquals(3, catalog.getTableDesc(tableName).getSchema().getColumnNum());
+    assertEquals(4, catalog.getTableDesc(tableName).getLogicalSchema().getColumnNum());
+
+    res = tpch.execute("insert overwrite into " + tableName + " (col1, col2, key) select
l_orderkey, l_partkey, l_quantity from lineitem");
+    res.close();
+  }
+
+  @Test
   public final void testColumnPartitionedTableByOneColumn() throws Exception {
     String tableName ="testColumnPartitionedTableByOneColumn";
     ResultSet res = tpch.execute(
@@ -166,7 +198,7 @@ public class TestTablePartitions {
     assertTrue(catalog.existsTable(tableName));
 
     res = tpch.execute(
-        "insert overwrite into " + tableName + " select  l_partkey, l_quantity, l_orderkey
from lineitem");
+        "insert overwrite into " + tableName + " select l_partkey, l_quantity, l_orderkey
from lineitem");
     res.close();
     TableDesc desc = catalog.getTableDesc(tableName);
     assertEquals(5, desc.getStats().getNumRows().intValue());
@@ -246,7 +278,7 @@ public class TestTablePartitions {
 
     res = tpch.execute(
         "insert overwrite into " + tableName +
-            " select  l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+            " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
     res.close();
     TableDesc desc = catalog.getTableDesc(tableName);
     assertEquals(5, desc.getStats().getNumRows().intValue());
@@ -328,7 +360,7 @@ public class TestTablePartitions {
 
     res = tpch.execute(
         "insert overwrite into " + tableName +
-            " select  l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+            " select l_returnflag , l_orderkey, l_partkey, l_quantity from lineitem");
     res.close();
     TableDesc desc = catalog.getTableDesc(tableName);
     assertEquals(5, desc.getStats().getNumRows().intValue());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
index 7f10736..169ed23 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java
@@ -20,17 +20,11 @@ package org.apache.tajo.engine.query;
 
 import org.apache.tajo.IntegrationTest;
 import org.apache.tajo.QueryTestCaseBase;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.catalog.TableDesc;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.sql.ResultSet;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 /*
  * Notations
  * - S - select

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/test/resources/results/testBuildDDL.result
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/results/testBuildDDL.result b/tajo-core/tajo-core-backend/src/test/resources/results/testBuildDDL.result
index 4fd87c2..dd11188 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/results/testBuildDDL.result
+++ b/tajo-core/tajo-core-backend/src/test/resources/results/testBuildDDL.result
@@ -2,4 +2,4 @@
 -- Name: table1; Type: TABLE; Storage: CSV
 -- Path: /table1
 --
-CREATE EXTERNAL TABLE table1 (name BLOB, addr TEXT) USING CSV WITH ('csv.delimiter'='|',
'compression.codec'='org.apache.hadoop.io.compress.GzipCodec') LOCATION '/table1';
\ No newline at end of file
+CREATE EXTERNAL TABLE table1 (name BLOB, addr TEXT) USING CSV WITH ('csv.delimiter'='|',
'compression.codec'='org.apache.hadoop.io.compress.GzipCodec') PARTITION BY COLUMN(key INT4)
LOCATION '/table1';
\ No newline at end of file


Mime
View raw message