tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [24/43] git commit: TAJO-1067: INSERT OVERWRITE INTO should not remove all partitions. (jaehwa)
Date Fri, 10 Oct 2014 04:33:30 GMT
TAJO-1067: INSERT OVERWRITE INTO should not remove all partitions. (jaehwa)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/ca5fb301
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/ca5fb301
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/ca5fb301

Branch: refs/heads/index_support
Commit: ca5fb301bff4b38d80a523d5bece9eaf74f64ec3
Parents: ca187bc
Author: Jaehwa Jung <blrunner@apache.org>
Authored: Mon Oct 6 14:12:42 2014 +0900
Committer: Jaehwa Jung <blrunner@apache.org>
Committed: Mon Oct 6 14:12:42 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   2 +-
 .../apache/tajo/master/querymaster/Query.java   | 135 +++++++++++++++++--
 .../tajo/engine/query/TestTablePartitions.java  |  53 ++++++--
 4 files changed, 170 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/ca5fb301/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index bcb4a7d..8f74a7f 100644
--- a/CHANGES
+++ b/CHANGES
@@ -158,6 +158,8 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1067: INSERT OVERWRITE INTO should not remove all partitions. (jaehwa)
+
     TAJO-1065: The \admin -cluster argument doesn't run as expected.
     (Jongyoung Park via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/ca5fb301/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index b5a9b50..f9f5e4a 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -359,7 +359,7 @@ public class TajoConf extends Configuration {
     $BEHAVIOR_ARITHMETIC_ABORT("tajo.behavior.arithmetic-abort", false),
 
     // ResultSet ---------------------------------------------------------
-    $RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200)
+    $RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200),
     ;
 
     public final String varname;

http://git-wip-us.apache.org/repos/asf/tajo/blob/ca5fb301/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index c2cf54e..7899365 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -418,25 +418,75 @@ public class Query implements EventHandler<QueryEvent> {
 
           if (queryContext.isOutputOverwrite()) { // INSERT OVERWRITE INTO
 
-            // it moves the original table into the temporary location.
+            // It moves the original table into the temporary location.
             // Then it moves the new result table into the original table location.
             // Upon failed, it recovers the original table if possible.
             boolean movedToOldTable = false;
             boolean committed = false;
             Path oldTableDir = new Path(queryContext.getStagingDir(), TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
-            try {
-              if (fs.exists(finalOutputDir)) {
-                fs.rename(finalOutputDir, oldTableDir);
-                movedToOldTable = fs.exists(oldTableDir);
-              } else { // if the parent does not exist, make its parent directory.
-                fs.mkdirs(finalOutputDir.getParent());
+
+            if (queryContext.hasPartition()) {
+              // This is a map for existing non-leaf directory to rename. A key is current
directory and a value is
+              // renaming directory.
+              Map<Path, Path> renameDirs = TUtil.newHashMap();
+              // This is a map for recovering existing partition directory. A key is current
directory and a value is
+              // temporary directory to back up.
+              Map<Path, Path> recoveryDirs = TUtil.newHashMap();
+
+              try {
+                if (!fs.exists(finalOutputDir)) {
+                  fs.mkdirs(finalOutputDir);
+                }
+
+                visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(),
+                    renameDirs, oldTableDir);
+
+                // Rename target partition directories
+                for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
+                  // Backup existing data files for recovering
+                  if (fs.exists(entry.getValue())) {
+                    String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(),
+                        oldTableDir.toString());
+                    Path recoveryPath = new Path(recoveryPathString);
+                    fs.rename(entry.getValue(), recoveryPath);
+                    fs.exists(recoveryPath);
+                    recoveryDirs.put(entry.getValue(), recoveryPath);
+                  }
+                  // Delete existing directory
+                  fs.delete(entry.getValue(), true);
+                  // Rename staging directory to final output directory
+                  fs.rename(entry.getKey(), entry.getValue());
+                }
+
+              } catch (IOException ioe) {
+                // Remove created dirs
+                for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
+                  fs.delete(entry.getValue(), true);
+                }
+
+                // Recovery renamed dirs
+                for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
+                  fs.delete(entry.getValue(), true);
+                  fs.rename(entry.getValue(), entry.getKey());
+                }
+                throw new IOException(ioe.getMessage());
               }
