drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject [1/3] drill git commit: DRILL-3246 : Query planning support for partition by clause in CTAS statement.
Date Mon, 22 Jun 2015 18:00:55 GMT
Repository: drill
Updated Branches:
  refs/heads/master 6ebfbb9d0 -> 9c125b0d9


DRILL-3246 : Query planning support for partition by clause in CTAS statement.

Add partition comparator function into project under writer.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4c444663
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4c444663
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4c444663

Branch: refs/heads/master
Commit: 4c444663fa9fc9ef6104396a426f8d6fa6c4afa0
Parents: 6ebfbb9
Author: Jinfeng Ni <jni@apache.org>
Authored: Mon Jun 1 18:14:50 2015 -0700
Committer: Jinfeng Ni <jni@apache.org>
Committed: Sat Jun 20 15:42:36 2015 -0700

----------------------------------------------------------------------
 .../src/main/codegen/includes/parserImpls.ftl   |  11 +-
 .../exec/planner/logical/CreateTableEntry.java  |   3 +
 .../exec/planner/logical/DrillWriterRel.java    |  33 ++++
 .../logical/FileSystemCreateTableEntry.java     |  17 +-
 .../drill/exec/planner/physical/WriterPrel.java |   3 +-
 .../exec/planner/physical/WriterPrule.java      |  22 ++-
 .../physical/visitor/StarColumnConverter.java   |   7 +-
 .../sql/handlers/CreateTableHandler.java        | 155 ++++++++++++++++++-
 .../planner/sql/handlers/SqlHandlerUtil.java    | 105 +++++++++++++
 .../sql/parser/CompoundIdentifierConverter.java |   2 +-
 .../exec/planner/sql/parser/SqlCreateTable.java |  42 +++--
 .../exec/planner/sql/parser/SqlCreateView.java  |  15 +-
 .../apache/drill/exec/store/AbstractSchema.java |   9 +-
 .../drill/exec/store/SubSchemaWrapper.java      |   5 +-
 .../exec/store/dfs/FileSystemSchemaFactory.java |   5 +-
 .../drill/exec/store/dfs/FormatPlugin.java      |   2 +-
 .../exec/store/dfs/WorkspaceSchemaFactory.java  |   6 +-
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |   4 +-
 .../drill/exec/store/dfs/easy/EasyWriter.java   |   9 +-
 .../exec/store/parquet/ParquetFormatPlugin.java |   4 +-
 .../drill/exec/store/parquet/ParquetWriter.java |  14 +-
 .../org/apache/drill/TestExampleQueries.java    |  23 +++
 22 files changed, 438 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
index 1605b06..eedba99 100644
--- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
+++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
@@ -154,7 +154,7 @@ SqlNodeList ParseFieldList(String relType) :
     }
     |
     {
-        return null;
+        return SqlNodeList.EMPTY;
     }
 }
 
@@ -208,16 +208,23 @@ SqlNode SqlCreateTable() :
     SqlParserPos pos;
     SqlIdentifier tblName;
     SqlNodeList fieldList;
