tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [4/7] tajo git commit: TAJO-1311: Enable Scattered Hash Shuffle for CTAS statement. (jaehwa)
Date Mon, 27 Apr 2015 14:17:42 GMT
TAJO-1311: Enable Scattered Hash Shuffle for CTAS statement. (jaehwa)

Closes #478


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b50831ff
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b50831ff
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b50831ff

Branch: refs/heads/index_support
Commit: b50831ffa66d94b67a419a95f81beed8fcab8ba2
Parents: 9acb3a6
Author: JaeHwa Jung <blrunner@apache.org>
Authored: Mon Apr 27 18:27:08 2015 +0900
Committer: JaeHwa Jung <blrunner@apache.org>
Committed: Mon Apr 27 18:27:08 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../engine/planner/PhysicalPlannerImpl.java     |   9 +
 .../engine/planner/global/GlobalPlanner.java    |  26 +-
 .../tajo/engine/query/TestTablePartitions.java  | 455 +++++++++++++------
 4 files changed, 347 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/b50831ff/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e3aec57..b6488c4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -22,6 +22,8 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1311: Enable Scattered Hash Shuffle for CTAS statement. (jaehwa)
+
     TAJO-1548: Refactoring condition code for CHAR into CatalogUtil.
     (Contributed by DaeMyung Kang, Committed by jaehwa)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b50831ff/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index ac1c9ad..f132793 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
@@ -854,6 +855,14 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
           sortSpecs[i++] = new SortSpec(insertNode.getProjectedSchema().getColumn(id), true,
false);
         }
       }