-              fs.rename(stagingResultDir, finalOutputDir);
-              committed = fs.exists(finalOutputDir);
-            } catch (IOException ioe) {
-              // recover the old table
-              if (movedToOldTable && !committed) {
-                fs.rename(oldTableDir, finalOutputDir);
+            } else {
+              try {
+                if (fs.exists(finalOutputDir)) {
+                  fs.rename(finalOutputDir, oldTableDir);
+                  movedToOldTable = fs.exists(oldTableDir);
+                } else { // if the parent does not exist, make its parent directory.
+                  fs.mkdirs(finalOutputDir.getParent());
+                }
+
+                fs.rename(stagingResultDir, finalOutputDir);
+                committed = fs.exists(finalOutputDir);
+              } catch (IOException ioe) {
+                // recover the old table
+                if (movedToOldTable && !committed) {
+                  fs.rename(oldTableDir, finalOutputDir);
+                }
               }
             }
           } else {
@@ -486,6 +536,65 @@ public class Query implements EventHandler<QueryEvent> {
       return finalOutputDir;
     }
 
+    /**
+     * Recursively walks the staging result directory of a partitioned INSERT OVERWRITE and
+     * records which staging partition directories must later be moved to the final output.
+     *
+     * For every sub-directory found under {@code stagingPath} this method:
+     * <ul>
+     *   <li>creates the mirrored directory under {@code oldTableDir} (the backup location)
+     *       if it does not exist yet;</li>
+     *   <li>if the directory has no sub-directory (a last-level partition), maps the staging
+     *       directory to its mirrored path under {@code outputPath} in {@code renameDirs};</li>
+     *   <li>otherwise creates the mirrored directory under {@code outputPath} if missing.</li>
+     * </ul>
+     * No data file is moved or deleted here; the caller performs the renames.
+     *
+     * @param fs file system holding the staging, output and backup directories
+     * @param stagingPath staging directory currently being visited
+     * @param outputPath final output (table) directory
+     * @param stagingParentPathString string form of the root staging result directory; it is
+     *                                substituted literally (never as a regex) when mirroring
+     * @param renameDirs output map: staging leaf partition directory -> final output directory
+     * @param oldTableDir root of the backup tree that mirrors the staging layout
+     * @throws IOException if listing or creating a directory fails
+     */
+    private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath,
+                                           String stagingParentPathString,
+                                           Map<Path, Path> renameDirs, Path oldTableDir)
+        throws IOException {
+      FileStatus[] files = fs.listStatus(stagingPath);
+      if (files == null) { // same null handling as verifyAllFileMoved(): nothing to visit
+        return;
+      }
+
+      String oldTableDirString = oldTableDir.toString();
+      String outputPathString = outputPath.toString();
+
+      for (FileStatus eachFile : files) {
+        if (!eachFile.isDirectory()) {
+          continue;
+        }
+        Path stagingDir = eachFile.getPath();
+        String stagingDirString = stagingDir.toString();
+
+        // Literal substitution: path strings must not be interpreted as a regex or as a
+        // replacement pattern ('$', '\', '+', '(' ... are legal in paths).
+        Path recoveryPath =
+            new Path(stagingDirString.replace(stagingParentPathString, oldTableDirString));
+        if (!fs.exists(recoveryPath)) {
+          fs.mkdirs(recoveryPath);
+        }
+
+        visitPartitionedDirectory(fs, stagingDir, outputPath, stagingParentPathString,
+            renameDirs, oldTableDir);
+
+        Path newPath =
+            new Path(stagingDirString.replace(stagingParentPathString, outputPathString));
+        // NOTE: isLeafDirectory() returns true when the directory HAS a sub-directory, so the
+        // negation selects the last-level partition directories, which are the rename targets.
+        if (!isLeafDirectory(fs, stagingDir)) {
+          renameDirs.put(stagingDir, newPath);
+        } else if (!fs.exists(newPath)) {
+          fs.mkdirs(newPath);
+        }
+      }
+    }
+
+    /**
+     * Checks whether {@code path} directly contains at least one sub-directory.
+     *
+     * NOTE: despite the method name, this returns {@code true} when the directory is NOT a
+     * leaf (it has a child directory), and {@code false} when it holds only files or is empty.
+     * The name is kept because visitPartitionedDirectory() calls it by this name.
+     *
+     * @param fs file system holding {@code path}
+     * @param path directory to inspect
+     * @return {@code true} if any direct child of {@code path} is a directory
+     * @throws IOException if the directory cannot be listed
+     */
+    private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException {
+      FileStatus[] files = fs.listStatus(path);
+      if (files == null) {
+        return false;
+      }
+      for (FileStatus file : files) {
+        // The listing already carries the entry type; no extra FileSystem round-trip per child.
+        if (file.isDirectory()) {
+          return true;
+        }
+      }
+      return false;
+    }
+
     private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException
{
       FileStatus[] files = fs.listStatus(stagingPath);
       if (files != null && files.length != 0) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/ca5fb301/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index 5bf2944..da09129 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -25,10 +25,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.DeflateCodec;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryTestCaseBase;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.*;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
@@ -47,6 +44,7 @@ import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.master.querymaster.SubQuery;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TajoWorker;
 import org.junit.Test;
 
@@ -485,6 +483,45 @@ public class TestTablePartitions extends QueryTestCaseBase {
         "R,3,3,49.0\n" +
         "R,3,3,49.0\n";
     assertEquals(expected, resultSetData);
+
+    // Check not to remove existing partition directories.
+    res = executeString("insert overwrite into " + tableName
+        + " select l_returnflag, l_orderkey, l_partkey, 30.0 as l_quantity from lineitem
"
+        + " where l_orderkey = 1 and l_partkey = 1 and  l_linenumber = 1");
+    res.close();
+
+    desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
+    assertTrue(fs.isDirectory(path));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=30.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
+
+    if (!testingCluster.isHCatalogStoreRunning()) {
+      // TODO: If there is existing another partition directory, we must add its rows number
to result row numbers.
+      // assertEquals(6, desc.getStats().getNumRows().intValue());
+    }
+
+    res = executeString("select * from " + tableName + " where col2 = 1");
+    resultSetData = resultSetToString(res);
+    res.close();
+    expected = "col4,col1,col2,col3\n" +
+        "-------------------------------\n" +
+        "N,1,1,17.0\n" +
+        "N,1,1,17.0\n" +
+        "N,1,1,30.0\n" +
+        "N,1,1,36.0\n" +
+        "N,1,1,36.0\n";
+
+    assertEquals(expected, resultSetData);
   }
 
   @Test
@@ -888,16 +925,16 @@ public class TestTablePartitions extends QueryTestCaseBase {
 
     executeDDL("lineitemspecial_ddl.sql", "lineitemspecial.tbl");
 
-    executeString("CREATE TABLE IF NOT EXISTS pTable947 (id int, name text) PARTITION BY
COLUMN (type text)")
+    executeString("CREATE TABLE IF NOT EXISTS pTable948 (id int, name text) PARTITION BY
COLUMN (type text)")
         .close();
-    executeString("INSERT OVERWRITE INTO pTable947 SELECT l_orderkey, l_shipinstruct, l_shipmode
FROM lineitemspecial")
+    executeString("INSERT OVERWRITE INTO pTable948 SELECT l_orderkey, l_shipinstruct, l_shipmode
FROM lineitemspecial")
         .close();
 
-    ResultSet res = executeString("select * from pTable947 where type='RA:*?><I/L#%S'");
+    ResultSet res = executeString("select * from pTable948 where type='RA:*?><I/L#%S'");
     assertResultSet(res);
     cleanupQuery(res);
 
-    res = executeString("select * from pTable947 where type='RA:*?><I/L#%S' or type='AIR01'");
+    res = executeString("select * from pTable948 where type='RA:*?><I/L#%S' or type='AIR01'");
     assertResultSet(res);
     cleanupQuery(res);
   }


Mime
View raw message