+    SqlNodeList partitionFieldList;
     SqlNode query;
 }
 {
+    {
+        partitionFieldList = SqlNodeList.EMPTY;
+    }
     <CREATE> { pos = getPos(); }
     <TABLE>
     tblName = CompoundIdentifier()
     fieldList = ParseFieldList("Table")
+    (   <PARTITION> <BY>
+        partitionFieldList = ParseFieldList("Partition")
+    )?
     <AS>
     query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
     {
-        return new SqlCreateTable(pos, tblName, fieldList, query);
+        return new SqlCreateTable(pos, tblName, fieldList, partitionFieldList, query);
     }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/CreateTableEntry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/CreateTableEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/CreateTableEntry.java
index 673e8c6..593ba17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/CreateTableEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/CreateTableEntry.java
@@ -19,6 +19,7 @@
 package org.apache.drill.exec.planner.logical;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Writer;
@@ -38,4 +39,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 public interface CreateTableEntry {
 
   Writer getWriter(PhysicalOperator child) throws IOException;
+
+  List<String> getPartitionColumns();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWriterRel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWriterRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWriterRel.java
index fc93c3e..94de680 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWriterRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWriterRel.java
@@ -19,6 +19,14 @@ package org.apache.drill.exec.planner.logical;
 
 import java.util.List;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationImpl;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.common.logical.data.Writer;
 import org.apache.drill.exec.planner.common.DrillWriterRelBase;
@@ -28,9 +36,12 @@ import org.apache.calcite.plan.RelTraitSet;
 
 public class DrillWriterRel extends DrillWriterRelBase implements DrillRel {
 
+  private final List<Integer> partitionKeys;
+
   public DrillWriterRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, CreateTableEntry createTableEntry) {
     super(DRILL_LOGICAL, cluster, traitSet, input, createTableEntry);
     setRowType();
+    this.partitionKeys = resolvePartitionKeys();
   }
 
   @Override
@@ -47,4 +58,26 @@ public class DrillWriterRel extends DrillWriterRelBase implements DrillRel {
         .setCreateTableEntry(getCreateTableEntry())
         .build();
   }
+
+
+  private List<Integer> resolvePartitionKeys(){
+    final List<Integer> keys = Lists.newArrayList();
+    final RelDataType inputRowType = getInput().getRowType();
+    final List<String> partitionCol = getCreateTableEntry().getPartitionColumns();
+
+    for (final String col : partitionCol) {
+      final RelDataTypeField field = inputRowType.getField(col, false, false);
+      Preconditions.checkArgument(field != null,
+          String.format("partition col %s could not be resolved in table's column lists!", col));
+      keys.add(field.getIndex());
+    }
+
+    return keys;
+  }
+
+  public List<Integer> getPartitionKeys() {
+    return this.partitionKeys;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
index 6784888..672092d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
@@ -18,8 +18,10 @@
 package org.apache.drill.exec.planner.logical;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Writer;
@@ -31,6 +33,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.store.ischema.Records;
 
 /**
  * Implements <code>CreateTableEntry</code> interface to create new tables in FileSystem storage.
@@ -41,24 +44,29 @@ public class FileSystemCreateTableEntry implements CreateTableEntry {
   private FileSystemConfig storageConfig;
   private FormatPlugin formatPlugin;
   private String location;
+  private final List<String> partitionColumns;
 
   @JsonCreator
   public FileSystemCreateTableEntry(@JsonProperty("storageConfig") FileSystemConfig storageConfig,
                                     @JsonProperty("formatConfig") FormatPluginConfig formatConfig,
                                     @JsonProperty("location") String location,
+                                    @JsonProperty("partitionColumn") List<String> partitionColumns,
                                     @JacksonInject StoragePluginRegistry engineRegistry)
       throws ExecutionSetupException {
     this.storageConfig = storageConfig;
     this.formatPlugin = engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     this.location = location;
+    this.partitionColumns = partitionColumns;
   }
 
   public FileSystemCreateTableEntry(FileSystemConfig storageConfig,
                                     FormatPlugin formatPlugin,
-                                    String location) {
+                                    String location,
+                                    List<String> partitionColumns) {
     this.storageConfig = storageConfig;
     this.formatPlugin = formatPlugin;
     this.location = location;
+    this.partitionColumns = partitionColumns;
   }
 
   @JsonProperty("storageConfig")
@@ -73,6 +81,11 @@ public class FileSystemCreateTableEntry implements CreateTableEntry {
 
   @Override
   public Writer getWriter(PhysicalOperator child) throws IOException {
-    return formatPlugin.getWriter(child, location);
+    return formatPlugin.getWriter(child, location, partitionColumns);
+  }
+
+  @Override
+  public List<String> getPartitionColumns() {
+    return partitionColumns;
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
index ceecb03..513776d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrel.java
@@ -32,7 +32,8 @@ import org.apache.calcite.plan.RelTraitSet;
 
 public class WriterPrel extends DrillWriterRelBase implements Prel {
 
-
+  public static final String PARTITION_COMPARATOR_FIELD = "P_A_R_T_I_T_I_O_N_C_O_M_P_A_R_A_T_O_R";
+  public static final String PARTITION_COMPARATOR_FUNC = "newPartitionValue";
 
   public WriterPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, CreateTableEntry createTableEntry) {
     super(Prel.DRILL_PHYSICAL, cluster, traits, child, createTableEntry);

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
index 5790665..e191423 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
@@ -17,6 +17,12 @@
  */
 package org.apache.drill.exec.planner.physical;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationImpl;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.planner.common.DrillWriterRelBase;
 import org.apache.drill.exec.planner.logical.DrillRel;
 import org.apache.drill.exec.planner.logical.DrillWriterRel;
@@ -26,6 +32,8 @@ import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelTraitSet;
 
+import java.util.List;
+
 public class WriterPrule extends Prule{
   public static final RelOptRule INSTANCE = new WriterPrule();
 
@@ -36,10 +44,11 @@ public class WriterPrule extends Prule{
 
   @Override
   public void onMatch(RelOptRuleCall call) {
-    final DrillWriterRelBase writer = call.rel(0);
+    final DrillWriterRel writer = call.rel(0);
     final RelNode input = call.rel(1);
 
-    final RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+    final RelCollation collation = getCollation(writer.getPartitionKeys());
+    final RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation);
     final RelNode convertedInput = convert(input, traits);
 
     if (!new WriteTraitPull(call).go(writer, convertedInput)) {
@@ -50,6 +59,15 @@ public class WriterPrule extends Prule{
     }
   }
 
+  private RelCollation getCollation(List<Integer> keys){
+    List<RelFieldCollation> fields = Lists.newArrayList();
+    for (int key : keys) {
+      fields.add(new RelFieldCollation(key));
+    }
+    return RelCollationImpl.of(fields);
+  }
+
+
   private class WriteTraitPull extends SubsetTransformer<DrillWriterRelBase, RuntimeException> {
 
     public WriteTraitPull(RelOptRuleCall call) {

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java
index 7a471d7..0148d47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/StarColumnConverter.java
@@ -90,12 +90,15 @@ public class StarColumnConverter extends BasePrelVisitor<Prel, Void, RuntimeExce
     }
   }
 
+  // Note: the logic of handling * column for Writer is moved to ProjectForWriterVisitor.
+
   @Override
   public Prel visitWriter(WriterPrel prel, Void value) throws RuntimeException {
-    Prel child = ((Prel) prel.getInput(0)).accept(this, null);
+    RelNode child = ((Prel) prel.getInput(0)).accept(this, null);
     if (prefixedForStar) {
       prefixedForWriter = true;
-      return insertProjUnderScreenOrWriter(prel, prel.getInput().getRowType(), child);
+      // return insertProjUnderScreenOrWriter(prel, prel.getInput().getRowType(), child);
+      return (Prel) prel.copy(prel.getTraitSet(), Collections.singletonList(child));
     } else {
       return prel;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
index 2866b8c..b867549 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
@@ -18,13 +18,32 @@
 package org.apache.drill.exec.planner.sql.handlers;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
 
+import org.apache.calcite.util.Pair;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.logical.DrillRel;
@@ -32,12 +51,18 @@ import org.apache.drill.exec.planner.logical.DrillScreenRel;
 import org.apache.drill.exec.planner.logical.DrillStoreRel;
 import org.apache.drill.exec.planner.logical.DrillWriterRel;
 import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ProjectAllowDupPrel;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.WriterPrel;
+import org.apache.drill.exec.planner.physical.visitor.BasePrelVisitor;
+import org.apache.drill.exec.planner.sql.DrillSqlOperator;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
 import org.apache.drill.exec.planner.sql.SchemaUtilites;
 import org.apache.drill.exec.planner.sql.parser.SqlCreateTable;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
 
 public class CreateTableHandler extends DefaultSqlHandler {
   public CreateTableHandler(SqlHandlerConfig config, Pointer<String> textPlan) {
@@ -52,6 +77,7 @@ public class CreateTableHandler extends DefaultSqlHandler {
     final RelNode newTblRelNode =
         SqlHandlerUtil.resolveNewTableRel(false, planner, sqlCreateTable.getFieldNames(), sqlCreateTable.getQuery());
 
+
     final AbstractSchema drillSchema =
         SchemaUtilites.resolveToMutableDrillSchema(context.getNewDefaultSchema(), sqlCreateTable.getSchemaPath());
     final String schemaPath = drillSchema.getFullSchemaName();
@@ -62,12 +88,14 @@ public class CreateTableHandler extends DefaultSqlHandler {
           .build();
     }
 
-    log("Optiq Logical", newTblRelNode);
+    final RelNode newTblRelNodeWithPCol = SqlHandlerUtil.qualifyPartitionCol(newTblRelNode, sqlCreateTable.getPartitionColumns());
+
+    log("Optiq Logical", newTblRelNodeWithPCol);
 
     // Convert the query to Drill Logical plan and insert a writer operator on top.
-    DrillRel drel = convertToDrel(newTblRelNode, drillSchema, newTblName);
+    DrillRel drel = convertToDrel(newTblRelNodeWithPCol, drillSchema, newTblName, sqlCreateTable.getPartitionColumns());
     log("Drill Logical", drel);
-    Prel prel = convertToPrel(drel);
+    Prel prel = convertToPrel(drel, newTblRelNode.getRowType(), sqlCreateTable.getPartitionColumns());
     log("Drill Physical", prel);
     PhysicalOperator pop = convertToPop(prel);
     PhysicalPlan plan = convertToPlan(pop);
@@ -76,7 +104,8 @@ public class CreateTableHandler extends DefaultSqlHandler {
     return plan;
   }
 
-  private DrillRel convertToDrel(RelNode relNode, AbstractSchema schema, String tableName) throws RelConversionException {
+  private DrillRel convertToDrel(RelNode relNode, AbstractSchema schema, String tableName, List<String> partitionColumns)
+      throws RelConversionException {
     RelNode convertedRelNode = planner.transform(DrillSqlWorker.LOGICAL_RULES,
         relNode.getTraitSet().plus(DrillRel.DRILL_LOGICAL), relNode);
 
@@ -85,8 +114,122 @@ public class CreateTableHandler extends DefaultSqlHandler {
     }
 
     DrillWriterRel writerRel = new DrillWriterRel(convertedRelNode.getCluster(), convertedRelNode.getTraitSet(),
-        convertedRelNode, schema.createNewTable(tableName));
+        convertedRelNode, schema.createNewTable(tableName, partitionColumns));
     return new DrillScreenRel(writerRel.getCluster(), writerRel.getTraitSet(), writerRel);
   }
 
-}
+  private Prel convertToPrel(RelNode drel, RelDataType inputRowType, List<String> partitionColumns)
+      throws RelConversionException, SqlUnsupportedException {
+    Prel prel = convertToPrel(drel);
+
+    prel = prel.accept(new ProjectForWriterVisitor(inputRowType, partitionColumns), null);
+
+    return prel;
+  }
+
+  /**
+   * A PrelVisitor which will insert a project under Writer.
+   *
+   * For CTAS : create table t1 partition by (con_A) select * from T1;
+   *   A Project with Item expr will be inserted, in addition to *.  We need insert another Project to remove
+   *   this additional expression.
+   *
+   * In addition, to make execution's implementation easier, a special field is added to Project :
+   *     PARTITION_COLUMN_IDENTIFIER = newPartitionValue(Partition_colA)
+   *                                    || newPartitionValue(Partition_colB)
+   *                                    || ...
+   *                                    || newPartitionValue(Partition_colN).
+   */
+  private class ProjectForWriterVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> {
+
+    private final RelDataType queryRowType;
+    private final List<String> partitionColumns;
+
+    ProjectForWriterVisitor(RelDataType queryRowType, List<String> partitionColumns) {
+      this.queryRowType = queryRowType;
+      this.partitionColumns = partitionColumns;
+    }
+
+    @Override
+    public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
+      List<RelNode> children = Lists.newArrayList();
+      for(Prel child : prel){
+        child = child.accept(this, null);
+        children.add(child);
+      }
+
+      return (Prel) prel.copy(prel.getTraitSet(), children);
+
+    }
+
+    @Override
+    public Prel visitWriter(WriterPrel prel, Void value) throws RuntimeException {
+
+      final Prel child = ((Prel)prel.getInput()).accept(this, null);
+
+      final RelDataType childRowType = child.getRowType();
+
+      final RelOptCluster cluster = prel.getCluster();
+
+      final List<RexNode> exprs = Lists.newArrayListWithExpectedSize(queryRowType.getFieldCount() + 1);
+      final List<String> fieldnames = new ArrayList<String>(queryRowType.getFieldNames());
+
+      for (final RelDataTypeField field : queryRowType.getFieldList()) {
+        exprs.add(RexInputRef.of(field.getIndex(), queryRowType));
+      }
+
+      // No partition columns.
+      if (partitionColumns.size() == 0) {
+        final ProjectPrel projectUnderWriter = new ProjectAllowDupPrel(cluster,
+            cluster.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL), child, exprs, queryRowType);
+
+        return (Prel) prel.copy(projectUnderWriter.getTraitSet(),
+            Collections.singletonList( (RelNode) projectUnderWriter));
+      } else {
+        // find list of partition columns.
+        final List<RexNode> partitionColumnExprs = Lists.newArrayListWithExpectedSize(partitionColumns.size());
+        for (final String colName : partitionColumns) {
+          final RelDataTypeField field = childRowType.getField(colName, false, false);
+
+          if (field == null) {
+            throw UserException.validationError()
+                .message("Partition column %s is not in the SELECT list of CTAS!", colName)
+                .build();
+          }
+
+          partitionColumnExprs.add(RexInputRef.of(field.getIndex(), childRowType));
+        }
+
+        // Add partition column comparator to Project's field name list.
+        fieldnames.add(WriterPrel.PARTITION_COMPARATOR_FIELD);
+
+        // Add partition column comparator to Project's expression list.
+        final RexNode partionColComp = createPartitionColComparator(prel.getCluster().getRexBuilder(), partitionColumnExprs);
+        exprs.add(partionColComp);
+
+
+        final RelDataType rowTypeWithPCComp = RexUtil.createStructType(cluster.getTypeFactory(), exprs, fieldnames);
+
+        final ProjectPrel projectUnderWriter = new ProjectAllowDupPrel(cluster,
+            cluster.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL), child, exprs, rowTypeWithPCComp);
+
+        return (Prel) prel.copy(projectUnderWriter.getTraitSet(),
+            Collections.singletonList( (RelNode) projectUnderWriter));
+      }
+    }
+
+  }
+
+  private RexNode createPartitionColComparator(final RexBuilder rexBuilder, List<RexNode> inputs) {
+    final DrillSqlOperator op = new DrillSqlOperator(WriterPrel.PARTITION_COMPARATOR_FUNC, 1, true);
+
+    final List<RexNode> compFuncs = Lists.newArrayListWithExpectedSize(inputs.size());
+
+    for (final RexNode input : inputs) {
+      compFuncs.add(rexBuilder.makeCall(op, ImmutableList.of(input)));
+    }
+
+    return RexUtil.composeDisjunction(rexBuilder, compFuncs, false);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
index 3edcdb2..60287dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerUtil.java
@@ -17,14 +17,24 @@
  */
 package org.apache.drill.exec.planner.sql.handlers;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.TypedSqlNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.tools.Planner;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.StarColumnHelper;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.sql.DirectPlan;
 import org.apache.drill.exec.planner.types.DrillFixedRelDataTypeImpl;
@@ -37,6 +47,7 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.drill.exec.store.ischema.Records;
 
+import java.util.AbstractList;
 import java.util.HashSet;
 import java.util.List;
 
@@ -123,6 +134,89 @@ public class SqlHandlerUtil {
     }
   }
 
+  /**
+   *  Resolve the partition columns specified in "PARTITION BY" clause of CTAS statement.
+   *
+   *  A partition column is resolved if either (1) the same column appears in the select list of CTAS,
+   *  or (2) CTAS has a * in select list.
+   *
+   *  In the second case, a PROJECT with ITEM expression would be created and returned.
+   *  Throw validation error if a partition column is not resolved correctly.
+   *
+   * @param input : the RelNode represents the select statement in CTAS.
+   * @param partitionColumns : the list of partition columns.
+   * @return : 1) the original RelNode input, if all partition columns are in select list of CTAS
+   *           2) a New Project, if a partition column is resolved to * column in select list
+   *           3) validation error, if partition column is not resolved.
+   */
+  public static RelNode qualifyPartitionCol(RelNode input, List<String> partitionColumns) {
+
+    final RelDataType inputRowType = input.getRowType();
+
+    final List<RexNode> colRefStarExprs = Lists.newArrayList();
+    final List<String> colRefStarNames = Lists.newArrayList();
+    final RexBuilder builder = input.getCluster().getRexBuilder();
+    final int originalFieldSize = inputRowType.getFieldCount();
+
+    for (final String col : partitionColumns) {
+      final RelDataTypeField field = inputRowType.getField(col, false, false);
+
+      if (field == null) {
+        throw UserException.validationError()
+            .message("Partition column %s is not in the SELECT list of CTAS!", col)
+            .build();
+      } else {
+        if (field.getName().startsWith(StarColumnHelper.STAR_COLUMN)) {
+          colRefStarNames.add(col);
+
+          final List<RexNode> operands = Lists.newArrayList();
+          operands.add(new RexInputRef(field.getIndex(), field.getType()));
+          operands.add(builder.makeLiteral(col));
+          final RexNode item = builder.makeCall(SqlStdOperatorTable.ITEM, operands);
+          colRefStarExprs.add(item);
+        }
+      }
+    }
+
+    if (colRefStarExprs.isEmpty()) {
+      return input;
+    } else {
+      final List<String> names =
+          new AbstractList<String>() {
+            @Override
+            public String get(int index) {
+              if (index < originalFieldSize) {
+                return inputRowType.getFieldNames().get(index);
+              } else {
+                return colRefStarNames.get(index - originalFieldSize);
+              }
+            }
+
+            @Override
+            public int size() {
+              return originalFieldSize + colRefStarExprs.size();
+            }
+          };
+
+      final List<RexNode> refs =
+          new AbstractList<RexNode>() {
+            public int size() {
+              return originalFieldSize + colRefStarExprs.size();
+            }
+
+            public RexNode get(int index) {
+              if (index < originalFieldSize) {
+                return RexInputRef.of(index, inputRowType.getFieldList());
+              } else {
+                return colRefStarExprs.get(index - originalFieldSize);
+              }
+            }
+          };
+
+      return RelOptUtil.createProject(input, refs, names, false);
+    }
+  }
+
   public static Table getTableFromSchema(AbstractSchema drillSchema, String tblName) {
     try {
       return drillSchema.getTable(tblName);
@@ -133,4 +227,15 @@ public class SqlHandlerUtil {
               "in schema [%s]: %s", tblName, drillSchema.getFullSchemaName(), e.getMessage()), e);
     }
   }
+
+  public static void unparseSqlNodeList(SqlWriter writer, int leftPrec, int rightPrec, SqlNodeList fieldList) {
+    writer.keyword("(");
+    fieldList.get(0).unparse(writer, leftPrec, rightPrec);
+    for (int i = 1; i<fieldList.size(); i++) {
+      writer.keyword(",");
+      fieldList.get(i).unparse(writer, leftPrec, rightPrec);
+    }
+    writer.keyword(")");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
index bfa89a5..fbc5f9b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
@@ -151,7 +151,7 @@ public class CompoundIdentifierConverter extends SqlShuttle {
   //SqlNode offset,
   //SqlNode fetch,
     rules.put(SqlSelect.class, R(D, E, D, E, E, E, D, E, D, D));
-    rules.put(SqlCreateTable.class, R(D, D, E));
+    rules.put(SqlCreateTable.class, R(D, D, D, E));
     rules.put(SqlCreateView.class, R(D, E, E, D));
     rules.put(SqlDescribeTable.class, R(D, D, E));
     rules.put(SqlDropView.class, R(D));

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
index 9fd9d92..5835b10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
@@ -19,8 +19,10 @@ package org.apache.drill.exec.planner.sql.parser;
 
 import java.util.List;
 
+import com.google.common.base.Preconditions;
 import org.apache.calcite.tools.Planner;
 
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.CreateTableHandler;
@@ -39,24 +41,28 @@ import org.apache.calcite.sql.parser.SqlParserPos;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerUtil;
 import org.apache.drill.exec.util.Pointer;
 
 public class SqlCreateTable extends DrillSqlCall {
   public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_TABLE", SqlKind.OTHER) {
     @Override
     public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
-      return new SqlCreateTable(pos, (SqlIdentifier) operands[0], (SqlNodeList) operands[1], operands[2]);
+      Preconditions.checkArgument(operands.length == 4, "SqlCreateTable.createCall() has to get 4 operands!");
+      return new SqlCreateTable(pos, (SqlIdentifier) operands[0], (SqlNodeList) operands[1], (SqlNodeList) operands[2], operands[3]);
     }
   };
 
-  private SqlIdentifier tblName;
-  private SqlNodeList fieldList;
-  private SqlNode query;
+  private final SqlIdentifier tblName;
+  private final SqlNodeList fieldList;
+  private final SqlNodeList partitionColumns;
+  private final SqlNode query;
 
-  public SqlCreateTable(SqlParserPos pos, SqlIdentifier tblName, SqlNodeList fieldList, SqlNode query) {
+  public SqlCreateTable(SqlParserPos pos, SqlIdentifier tblName, SqlNodeList fieldList, SqlNodeList partitionColumns, SqlNode query) {
     super(pos);
     this.tblName = tblName;
     this.fieldList = fieldList;
+    this.partitionColumns = partitionColumns;
     this.query = query;
   }
 
@@ -70,6 +76,7 @@ public class SqlCreateTable extends DrillSqlCall {
     List<SqlNode> ops = Lists.newArrayList();
     ops.add(tblName);
     ops.add(fieldList);
+    ops.add(partitionColumns);
     ops.add(query);
     return ops;
   }
@@ -79,14 +86,12 @@ public class SqlCreateTable extends DrillSqlCall {
     writer.keyword("CREATE");
     writer.keyword("TABLE");
     tblName.unparse(writer, leftPrec, rightPrec);
-    if (fieldList != null && fieldList.size() > 0) {
-      writer.keyword("(");
-      fieldList.get(0).unparse(writer, leftPrec, rightPrec);
-      for (int i=1; i<fieldList.size(); i++) {
-        writer.keyword(",");
-        fieldList.get(i).unparse(writer, leftPrec, rightPrec);
-      }
-      writer.keyword(")");
+    if (fieldList.size() > 0) {
+      SqlHandlerUtil.unparseSqlNodeList(writer, leftPrec, rightPrec, fieldList);
+    }
+    if (partitionColumns.size() > 0) {
+      writer.keyword("PARTITION BY");
+      SqlHandlerUtil.unparseSqlNodeList(writer, leftPrec, rightPrec, partitionColumns);
     }
     writer.keyword("AS");
     query.unparse(writer, leftPrec, rightPrec);
@@ -120,16 +125,21 @@ public class SqlCreateTable extends DrillSqlCall {
   }
 
   public List<String> getFieldNames() {
-    if (fieldList == null) {
-      return ImmutableList.of();
+    List<String> columnNames = Lists.newArrayList();
+    for(SqlNode node : fieldList.getList()) {
+      columnNames.add(node.toString());
     }
+    return columnNames;
+  }
 
+  public List<String> getPartitionColumns() {
     List<String> columnNames = Lists.newArrayList();
-    for(SqlNode node : fieldList.getList()) {
+    for(SqlNode node : partitionColumns.getList()) {
       columnNames.add(node.toString());
     }
     return columnNames;
   }
 
   public SqlNode getQuery() { return query; }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
index 57cfde9..a140829 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerUtil;
 import org.apache.drill.exec.planner.sql.handlers.ViewHandler;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlIdentifier;
@@ -86,14 +87,8 @@ public class SqlCreateView extends DrillSqlCall {
     }
     writer.keyword("VIEW");
     viewName.unparse(writer, leftPrec, rightPrec);
-    if (fieldList != null && fieldList.size() > 0) {
-      writer.keyword("(");
-      fieldList.get(0).unparse(writer, leftPrec, rightPrec);
-      for (int i=1; i<fieldList.size(); i++) {
-        writer.keyword(",");
-        fieldList.get(i).unparse(writer, leftPrec, rightPrec);
-      }
-      writer.keyword(")");
+    if (fieldList.size() > 0) {
+      SqlHandlerUtil.unparseSqlNodeList(writer, leftPrec, rightPrec, fieldList);
     }
     writer.keyword("AS");
     query.unparse(writer, leftPrec, rightPrec);
@@ -121,10 +116,6 @@ public class SqlCreateView extends DrillSqlCall {
   }
 
   public List<String> getFieldNames() {
-    if (fieldList == null) {
-      return ImmutableList.of();
-    }
-
     List<String> fieldNames = Lists.newArrayList();
     for (SqlNode node : fieldList.getList()) {
       fieldNames.add(node.toString());

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
index 6afce1a..524fe26 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
@@ -31,6 +31,7 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.dotdrill.View;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 
@@ -115,7 +116,13 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer
         .build();
   }
 
-  public CreateTableEntry createNewTable(String tableName) {
+  /**
+   *
+   * @param tableName : new table name.
+   * @param partitionColumns : list of partition columns. Empty list if there are no partition columns.
+   * @return
+   */
+  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) {
     throw UserException.unsupportedError()
         .message("Creating new tables is not supported in schema [%s]", getSchemaPath())
         .build();

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
index 4e50bc1..722638a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
@@ -25,6 +25,7 @@ import org.apache.calcite.schema.Function;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.Table;
 
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 
 import com.google.common.collect.ImmutableList;
@@ -57,8 +58,8 @@ public class SubSchemaWrapper extends AbstractSchema {
   }
 
   @Override
-  public CreateTableEntry createNewTable(String tableName) {
-    return innerSchema.createNewTable(tableName);
+  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) {
+    return innerSchema.createNewTable(tableName, partitionColumns);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index fa9aa89..f9445e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -28,6 +28,7 @@ import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.store.AbstractSchema;
@@ -147,8 +148,8 @@ public class FileSystemSchemaFactory implements SchemaFactory{
     }
 
     @Override
-    public CreateTableEntry createNewTable(String tableName) {
-      return defaultSchema.createNewTable(tableName);
+    public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) {
+      return defaultSchema.createNewTable(tableName, partitionColumns);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 5668c54..81f9f76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -42,7 +42,7 @@ public interface FormatPlugin {
 
   public FormatMatcher getMatcher();
 
-  public AbstractWriter getWriter(PhysicalOperator child, String location) throws IOException;
+  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException;
 
   public Set<StoragePluginOptimizerRule> getOptimizerRules();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index b1135d0..8e0432a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -31,6 +31,7 @@ import org.apache.calcite.schema.Table;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.dotdrill.DotDrillFile;
 import org.apache.drill.exec.dotdrill.DotDrillType;
@@ -256,7 +257,7 @@ public class WorkspaceSchemaFactory {
     }
 
     @Override
-    public CreateTableEntry createNewTable(String tableName) {
+    public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns) {
      String storage = schemaConfig.getOption(ExecConstants.OUTPUT_FORMAT_OPTION).string_val;
      FormatPlugin formatPlugin = plugin.getFormatPlugin(storage);
      if (formatPlugin == null) {
@@ -268,7 +269,8 @@
      return new FileSystemCreateTableEntry(
          (FileSystemConfig) plugin.getConfig(),
          formatPlugin,
-          config.getLocation() + Path.SEPARATOR + tableName);
+          config.getLocation() + Path.SEPARATOR + tableName,
+          partitionColumns);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 8a18639..2918ca7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -216,8 +216,8 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
   }
 
   @Override
-  public AbstractWriter getWriter(PhysicalOperator child, String location) throws IOException {
-    return new EasyWriter(child, location, this);
+  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException {
+    return new EasyWriter(child, location, partitionColumns, this);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
index e12c5b3..db22568 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java
@@ -18,8 +18,10 @@
 package org.apache.drill.exec.store.dfs.easy;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractWriter;
@@ -38,12 +40,14 @@ public class EasyWriter extends AbstractWriter {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyWriter.class);
 
   private final String location;
+  private final List<String> partitionColumns;
   private final EasyFormatPlugin<?> formatPlugin;
 
   @JsonCreator
   public EasyWriter(
       @JsonProperty("child") PhysicalOperator child,
       @JsonProperty("location") String location,
+      @JsonProperty("partitionColumns") List<String> partitionColumns,
       @JsonProperty("storage") StoragePluginConfig storageConfig,
       @JsonProperty("format") FormatPluginConfig formatConfig,
       @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
@@ -52,15 +56,18 @@ public class EasyWriter extends AbstractWriter {
     this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
     this.location = location;
+    this.partitionColumns = partitionColumns;
   }
 
   public EasyWriter(PhysicalOperator child,
                          String location,
+                         List<String> partitionColumns,
                          EasyFormatPlugin<?> formatPlugin) {
 
     super(child);
     this.formatPlugin = formatPlugin;
     this.location = location;
+    this.partitionColumns = partitionColumns;
   }
 
   @JsonProperty("location")
@@ -85,7 +92,7 @@ public class EasyWriter extends AbstractWriter {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new EasyWriter(child, location, formatPlugin);
+    return new EasyWriter(child, location, partitionColumns, formatPlugin);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 322a88d..eff7872 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -119,8 +119,8 @@ public class ParquetFormatPlugin implements FormatPlugin{
   }
 
   @Override
-  public AbstractWriter getWriter(PhysicalOperator child, String location) throws IOException {
-    return new ParquetWriter(child, location, this);
+  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException {
+    return new ParquetWriter(child, location, partitionColumns, this);
   }
 
   public RecordWriter getRecordWriter(FragmentContext context, ParquetWriter writer) throws IOException, OutOfMemoryException {

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index 75f0e74..49c231e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -18,8 +18,10 @@
 package org.apache.drill.exec.store.parquet;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.AbstractWriter;
@@ -39,12 +41,14 @@ public class ParquetWriter extends AbstractWriter {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetWriter.class);
 
   private final String location;
+  private final List<String> partitionColumns;
   private final ParquetFormatPlugin formatPlugin;
 
   @JsonCreator
   public ParquetWriter(
           @JsonProperty("child") PhysicalOperator child,
           @JsonProperty("location") String location,
+          @JsonProperty("partitionColumns") List<String> partitionColumns,
           @JsonProperty("storage") StoragePluginConfig storageConfig,
           @JacksonInject StoragePluginRegistry engineRegistry) throws IOException, ExecutionSetupException {
 
@@ -52,15 +56,18 @@ public class ParquetWriter extends AbstractWriter {
     this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, new ParquetFormatConfig());
     Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
     this.location = location;
+    this.partitionColumns = partitionColumns;
   }
 
   public ParquetWriter(PhysicalOperator child,
                        String location,
+                       List<String> partitionColumns,
                        ParquetFormatPlugin formatPlugin) {
 
     super(child);
     this.formatPlugin = formatPlugin;
     this.location = location;
+    this.partitionColumns = partitionColumns;
   }
 
   @JsonProperty("location")
@@ -73,6 +80,11 @@ public class ParquetWriter extends AbstractWriter {
     return formatPlugin.getStorageConfig();
   }
 
+  @JsonProperty("partitionColumns")
+  public List<String> getPartitionColumns() {
+    return partitionColumns;
+  }
+
   @JsonIgnore
   public FormatPluginConfig getFormatConfig(){
     return formatPlugin.getConfig();
@@ -85,7 +97,7 @@ public class ParquetWriter extends AbstractWriter {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    return new ParquetWriter(child, location, formatPlugin);
+    return new ParquetWriter(child, location, partitionColumns, formatPlugin);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/4c444663/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index f0422d3..6d03d81 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -1031,4 +1031,27 @@ public class TestExampleQueries extends BaseTestQuery {
         .baselineValues(1L, 1L, listOf(listOf(listOf("val1"), listOf("val2"))))
         .go();
   }
+
+  @Test
+  @Ignore
+  public void testPartitionCTAS() throws Exception {
+    test("use dfs_test.tmp; " +
+        "create table mytable1  partition by (r_regionkey, r_comment) as select r_regionkey, r_name, r_comment from cp.`tpch/region.parquet`");
+
+    test("use dfs_test.tmp; " +
+        "create table mytable2  partition by (r_regionkey, r_comment) as select * from cp.`tpch/region.parquet` where r_name = 'abc' ");
+
+    test("use dfs_test.tmp; " +
+        "create table mytable3  partition by (r_regionkey, n_nationkey) as " +
+        "  select r.r_regionkey, r.r_name, n.n_nationkey, n.n_name from cp.`tpch/nation.parquet` n, cp.`tpch/region.parquet` r " +
+        "  where n.n_regionkey = r.r_regionkey");
+
+    test("use dfs_test.tmp; " +
+        "create table mytable4  partition by (r_regionkey, r_comment) as " +
+        "  select  r.* from cp.`tpch/nation.parquet` n, cp.`tpch/region.parquet` r " +
+        "  where n.n_regionkey = r.r_regionkey");
+
+
+  }
+
 }


Mime
View raw message