tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From blrun...@apache.org
Subject tajo git commit: TAJO-1644: When inserting empty data into a partitioned table, existing data would be removed. (jaehwa)
Date Fri, 26 Jun 2015 03:01:47 GMT
Repository: tajo
Updated Branches:
  refs/heads/branch-0.10.2 e67d24dae -> 57faba4f2


TAJO-1644: When inserting empty data into a partitioned table, existing data would be removed.
(jaehwa)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/57faba4f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/57faba4f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/57faba4f

Branch: refs/heads/branch-0.10.2
Commit: 57faba4f2566d2b1a728967da27acf183ba8c28e
Parents: e67d24d
Author: JaeHwa Jung <blrunner@apache.org>
Authored: Fri Jun 26 12:00:42 2015 +0900
Committer: JaeHwa Jung <blrunner@apache.org>
Committed: Fri Jun 26 12:00:42 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../main/java/org/apache/tajo/SessionVars.java  |   4 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   4 +
 .../tajo/engine/query/TestTablePartitions.java  | 163 +++++++++----------
 .../TestTajoCli/testHelpSessionVars.result      |   3 +-
 .../org/apache/tajo/storage/StorageManager.java |   7 +-
 6 files changed, 95 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/57faba4f/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0046ad4..32e26ce 100644
--- a/CHANGES
+++ b/CHANGES
@@ -11,6 +11,9 @@ Release 0.10.2 - Released
 
   BUG FIXES
 
+    TAJO-1644: When inserting empty data into a partitioned table,
+    existing data would be removed. (jaehwa)
+
     TAJO-1642: CatalogServer need to check meta table first. (jaehwa)
 
   TASKS

http://git-wip-us.apache.org/repos/asf/tajo/blob/57faba4f/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index 4882b27..a74775a 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -123,6 +123,10 @@ public enum SessionVars implements ConfigKey {
   NULL_CHAR(ConfVars.$TEXT_NULL, "null char of text file output", DEFAULT),
   CODEGEN(ConfVars.$CODEGEN, "Runtime code generation enabled (experiment)", DEFAULT),
 
+  PARTITION_NO_RESULT_OVERWRITE_ENABLED(ConfVars.$PARTITION_NO_RESULT_OVERWRITE_ENABLED,
+    "If True, a partitioned table is overwritten even if a sub query leads to no result.
"
+    + "Otherwise, the table data will be kept if there is no result", DEFAULT),
+
   // Behavior Control ---------------------------------------------------------
   ARITHABORT(ConfVars.$BEHAVIOR_ARITHMETIC_ABORT,
       "If true, a running query will be terminated when an overflow or divide-by-zero occurs.",
DEFAULT),

http://git-wip-us.apache.org/repos/asf/tajo/blob/57faba4f/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 95ef4bc..84f2872 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -359,6 +359,10 @@ public class TajoConf extends Configuration {
     // Behavior Control ---------------------------------------------------------
     $BEHAVIOR_ARITHMETIC_ABORT("tajo.behavior.arithmetic-abort", false),
 
+    // If True, a partitioned table is overwritten even if a sub query leads to no result.
+    // Otherwise, the table data will be kept if there is no result
+    $PARTITION_NO_RESULT_OVERWRITE_ENABLED("tajo.partition.overwrite.even-if-no-result",
false),
+
     // ResultSet ---------------------------------------------------------
     $RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200),
     $RESULT_SET_BLOCK_WAIT("tajo.resultset.block.wait", true),