+    } else if (storeTableNode.getType() == NodeType.CREATE_TABLE) {
+      int i = 0;
+      for (int j = 0; j < partitionKeyColumns.length; j++) {
+        int id = storeTableNode.getOutSchema().getColumns().size() + j;
+        Column column = storeTableNode.getInSchema().getColumn(id);
+        sortSpecs[i++] = new SortSpec(column, true, false);
+      }
+
     } else {
       for (int i = 0; i < partitionKeyColumns.length; i++) {
         sortSpecs[i] = new SortSpec(partitionKeyColumns[i], true, false);

http://git-wip-us.apache.org/repos/asf/tajo/blob/b50831ff/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index cd35d96..54b920f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -1132,14 +1132,26 @@ public class GlobalPlanner {
     Preconditions.checkState(node.hasTargetTable(), "A target table must be a partitioned
table.");
     PartitionMethodDesc partitionMethod = node.getPartitionMethod();
 
-    if (node.getType() == NodeType.INSERT) {
-      InsertNode insertNode = (InsertNode) node;
-      channel.setSchema(((InsertNode)node).getProjectedSchema());
-      Column [] shuffleKeys = new Column[partitionMethod.getExpressionSchema().size()];
-      int i = 0;
+    if (node.getType() == NodeType.INSERT || node.getType() == NodeType.CREATE_TABLE) {
+      Schema tableSchema = null, projectedSchema = null;
+      if (node.getType() == NodeType.INSERT) {
+        tableSchema = ((InsertNode) node).getTableSchema();
+        projectedSchema = ((InsertNode) node).getProjectedSchema();
+      } else {
+        tableSchema = node.getOutSchema();
+        projectedSchema = node.getInSchema();
+      }
+      channel.setSchema(projectedSchema);
+
+      Column[] shuffleKeys = new Column[partitionMethod.getExpressionSchema().size()];
+      int i = 0, id = 0;
       for (Column column : partitionMethod.getExpressionSchema().getColumns()) {
-        int id = insertNode.getTableSchema().getColumnId(column.getQualifiedName());
-        shuffleKeys[i++] = insertNode.getProjectedSchema().getColumn(id);
+        if (node.getType() == NodeType.INSERT) {
+          id = tableSchema.getColumnId(column.getQualifiedName());
+        } else {
+          id = tableSchema.getColumns().size() + i;
+        }
+        shuffleKeys[i++] = projectedSchema.getColumn(id);
       }
       channel.setShuffleKeys(shuffleKeys);
       channel.setShuffleType(SCATTERED_HASH_SHUFFLE);

http://git-wip-us.apache.org/repos/asf/tajo/blob/b50831ff/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index bb15bfc..0d98b91 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -46,54 +46,68 @@ import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.worker.TajoWorker;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.sql.ResultSet;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.*;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.SCATTERED_HASH_SHUFFLE;
 import static org.junit.Assert.*;
 
+@RunWith(Parameterized.class)
 public class TestTablePartitions extends QueryTestCaseBase {
 
+  private NodeType nodeType;
 
-  public TestTablePartitions() throws IOException {
+  public TestTablePartitions(NodeType nodeType) throws IOException {
     super(TajoConstants.DEFAULT_DATABASE_NAME);
+    this.nodeType = nodeType;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][] {
+      //type
+      {NodeType.INSERT},
+      {NodeType.CREATE_TABLE},
+    });
   }
 
   @Test
   public final void testCreateColumnPartitionedTable() throws Exception {
+    ResultSet res = null;
     String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTable");
-    ResultSet res = executeString(
+
+    if (nodeType == NodeType.INSERT) {
+      res = executeString(
         "create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8)
");
-    res.close();
+      res.close();
 
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
-    assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
-    assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+      assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
+      assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
 
-    res = testBase.execute(
+      res = testBase.execute(
         "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " +
-            "l_quantity from lineitem");
+          "l_quantity from lineitem");
+    } else {
+      res = testBase.execute(
+        "create table " + tableName + "(col1 int4, col2 int4) partition by column(key float8)
"
+          + " as select l_orderkey, l_partkey, l_quantity from lineitem");
+    }
 
     MasterPlan plan = getQueryPlan(res);
     ExecutionBlock rootEB = plan.getRoot();
 
-    /*
-    -------------------------------------------------------------------------------
-    |-eb_1405354886454_0001_000003
-       |-eb_1405354886454_0001_000002
-          |-eb_1405354886454_0001_000001
-     */
     assertEquals(1, plan.getChildCount(rootEB.getId()));
 
     ExecutionBlock insertEB = plan.getChild(rootEB.getId(), 0);
     assertNotNull(insertEB);
-    assertEquals(NodeType.INSERT, insertEB.getPlan().getType());
+
+    assertEquals(nodeType, insertEB.getPlan().getType());
     assertEquals(1, plan.getChildCount(insertEB.getId()));
 
     ExecutionBlock scanEB = plan.getChild(insertEB.getId(), 0);
@@ -105,40 +119,41 @@ public class TestTablePartitions extends QueryTestCaseBase {
     assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType());
     assertEquals(1, channel.getShuffleKeys().length);
 
+    executeString("DROP TABLE " + tableName + " PURGE").close();
     res.close();
   }
 
   @Test
   public final void testCreateColumnPartitionedTableWithJoin() throws Exception {
+    ResultSet res = null;
     String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTableWithJoin");
-    ResultSet res = executeString(
+
+    if (nodeType == NodeType.INSERT) {
+      res = executeString(
         "create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8)
");
-    res.close();
+      res.close();
 
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
-    assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
-    assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+      assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
+      assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
 
-    res = testBase.execute(
+      res = testBase.execute(
         "insert overwrite into " + tableName + " select l_orderkey, l_partkey, " +
-            "l_quantity from lineitem join orders on l_orderkey = o_orderkey");
+          "l_quantity from lineitem join orders on l_orderkey = o_orderkey");
+
+    } else {
+      res = testBase.execute("create table " + tableName + " (col1 int4, col2 int4) partition
by column(key float8) "
+        + " AS select l_orderkey, l_partkey, l_quantity from lineitem join orders on l_orderkey
= o_orderkey");
+    }
 
     MasterPlan plan = getQueryPlan(res);
     ExecutionBlock rootEB = plan.getRoot();
 
-    /*
-    -------------------------------------------------------------------------------
-    |-eb_1405356074433_0001_000005
-       |-eb_1405356074433_0001_000004
-          |-eb_1405356074433_0001_000003
-             |-eb_1405356074433_0001_000002
-             |-eb_1405356074433_0001_000001
-     */
     assertEquals(1, plan.getChildCount(rootEB.getId()));
 
     ExecutionBlock insertEB = plan.getChild(rootEB.getId(), 0);
     assertNotNull(insertEB);
-    assertEquals(NodeType.INSERT, insertEB.getPlan().getType());
+    assertEquals(nodeType, insertEB.getPlan().getType());
     assertEquals(1, plan.getChildCount(insertEB.getId()));
 
     ExecutionBlock scanEB = plan.getChild(insertEB.getId(), 0);
@@ -150,38 +165,56 @@ public class TestTablePartitions extends QueryTestCaseBase {
     assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType());
     assertEquals(1, channel.getShuffleKeys().length);
 
+    executeString("DROP TABLE " + tableName + " PURGE").close();
     res.close();
   }
 
   @Test
   public final void testCreateColumnPartitionedTableWithSelectedColumns() throws Exception
{
+    ResultSet res = null;
     String tableName = CatalogUtil.normalizeIdentifier("testCreateColumnPartitionedTableWithSelectedColumns");
-    ResultSet res = executeString(
+
+    if (nodeType == NodeType.INSERT) {
+      res = executeString(
         "create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by
column(key float8) ");
-    res.close();
+      res.close();
 
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
-    assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
-    assertEquals(4, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+      assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size());
+      assertEquals(4, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size());
 
-    res = executeString("insert overwrite into " + tableName + " (col1, col2, key) select
l_orderkey, " +
+      res = executeString("insert overwrite into " + tableName + " (col1, col2, key) select
l_orderkey, " +
         "l_partkey, l_quantity from lineitem");
+    } else {
+      res = executeString("create table " + tableName + " (col1 int4, col2 int4, null_col
int4)"
+        + " partition by column(key float8) AS select l_orderkey, l_partkey, null, l_quantity
from lineitem");
+    }
     res.close();
+
+    executeString("DROP TABLE " + tableName + " PURGE").close();
   }
 
   @Test
   public final void testColumnPartitionedTableByOneColumn() throws Exception {
+    ResultSet res = null;
     String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByOneColumn");
-    ResultSet res = executeString(
+
+    if (nodeType == NodeType.INSERT) {
+      res = executeString(
         "create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by
column(key float8) ");
-    res.close();
+      res.close();
 
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
 
-    res = executeString("insert overwrite into " + tableName
+      res = executeString("insert overwrite into " + tableName
         + " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from lineitem");
+    } else {
+      res = executeString("create table " + tableName + " (col1 int4, col2 int4, null_col
int4) "
+        + " partition by column(key float8) as select l_orderkey, l_partkey, null, l_quantity
from lineitem");
+    }
     res.close();
 
+
     TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
     assertPartitionDirectories(desc);
 
@@ -197,6 +230,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
       assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(1));
       assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(2));
     }
+    executeString("DROP TABLE " + tableName + " PURGE").close();
     res.close();
   }
 
@@ -216,16 +250,23 @@ public class TestTablePartitions extends QueryTestCaseBase {
 
   @Test
   public final void testQueryCasesOnColumnPartitionedTable() throws Exception {
+    ResultSet res = null;
     String tableName = CatalogUtil.normalizeIdentifier("testQueryCasesOnColumnPartitionedTable");
-    ResultSet res = executeString(
+
+    if (nodeType == NodeType.INSERT) {
+      res = executeString(
         "create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by
column(key float8) ");
-    res.close();
+      res.close();
 
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
 
-    res = executeString(
+      res = executeString(
         "insert overwrite into " + tableName
-            + " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from lineitem");
+          + " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from lineitem");
+    } else {
+      res = executeString("create table " + tableName + " (col1 int4, col2 int4, null_col
int4) "
+        + " partition by column(key float8) as select l_orderkey, l_partkey, null, l_quantity
from lineitem");
+    }
     res.close();
 
     TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
@@ -292,20 +333,31 @@ public class TestTablePartitions extends QueryTestCaseBase {
     res = executeFile("case13.sql");
     assertResultSet(res, "case13.result");
     res.close();
+
+    executeString("DROP TABLE " + tableName + " PURGE").close();
+    res.close();
   }
 
   @Test
   public final void testColumnPartitionedTableByThreeColumns() throws Exception {
+    ResultSet res = null;
     String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByThreeColumns");
-    ResultSet res = testBase.execute(
+
+    if (nodeType == NodeType.INSERT) {
+      res = testBase.execute(
         "create table " + tableName + " (col4 text) partition by column(col1 int4, col2 int4,
col3 float8) ");
-    res.close();
-    TajoTestingCluster cluster = testBase.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+      res.close();
+      TajoTestingCluster cluster = testBase.getTestingCluster();
+      CatalogService catalog = cluster.getMaster().getCatalog();
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
 
-    res = executeString("insert overwrite into " + tableName
+      res = executeString("insert overwrite into " + tableName
         + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+    } else {
+      res = executeString( "create table " + tableName + " (col4 text) "
+        + " partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag,
l_orderkey, l_partkey, " +
+        "l_quantity from lineitem");
+    }
     res.close();
 
     TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
@@ -342,7 +394,6 @@ public class TestTablePartitions extends QueryTestCaseBase {
     }
     res.close();
 
-
     Map<Double, int []> resultRows2 = Maps.newHashMap();
     resultRows2.put(49.0d, new int[]{3, 3});
     resultRows2.put(45.0d, new int[]{3, 2});
@@ -355,21 +406,31 @@ public class TestTablePartitions extends QueryTestCaseBase {
       assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2));
       assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3));
     }
+
+    executeString("DROP TABLE " + tableName + " PURGE").close();
     res.close();
   }
 
   @Test
   public final void testInsertIntoColumnPartitionedTableByThreeColumns() throws Exception
{
+    ResultSet res = null;
     String tableName = CatalogUtil.normalizeIdentifier("testInsertIntoColumnPartitionedTableByThreeColumns");
-    ResultSet res = testBase.execute(
+
+    if (nodeType == NodeType.INSERT) {
+      res = testBase.execute(
         "create table " + tableName + " (col4 text) partition by column(col1 int4, col2 int4,
col3 float8) ");
-    res.close();
-    TajoTestingCluster cluster = testBase.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+      res.close();
+      TajoTestingCluster cluster = testBase.getTestingCluster();
+      CatalogService catalog = cluster.getMaster().getCatalog();
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
 
-    res = executeString("insert into " + tableName
+      res = executeString("insert into " + tableName
         + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+    } else {
+      res = executeString( "create table " + tableName + " (col4 text) "
+        + " partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag,
l_orderkey, l_partkey, " +
+        "l_quantity from lineitem");
+    }
     res.close();
 
     TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
@@ -405,7 +466,6 @@ public class TestTablePartitions extends QueryTestCaseBase {
     }
     res.close();
 
-
     Map<Double, int []> resultRows2 = Maps.newHashMap();
     resultRows2.put(49.0d, new int[]{3, 3});
     resultRows2.put(45.0d, new int[]{3, 2});
@@ -534,21 +594,33 @@ public class TestTablePartitions extends QueryTestCaseBase {
     assertEquals(summary.getDirectoryCount(), 1L);
     assertEquals(summary.getFileCount(), 0L);
     assertEquals(summary.getLength(), 0L);
+
+    executeString("DROP TABLE " + tableName + " PURGE").close();
   }
 
   @Test
   public final void testColumnPartitionedTableByOneColumnsWithCompression() throws Exception
{
+    ResultSet res = null;
     String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByOneColumnsWithCompression");
-    ResultSet res = executeString(
+
+    if (nodeType == NodeType.INSERT) {
+      res = executeString(
         "create table " + tableName + " (col2 int4, col3 float8) USING csv " +
-            "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec')
" +
-            "PARTITION BY column(col1 int4)");
-    res.close();
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+          "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec')
" +
+          "PARTITION BY column(col1 int4)");
+      res.close();
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
 
-    res = executeString(
+      res = executeString(
         "insert overwrite into " + tableName + " select l_partkey, l_quantity, l_orderkey
from lineitem");
+    } else {
+      res = executeString(
+        "create table " + tableName + " (col2 int4, col3 float8) USING csv " +
+          "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec')
" +
+          "PARTITION BY column(col1 int4) as select l_partkey, l_quantity, l_orderkey from
lineitem");
+    }
     res.close();
+
     TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
     if (!testingCluster.isHiveCatalogStoreRunning()) {
       assertEquals(5, desc.getStats().getNumRows().intValue());
@@ -570,22 +642,34 @@ public class TestTablePartitions extends QueryTestCaseBase {
         assertTrue(codec instanceof DeflateCodec);
       }
     }
+
+    executeString("DROP TABLE " + tableName + " PURGE").close();
   }
 
   @Test
   public final void testColumnPartitionedTableByTwoColumnsWithCompression() throws Exception
{
+    ResultSet res = null;
     String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByTwoColumnsWithCompression");
-    ResultSet res = executeString("create table " + tableName + " (col3 float8, col4 text)
USING csv " +
+
+    if (nodeType == NodeType.INSERT) {
+      res = executeString("create table " + tableName + " (col3 float8, col4 text) USING
csv " +
         "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec')
" +
         "PARTITION by column(col1 int4, col2 int4)");
-    res.close();
+      res.close();
 
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
 
-    res = executeString(
+      res = executeString(
         "insert overwrite into " + tableName +
-            " select  l_quantity, l_returnflag, l_orderkey, l_partkey from lineitem");
+          " select  l_quantity, l_returnflag, l_orderkey, l_partkey from lineitem");
+    } else {
+      res = executeString("create table " + tableName + " (col3 float8, col4 text) USING
csv " +
+          "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec')
" +
+          "PARTITION by column(col1 int4, col2 int4) as select  l_quantity, l_returnflag,
l_orderkey, " +
+        "l_partkey from lineitem");
+    }
     res.close();
+
     TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
     if (!testingCluster.isHiveCatalogStoreRunning()) {
       assertEquals(5, desc.getStats().getNumRows().intValue());
@@ -614,23 +698,35 @@ public class TestTablePartitions extends QueryTestCaseBase {
         }
       }
     }
+
+    executeString("DROP TABLE " + tableName + " PURGE").close();
   }
 
   @Test
   public final void testColumnPartitionedTableByThreeColumnsWithCompression() throws Exception
{
+    ResultSet res = null;
     String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableByThreeColumnsWithCompression");
-    ResultSet res = executeString(
+
+    if (nodeType == NodeType.INSERT) {
+      res = executeString(
         "create table " + tableName + " (col4 text) USING csv " +
-            "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec')
" +
-            "partition by column(col1 int4, col2 int4, col3 float8)");
-    res.close();
+          "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec')
" +
+          "partition by column(col1 int4, col2 int4, col3 float8)");
+      res.close();
 
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
 
-    res = executeString(
+      res = executeString(
         "insert overwrite into " + tableName +
-            " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+          " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+    } else {
+      res = executeString("create table " + tableName + " (col4 text) USING csv " +
+          "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec')
" +
+          "partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag,
l_orderkey, l_partkey, " +
+        "l_quantity from lineitem");
+    }
     res.close();
+
     TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
     if (!testingCluster.isHiveCatalogStoreRunning()) {
       assertEquals(5, desc.getStats().getNumRows().intValue());
@@ -697,23 +793,35 @@ public class TestTablePartitions extends QueryTestCaseBase {
 
     res.close();
     assertEquals(3, i);
+
+    executeString("DROP TABLE " + tableName + " PURGE").close();
   }
 
   @Test
   public final void testColumnPartitionedTableNoMatchedPartition() throws Exception {
+    ResultSet res = null;
     String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableNoMatchedPartition");
-    ResultSet res = executeString(
+
+    if (nodeType == NodeType.INSERT) {
+      res = executeString(
         "create table " + tableName + " (col4 text) USING csv " +
-            "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec')
" +
-            "partition by column(col1 int4, col2 int4, col3 float8)");
-    res.close();
+          "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec')
" +
+          "partition by column(col1 int4, col2 int4, col3 float8)");
+      res.close();
 
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
 
-    res = executeString(
+      res = executeString(
         "insert overwrite into " + tableName +
-            " select l_returnflag , l_orderkey, l_partkey, l_quantity from lineitem");
+          " select l_returnflag , l_orderkey, l_partkey, l_quantity from lineitem");
+    } else {
+      res = executeString("create table " + tableName + " (col4 text) USING csv " +
+          "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec')
" +
+          "partition by column(col1 int4, col2 int4, col3 float8) as select l_returnflag
, l_orderkey, l_partkey, " +
+        "l_quantity from lineitem");
+    }
     res.close();
+
     TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
     if (!testingCluster.isHiveCatalogStoreRunning()) {
       assertEquals(5, desc.getStats().getNumRows().intValue());
@@ -753,12 +861,15 @@ public class TestTablePartitions extends QueryTestCaseBase {
     res = executeString("select * from " + tableName + " where col2 = 9");
     assertFalse(res.next());
     res.close();
+
+    executeString("DROP TABLE " + tableName + " PURGE").close();
   }
 
   @Test
   public final void testColumnPartitionedTableWithSmallerExpressions1() throws Exception
{
+    ResultSet res = null;
     String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions1");
-    ResultSet res = executeString(
+    res = executeString(
         "create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by
column(key float8) ");
     res.close();
 
@@ -773,26 +884,35 @@ public class TestTablePartitions extends QueryTestCaseBase {
     res = executeFile("case14.sql");
     assertResultSet(res, "case14.result");
     res.close();
+
+    executeString("DROP TABLE " + tableName + " PURGE").close();
   }
 
   @Test
   public final void testColumnPartitionedTableWithSmallerExpressions2() throws Exception
{
+    ResultSet res = null;
+    ClientProtos.SubmitQueryResponse response = null;
     String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions2");
-    ResultSet res = executeString(
+
+    if (nodeType == NodeType.INSERT) {
+      res = executeString(
         "create table " + tableName + " (col1 int4, col2 int4, null_col int4) partition by
column(key float8) ");
-    res.close();
+      res.close();
 
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
 
-    ClientProtos.SubmitQueryResponse response = client.executeQuery("insert overwrite into
" + tableName
+      response = client.executeQuery("insert overwrite into " + tableName
         + " select l_returnflag , l_orderkey, l_partkey from lineitem");
 
-    assertTrue(response.hasErrorMessage());
-    assertEquals(response.getErrorMessage(), "INSERT has smaller expressions than target
columns\n");
+      assertTrue(response.hasErrorMessage());
+      assertEquals(response.getErrorMessage(), "INSERT has smaller expressions than target
columns\n");
 
-    res = executeFile("case15.sql");
-    assertResultSet(res, "case15.result");
-    res.close();
+      res = executeFile("case15.sql");
+      assertResultSet(res, "case15.result");
+      res.close();
+
+      executeString("DROP TABLE " + tableName + " PURGE").close();
+    }
   }
 
 
@@ -803,68 +923,104 @@ public class TestTablePartitions extends QueryTestCaseBase {
     res = executeString("create database testinsertquery2;");
     res.close();
 
-    res = executeString("create table testinsertquery1.table1 " +
+    if (nodeType == NodeType.INSERT) {
+      res = executeString("create table testinsertquery1.table1 " +
         "(col1 int4, col2 int4, col3 float8)");
-    res.close();
+      res.close();
 
-    res = executeString("create table testinsertquery2.table1 " +
+      res = executeString("create table testinsertquery2.table1 " +
         "(col1 int4, col2 int4, col3 float8)");
-    res.close();
+      res.close();
 
-    CatalogService catalog = testingCluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable("testinsertquery1", "table1"));
-    assertTrue(catalog.existsTable("testinsertquery2", "table1"));
+      CatalogService catalog = testingCluster.getMaster().getCatalog();
+      assertTrue(catalog.existsTable("testinsertquery1", "table1"));
+      assertTrue(catalog.existsTable("testinsertquery2", "table1"));
 
-    res = executeString("insert overwrite into testinsertquery1.table1 " +
+      res = executeString("insert overwrite into testinsertquery1.table1 " +
         "select l_orderkey, l_partkey, l_quantity from default.lineitem;");
-    res.close();
+      res.close();
+    } else {
+      res = executeString("create table testinsertquery1.table1 " +
+        "(col1 int4, col2 int4, col3 float8) as select l_orderkey, l_partkey, l_quantity from default.lineitem;");
+      res.close();
+    }
 
     TableDesc desc = catalog.getTableDesc("testinsertquery1", "table1");
     if (!testingCluster.isHiveCatalogStoreRunning()) {
       assertEquals(5, desc.getStats().getNumRows().intValue());
     }
 
-    res = executeString("insert overwrite into testinsertquery2.table1 " +
+    if (nodeType == NodeType.INSERT) {
+      res = executeString("insert overwrite into testinsertquery2.table1 " +
         "select col1, col2, col3 from testinsertquery1.table1;");
-    res.close();
-
+      res.close();
+    } else {
+      res = executeString("create table testinsertquery2.table1 " +
+        "(col1 int4, col2 int4, col3 float8) as select col1, col2, col3 from testinsertquery1.table1;");
+      res.close();
+    }
     desc = catalog.getTableDesc("testinsertquery2", "table1");
     if (!testingCluster.isHiveCatalogStoreRunning()) {
       assertEquals(5, desc.getStats().getNumRows().intValue());
     }
+
+    executeString("DROP TABLE testinsertquery1.table1 PURGE").close();
+    executeString("DROP TABLE testinsertquery2.table1 PURGE").close();
+    executeString("DROP DATABASE testinsertquery1").close();
+    executeString("DROP DATABASE testinsertquery2").close();
   }
 
   @Test
   public final void testColumnPartitionedTableWithSmallerExpressions5() throws Exception {
+    ResultSet res = null;
     String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions5");
-    ResultSet res = executeString(
+
+    if (nodeType == NodeType.INSERT) {
+      res = executeString(
         "create table " + tableName + " (col1 text) partition by column(col2 text) ");
-    res.close();
+      res.close();
 
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
 
-    res = executeString("insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem");
+      res = executeString("insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem");
+
+    } else {
+      res = executeString("create table " + tableName + " (col1 text) partition by column(col2 text) " +
+        " as select l_returnflag, null from lineitem");
+    }
     res.close();
     res = executeString("select * from " + tableName);
     assertResultSet(res);
     res.close();
+
+    executeString("DROP TABLE " + tableName + " PURGE").close();
   }
 
   @Test
   public final void testColumnPartitionedTableWithSmallerExpressions6() throws Exception {
+    ResultSet res = null;
     String tableName = CatalogUtil.normalizeIdentifier("testColumnPartitionedTableWithSmallerExpressions6");
-    ResultSet res = executeString(
+
+    if (nodeType == NodeType.INSERT) {
+      res = executeString(
         "create table " + tableName + " (col1 text) partition by column(col2 text) ");
-    res.close();
+      res.close();
 
-    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
+      assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
 
-    res = executeString(
+      res = executeString(
         "insert overwrite into " + tableName + "(col1) select l_returnflag from lineitem
where l_orderkey = 1");
+    } else {
+      res = executeString( "create table " + tableName + " (col1 text) partition by column(col2 text) " +
+        " as select l_returnflag, null from lineitem where l_orderkey = 1");
+    }
     res.close();
+
     res = executeString("select * from " + tableName);
     assertResultSet(res);
     res.close();
+
+    executeString("DROP TABLE " + tableName + " PURGE").close();
   }
 
   private MasterPlan getQueryPlan(ResultSet res) {
@@ -924,8 +1080,13 @@ public class TestTablePartitions extends QueryTestCaseBase {
       CatalogService catalog = testingCluster.getMaster().getCatalog();
       assertTrue(catalog.existsTable("default", "testscatteredhashshuffle"));
 
-      executeString("create table test_partition (col2 text) partition by column (col1 text)").close();
-      executeString("insert into test_partition select col2, col1 from testscatteredhashshuffle").close();
+      if (nodeType == NodeType.INSERT) {
+        executeString("create table test_partition (col2 text) partition by column (col1 text)").close();
+        executeString("insert into test_partition select col2, col1 from testscatteredhashshuffle").close();
+      } else {
+        executeString("create table test_partition (col2 text) PARTITION BY COLUMN (col1 text) AS select col2, " +
+          "col1 from testscatteredhashshuffle").close();
+      }
 
       ResultSet res = executeString("select col1 from test_partition");
 
@@ -935,8 +1096,6 @@ public class TestTablePartitions extends QueryTestCaseBase {
       }
       assertEquals(data.size(), numRows);
 
-      // assert data file size
-
     } finally {
       testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.varname,
           TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.defaultVal);
@@ -953,10 +1112,17 @@ public class TestTablePartitions extends QueryTestCaseBase {
 
     executeDDL("lineitemspecial_ddl.sql", "lineitemspecial.tbl");
 
-    executeString("CREATE TABLE IF NOT EXISTS pTable947 (id int, name text) PARTITION BY COLUMN (type text)")
+    if (nodeType == NodeType.INSERT) {
+      executeString("CREATE TABLE IF NOT EXISTS pTable947 (id int, name text) PARTITION BY COLUMN (type text)")
         .close();
-    executeString("INSERT OVERWRITE INTO pTable947 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial")
+      executeString("INSERT OVERWRITE INTO pTable947 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial")
         .close();
+    } else {
+      executeString("CREATE TABLE IF NOT EXISTS pTable947 (id int, name text) PARTITION BY COLUMN (type text)" +
+        " AS  SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial")
+        .close();
+    }
+
     ResultSet res = executeString("select * from pTable947 where type='RA:*?><I/L#%S' or type='AIR'");
 
     String resStr = resultSetToString(res);
@@ -968,6 +1134,8 @@ public class TestTablePartitions extends QueryTestCaseBase {
 
     assertEquals(expected, resStr);
     cleanupQuery(res);
+
+    executeString("DROP TABLE pTable947 PURGE").close();
   }
 
   @Test
@@ -976,10 +1144,16 @@ public class TestTablePartitions extends QueryTestCaseBase {
 
     executeDDL("lineitemspecial_ddl.sql", "lineitemspecial.tbl");
 
-    executeString("CREATE TABLE IF NOT EXISTS pTable948 (id int, name text) PARTITION BY COLUMN (type text)")
+    if (nodeType == NodeType.INSERT) {
+      executeString("CREATE TABLE IF NOT EXISTS pTable948 (id int, name text) PARTITION BY COLUMN (type text)")
         .close();
-    executeString("INSERT OVERWRITE INTO pTable948 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial")
+      executeString("INSERT OVERWRITE INTO pTable948 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial")
         .close();
+    } else {
+      executeString("CREATE TABLE IF NOT EXISTS pTable948 (id int, name text) PARTITION BY COLUMN (type text)" +
+        " AS SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial")
+        .close();
+    }
 
     ResultSet res = executeString("select * from pTable948 where type='RA:*?><I/L#%S'");
     assertResultSet(res);
@@ -988,6 +1162,8 @@ public class TestTablePartitions extends QueryTestCaseBase {
     res = executeString("select * from pTable948 where type='RA:*?><I/L#%S' or type='AIR01'");
     assertResultSet(res);
     cleanupQuery(res);
+
+    executeString("DROP TABLE pTable948 PURGE").close();
   }
 
   @Test
@@ -995,19 +1171,22 @@ public class TestTablePartitions extends QueryTestCaseBase {
     // See - TAJO-1219: Files located in intermediate directories of partitioned table should be ignored
     // It verifies that Tajo ignores files located in intermediate directories of partitioned table.
 
-    Path testDir = CommonTestingUtil.getTestDir();
+    if (nodeType == NodeType.INSERT) {
+      Path testDir = CommonTestingUtil.getTestDir();
 
-    executeString(
-      "CREATE EXTERNAL TABLE testIgnoreFilesInIntermediateDir (col1 int) USING CSV PARTITION BY COLUMN (col2 text) " +
-        "LOCATION '" + testDir + "'");
+      executeString(
+        "CREATE EXTERNAL TABLE testIgnoreFilesInIntermediateDir (col1 int) USING CSV PARTITION BY COLUMN (col2 text) " +
+          "LOCATION '" + testDir + "'");
 
-    FileSystem fs = testDir.getFileSystem(conf);
-    FSDataOutputStream fos = fs.create(new Path(testDir, "table1.data"));
-    fos.write("a|b|c".getBytes());
-    fos.close();
+      FileSystem fs = testDir.getFileSystem(conf);
+      FSDataOutputStream fos = fs.create(new Path(testDir, "table1.data"));
+      fos.write("a|b|c".getBytes());
+      fos.close();
 
-    ResultSet res = executeString("select * from testIgnoreFilesInIntermediateDir;");
-    assertFalse(res.next());
-    res.close();
+      ResultSet res = executeString("select * from testIgnoreFilesInIntermediateDir;");
+      assertFalse(res.next());
+      res.close();
+    }
   }
+
 }


Mime
View raw message