http://git-wip-us.apache.org/repos/asf/tajo/blob/57faba4f/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index b48720a..58e0b78 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -360,34 +360,26 @@ public class TestTablePartitions extends QueryTestCaseBase {
 
   @Test
   public final void testInsertIntoColumnPartitionedTableByThreeColumns() throws Exception
{
+    ResultSet res = null;
     String tableName = CatalogUtil.normalizeIdentifier("testInsertIntoColumnPartitionedTableByThreeColumns");
-    ResultSet res = testBase.execute(
-        "create table " + tableName + " (col4 text) partition by column(col1 int4, col2 int4,
col3 float8) ");
+
+    res = testBase.execute(
+      "create table " + tableName + " (col4 text) partition by column(col1 int4, col2 int4,
col3 float8) ");
     res.close();
     TajoTestingCluster cluster = testBase.getTestingCluster();
     CatalogService catalog = cluster.getMaster().getCatalog();
     assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName));
 
     res = executeString("insert into " + tableName
-        + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+      + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+
     res.close();
 
     TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
     Path path = new Path(desc.getPath());
 
     FileSystem fs = FileSystem.get(conf);
-    assertTrue(fs.isDirectory(path));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
+    verifyDirectoriesForThreeColumns(fs, path, 1);
     if (!testingCluster.isHCatalogStoreRunning()) {
       assertEquals(5, desc.getStats().getNumRows().intValue());
     }
@@ -405,7 +397,6 @@ public class TestTablePartitions extends QueryTestCaseBase {
     }
     res.close();
 
-
     Map<Double, int []> resultRows2 = Maps.newHashMap();
     resultRows2.put(49.0d, new int[]{3, 3});
     resultRows2.put(45.0d, new int[]{3, 2});
@@ -422,38 +413,27 @@ public class TestTablePartitions extends QueryTestCaseBase {
 
     // insert into already exists partitioned table
     res = executeString("insert into " + tableName
-        + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+      + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
     res.close();
 
     desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
     path = new Path(desc.getPath());
 
-    assertTrue(fs.isDirectory(path));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
-
+    verifyDirectoriesForThreeColumns(fs, path, 2);
     if (!testingCluster.isHCatalogStoreRunning()) {
       assertEquals(5, desc.getStats().getNumRows().intValue());
     }
+
     String expected = "N\n" +
-        "N\n" +
-        "N\n" +
-        "N\n" +
-        "N\n" +
-        "N\n" +
-        "R\n" +
-        "R\n" +
-        "R\n" +
-        "R\n";
+      "N\n" +
+      "N\n" +
+      "N\n" +
+      "N\n" +
+      "N\n" +
+      "R\n" +
+      "R\n" +
+      "R\n" +
+      "R\n";
 
     String tableData = getTableFileContents(new Path(desc.getPath()));
     assertEquals(expected, tableData);
@@ -462,78 +442,87 @@ public class TestTablePartitions extends QueryTestCaseBase {
     String resultSetData = resultSetToString(res);
     res.close();
     expected = "col4,col1,col2,col3\n" +
-        "-------------------------------\n" +
-        "N,2,2,38.0\n" +
-        "N,2,2,38.0\n" +
-        "R,3,2,45.0\n" +
-        "R,3,2,45.0\n";
+      "-------------------------------\n" +
+      "N,2,2,38.0\n" +
+      "N,2,2,38.0\n" +
+      "R,3,2,45.0\n" +
+      "R,3,2,45.0\n";
     assertEquals(expected, resultSetData);
 
     res = executeString("select * from " + tableName + " where (col1 = 2 or col1 = 3) and
col2 >= 2");
     resultSetData = resultSetToString(res);
     res.close();
     expected = "col4,col1,col2,col3\n" +
-        "-------------------------------\n" +
-        "N,2,2,38.0\n" +
-        "N,2,2,38.0\n" +
-        "R,3,2,45.0\n" +
-        "R,3,2,45.0\n" +
-        "R,3,3,49.0\n" +
-        "R,3,3,49.0\n";
+      "-------------------------------\n" +
+      "N,2,2,38.0\n" +
+      "N,2,2,38.0\n" +
+      "R,3,2,45.0\n" +
+      "R,3,2,45.0\n" +
+      "R,3,3,49.0\n" +
+      "R,3,3,49.0\n";
     assertEquals(expected, resultSetData);
 
     // Check not to remove existing partition directories.
     res = executeString("insert overwrite into " + tableName
-        + " select l_returnflag, l_orderkey, l_partkey, 30.0 as l_quantity from lineitem
"
-        + " where l_orderkey = 1 and l_partkey = 1 and  l_linenumber = 1");
+      + " select l_returnflag, l_orderkey, l_partkey, 30.0 as l_quantity from lineitem "
+      + " where l_orderkey = 1 and l_partkey = 1 and  l_linenumber = 1");
     res.close();
 
-    desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
-    assertTrue(fs.isDirectory(path));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=30.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
-
+    verifyDirectoriesForThreeColumns(fs, path, 3);
     if (!testingCluster.isHCatalogStoreRunning()) {
       // TODO: If there is existing another partition directory, we must add its rows number
to result row numbers.
+      // desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
       // assertEquals(6, desc.getStats().getNumRows().intValue());
     }
 
-    res = executeString("select * from " + tableName + " where col2 = 1");
-    resultSetData = resultSetToString(res);
-    res.close();
-    expected = "col4,col1,col2,col3\n" +
-        "-------------------------------\n" +
-        "N,1,1,17.0\n" +
-        "N,1,1,17.0\n" +
-        "N,1,1,30.0\n" +
-        "N,1,1,36.0\n" +
-        "N,1,1,36.0\n";
-
-    assertEquals(expected, resultSetData);
+    verifyKeptExistingData(res, tableName);
 
     // insert overwrite empty result to partitioned table
     res = executeString("insert overwrite into " + tableName
-      + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem where l_orderkey"
+
-      " > 100");
+      + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem where l_orderkey
> 100");
     res.close();
 
-    desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+    verifyDirectoriesForThreeColumns(fs, path, 4);
+    verifyKeptExistingData(res, tableName);
 
-    ContentSummary summary = fs.getContentSummary(new Path(desc.getPath()));
+    executeString("DROP TABLE " + tableName + " PURGE").close();
+  }
 
-    assertEquals(summary.getDirectoryCount(), 1L);
-    assertEquals(summary.getFileCount(), 0L);
-    assertEquals(summary.getLength(), 0L);
+  private final void verifyKeptExistingData(ResultSet res, String tableName) throws Exception
{
+    res = executeString("select * from " + tableName + " where col2 = 1");
+    String resultSetData = resultSetToString(res);
+    res.close();
+    String expected = "col4,col1,col2,col3\n" +
+      "-------------------------------\n" +
+      "N,1,1,17.0\n" +
+      "N,1,1,17.0\n" +
+      "N,1,1,30.0\n" +
+      "N,1,1,36.0\n" +
+      "N,1,1,36.0\n";
+
+    assertEquals(expected, resultSetData);
+  }
+
+  private final void verifyDirectoriesForThreeColumns(FileSystem fs, Path path, int step)
throws Exception {
+    assertTrue(fs.isDirectory(path));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+
+    if (step == 1 || step == 2) {
+      assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
+    } else {
+      assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
+      assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=30.0")));
+    }
+
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/tajo/blob/57faba4f/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index 19b2ee1..0a81e15 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -34,7 +34,8 @@ Available Session Variables:
 \set MAX_OUTPUT_FILE_SIZE [int value] - Maximum per-output file size (mb). 0 means infinite.
 \set NULL_CHAR [text value] - null char of text file output
 \set CODEGEN [true or false] - Runtime code generation enabled (experiment)
+\set PARTITION_NO_RESULT_OVERWRITE_ENABLED [true or false] - If True, a partitioned table is overwritten even if a sub query leads to no result. Otherwise, the table data will be kept if there is no result
 \set ARITHABORT [true or false] - If true, a running query will be terminated when an overflow or divide-by-zero occurs.
 \set FETCH_ROWNUM [int value] - Sets the number of rows at a time from Master
 \set BLOCK_ON_RESULT [true or false] - Whether to block result set on query execution
-\set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled
+\set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/57faba4f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
index ce963c8..231694a 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -642,7 +642,12 @@ public abstract class StorageManager {
           Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
           ContentSummary summary = fs.getContentSummary(stagingResultDir);
 
-          if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty() && summary.getFileCount()
> 0L) {
+          // When inserting empty data into a partitioned table, check if keep existing data
need to be remove or not.
+          boolean overwriteEnabled = queryContext.getBool(SessionVars.PARTITION_NO_RESULT_OVERWRITE_ENABLED);
+
+          // If existing data doesn't need to keep, check if there are some files.
+          if ( (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty())
+            && (!overwriteEnabled || (overwriteEnabled && summary.getFileCount()
> 0L))) {
             // This is a map for existing non-leaf directory to rename. A key is current
directory and a value is
             // renaming directory.
             Map<Path, Path> renameDirs = TUtil.newHashMap();


Mime
View raw message