hive-commits mailing list archives

From sp...@apache.org
Subject hive git commit: HIVE-15199: INSERT INTO data on S3 is replacing the old rows with the new ones (Sergio Pena, reviewed by Yongzhi Chen, Sahil Takiar, Illya Yalovyy, Steve Loughran)
Date Wed, 23 Nov 2016 18:45:11 GMT
Repository: hive
Updated Branches:
  refs/heads/master e1c1b062a -> 9f72e40d5


HIVE-15199: INSERT INTO data on S3 is replacing the old rows with the new ones (Sergio Pena,
reviewed by Yongzhi Chen, Sahil Takiar, Illya Yalovyy, Steve Loughran)
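
Root cause, in brief: on blob stores such as S3 a rename may succeed over an
existing object instead of failing, so the old rename-until-it-fails loop in
Hive.java never produced the _copy_N names and a later INSERT INTO could
replace the previous statement's file. The fix probes the destination with
exists() before moving. A minimal sketch of that naming scheme, assuming only
hadoop-common on the classpath (class and method names here are illustrative,
not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class CopySuffixSketch {
      // Find a collision-free destination name by probing with exists()
      // rather than trusting rename() to fail on an existing destination.
      static Path nextFreeName(FileSystem destFs, Path destDir,
                               String base, String ext) throws IOException {
        Path candidate = new Path(destDir, base + ext);
        for (int counter = 1; destFs.exists(candidate); counter++) {
          candidate = new Path(destDir, base + "_copy_" + counter + ext);
        }
        return candidate;
      }
    }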


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9f72e40d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9f72e40d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9f72e40d

Branch: refs/heads/master
Commit: 9f72e40d58c6bfce9ecfb2d6ac27b524ff1e4daa
Parents: e1c1b06
Author: Sergio Pena <sergio.pena@cloudera.com>
Authored: Wed Nov 23 12:43:55 2016 -0600
Committer: Sergio Pena <sergio.pena@cloudera.com>
Committed: Wed Nov 23 12:43:55 2016 -0600

----------------------------------------------------------------------
 .../test/queries/clientpositive/insert_into.q   |  1 +
 .../results/clientpositive/insert_into.q.out    | 61 ++++++++-----
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 93 ++++++++++----------
 3 files changed, 86 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9f72e40d/itests/hive-blobstore/src/test/queries/clientpositive/insert_into.q
----------------------------------------------------------------------
diff --git a/itests/hive-blobstore/src/test/queries/clientpositive/insert_into.q b/itests/hive-blobstore/src/test/queries/clientpositive/insert_into.q
index 919ff7d..c9ed57d 100644
--- a/itests/hive-blobstore/src/test/queries/clientpositive/insert_into.q
+++ b/itests/hive-blobstore/src/test/queries/clientpositive/insert_into.q
@@ -3,5 +3,6 @@ set hive.blobstore.use.blobstore.as.scratchdir=true;
 DROP TABLE qtest;
 CREATE TABLE qtest (value int) LOCATION '${hiveconf:test.blobstore.path.unique}/qtest/';
 INSERT INTO qtest VALUES (1), (10), (100), (1000);
+INSERT INTO qtest VALUES (2), (20), (200), (2000);
 EXPLAIN EXTENDED INSERT INTO qtest VALUES (1), (10), (100), (1000);
 SELECT * FROM qtest;
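
The added second INSERT INTO makes this qfile a regression test: before the
fix, the second statement's output file could replace the first one's on the
blob store, so the final SELECT returned only the second batch. A hedged
sketch of the same scenario driven over JDBC against HiveServer2 (the
connection URL and the s3a location are placeholders, not part of this patch):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public final class InsertIntoBlobstoreRepro {
      public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:hive2://localhost:10000/default");
             Statement st = conn.createStatement()) {
          st.execute("DROP TABLE IF EXISTS qtest");
          st.execute("CREATE TABLE qtest (value int) "
              + "LOCATION 's3a://my-bucket/qtest/'");  // placeholder bucket
          st.execute("INSERT INTO qtest VALUES (1), (10), (100), (1000)");
          st.execute("INSERT INTO qtest VALUES (2), (20), (200), (2000)");
          int rows = 0;
          try (ResultSet rs = st.executeQuery("SELECT * FROM qtest")) {
            while (rs.next()) {
              rows++;
            }
          }
          // Before the fix only the second batch (4 rows) survived on S3;
          // with the fix all 8 rows are returned.
          System.out.println("row count = " + rows);
        }
      }
    }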

http://git-wip-us.apache.org/repos/asf/hive/blob/9f72e40d/itests/hive-blobstore/src/test/results/clientpositive/insert_into.q.out
----------------------------------------------------------------------
diff --git a/itests/hive-blobstore/src/test/results/clientpositive/insert_into.q.out b/itests/hive-blobstore/src/test/results/clientpositive/insert_into.q.out
index c25d0c4..00ad136 100644
--- a/itests/hive-blobstore/src/test/results/clientpositive/insert_into.q.out
+++ b/itests/hive-blobstore/src/test/results/clientpositive/insert_into.q.out
@@ -21,6 +21,15 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@values__tmp__table__1
 POSTHOOK: Output: default@qtest
 POSTHOOK: Lineage: qtest.value EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+PREHOOK: query: INSERT INTO qtest VALUES (2), (20), (200), (2000)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__2
+PREHOOK: Output: default@qtest
+POSTHOOK: query: INSERT INTO qtest VALUES (2), (20), (200), (2000)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__2
+POSTHOOK: Output: default@qtest
+POSTHOOK: Lineage: qtest.value EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
 PREHOOK: query: EXPLAIN EXTENDED INSERT INTO qtest VALUES (1), (10), (100), (1000)
 PREHOOK: type: QUERY
 POSTHOOK: query: EXPLAIN EXTENDED INSERT INTO qtest VALUES (1), (10), (100), (1000)
@@ -40,7 +49,7 @@ STAGE PLANS:
     Map Reduce
       Map Operator Tree:
           TableScan
-            alias: values__tmp__table__2
+            alias: values__tmp__table__3
             Statistics: Num rows: 1 Data size: 14 Basic stats: COMPLETE Column stats: NONE
             GatherStats: false
             Select Operator
@@ -65,11 +74,11 @@ STAGE PLANS:
 #### A masked pattern was here ####
                       location ### test.blobstore.path ###/qtest
                       name default.qtest
-                      numFiles 1
+                      numFiles 2
                       serialization.ddl struct qtest { i32 value}
                       serialization.format 1
                       serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                      totalSize 14
+                      totalSize 28
 #### A masked pattern was here ####
                     serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                     name: default.qtest
@@ -81,7 +90,7 @@ STAGE PLANS:
       Path -> Partition:
 #### A masked pattern was here ####
           Partition
-            base file name: Values__Tmp__Table__2
+            base file name: Values__Tmp__Table__3
             input format: org.apache.hadoop.mapred.TextInputFormat
             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
             properties:
@@ -90,8 +99,8 @@ STAGE PLANS:
               columns.comments 
               columns.types string
 #### A masked pattern was here ####
-              name default.values__tmp__table__2
-              serialization.ddl struct values__tmp__table__2 { string tmp_values_col1}
+              name default.values__tmp__table__3
+              serialization.ddl struct values__tmp__table__3 { string tmp_values_col1}
               serialization.format 1
               serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
@@ -104,13 +113,13 @@ STAGE PLANS:
                 columns.comments 
                 columns.types string
 #### A masked pattern was here ####
-                name default.values__tmp__table__2
-                serialization.ddl struct values__tmp__table__2 { string tmp_values_col1}
+                name default.values__tmp__table__3
+                serialization.ddl struct values__tmp__table__3 { string tmp_values_col1}
                 serialization.format 1
                 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-              name: default.values__tmp__table__2
-            name: default.values__tmp__table__2
+              name: default.values__tmp__table__3
+            name: default.values__tmp__table__3
       Truncated Path -> Alias:
 #### A masked pattern was here ####
 
@@ -140,11 +149,11 @@ STAGE PLANS:
 #### A masked pattern was here ####
                 location ### test.blobstore.path ###/qtest
                 name default.qtest
-                numFiles 1
+                numFiles 2
                 serialization.ddl struct qtest { i32 value}
                 serialization.format 1
                 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                totalSize 14
+                totalSize 28
 #### A masked pattern was here ####
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.qtest
@@ -174,11 +183,11 @@ STAGE PLANS:
 #### A masked pattern was here ####
                     location ### test.blobstore.path ###/qtest
                     name default.qtest
-                    numFiles 1
+                    numFiles 2
                     serialization.ddl struct qtest { i32 value}
                     serialization.format 1
                     serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                    totalSize 14
+                    totalSize 28
 #### A masked pattern was here ####
                   serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                   name: default.qtest
@@ -201,11 +210,11 @@ STAGE PLANS:
 #### A masked pattern was here ####
               location ### test.blobstore.path ###/qtest
               name default.qtest
-              numFiles 1
+              numFiles 2
               serialization.ddl struct qtest { i32 value}
               serialization.format 1
               serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-              totalSize 14
+              totalSize 28
 #### A masked pattern was here ####
             serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
           
@@ -219,11 +228,11 @@ STAGE PLANS:
 #### A masked pattern was here ####
                 location ### test.blobstore.path ###/qtest
                 name default.qtest
-                numFiles 1
+                numFiles 2
                 serialization.ddl struct qtest { i32 value}
                 serialization.format 1
                 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                totalSize 14
+                totalSize 28
 #### A masked pattern was here ####
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.qtest
@@ -252,11 +261,11 @@ STAGE PLANS:
 #### A masked pattern was here ####
                     location ### test.blobstore.path ###/qtest
                     name default.qtest
-                    numFiles 1
+                    numFiles 2
                     serialization.ddl struct qtest { i32 value}
                     serialization.format 1
                     serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                    totalSize 14
+                    totalSize 28
 #### A masked pattern was here ####
                   serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                   name: default.qtest
@@ -279,11 +288,11 @@ STAGE PLANS:
 #### A masked pattern was here ####
               location ### test.blobstore.path ###/qtest
               name default.qtest
-              numFiles 1
+              numFiles 2
               serialization.ddl struct qtest { i32 value}
               serialization.format 1
               serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-              totalSize 14
+              totalSize 28
 #### A masked pattern was here ####
             serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
           
@@ -297,11 +306,11 @@ STAGE PLANS:
 #### A masked pattern was here ####
                 location ### test.blobstore.path ###/qtest
                 name default.qtest
-                numFiles 1
+                numFiles 2
                 serialization.ddl struct qtest { i32 value}
                 serialization.format 1
                 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                totalSize 14
+                totalSize 28
 #### A masked pattern was here ####
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.qtest
@@ -328,3 +337,7 @@ POSTHOOK: Input: default@qtest
 10
 100
 1000
+2
+20
+200
+2000

http://git-wip-us.apache.org/repos/asf/hive/blob/9f72e40d/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 61b8bd0..d912f6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -57,11 +57,13 @@ import javax.jdo.JDODataStoreException;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.common.BlobStorageUtils;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.ObjectPair;
@@ -2802,36 +2804,16 @@ private void constructOneLBLocationMap(FileStatus fSta,
       for (final FileStatus srcFile : files) {
         final Path srcP = srcFile.getPath();
         final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs);
-        // Strip off the file type, if any so we don't make:
-        // 000000_0.gz -> 000000_0.gz_copy_1
-        final String name;
-        final String filetype;
-        String itemName = srcP.getName();
-        int index = itemName.lastIndexOf('.');
-        if (index >= 0) {
-          filetype = itemName.substring(index);
-          name = itemName.substring(0, index);
-        } else {
-          name = itemName;
-          filetype = "";
-        }
-        final boolean renameNonLocal = !needToCopy && !isSrcLocal;
+
+        final boolean isRenameAllowed = !needToCopy && !isSrcLocal;
         // If we do a rename for a non-local file, we will be transferring the original
         // file permissions from source to the destination. Else, in case of mvFile() where we
         // copy from source to destination, we will inherit the destination's parent group ownership.
-        final String srcGroup = renameNonLocal ? srcFile.getGroup() :
+        final String srcGroup = isRenameAllowed ? srcFile.getGroup() :
           fullDestStatus.getFileStatus().getGroup();
         if (null == pool) {
-          Path destPath = new Path(destf, srcP.getName());
           try {
-
-            if (renameNonLocal) {
-              for (int counter = 1; !destFs.rename(srcP,destPath); counter++) {
-                destPath = new Path(destf, name + ("_copy_" + counter) + filetype);
-              }
-            } else {
-              destPath = mvFile(conf, srcP, destPath, isSrcLocal, srcFs, destFs, name, filetype);
-            }
+            Path destPath = mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isRenameAllowed);
 
             if (null != newFiles) {
               newFiles.add(destPath);
@@ -2845,14 +2827,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
             @Override
             public ObjectPair<Path, Path> call() throws Exception {
               SessionState.setCurrentSessionState(parentSession);
-              Path destPath = new Path(destf, srcP.getName());
-              if (renameNonLocal) {
-                for (int counter = 1; !destFs.rename(srcP,destPath); counter++) {
-                  destPath = new Path(destf, name + ("_copy_" + counter) + filetype);
-                }
-              } else {
-                destPath = mvFile(conf, srcP, destPath, isSrcLocal, srcFs, destFs, name, filetype);
-              }
+
+              Path destPath = mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isRenameAllowed);
 
               if (inheritPerms) {
                 HdfsUtils.setFullFileStatus(conf, fullDestStatus, srcGroup, destFs, destPath, false);
@@ -2933,24 +2909,51 @@ private void constructOneLBLocationMap(FileStatus fSta,
     return ShimLoader.getHadoopShims().getPathWithoutSchemeAndAuthority(path);
   }
 
-  private static Path mvFile(HiveConf conf, Path srcf, Path destf, boolean isSrcLocal,
-      FileSystem srcFs, FileSystem destFs, String srcName, String filetype) throws IOException {
+  private static Path mvFile(HiveConf conf, FileSystem sourceFs, Path sourcePath, FileSystem destFs, Path destDirPath,
+                             boolean isSrcLocal, boolean isRenameAllowed) throws IOException {
+
+    boolean isBlobStoragePath = BlobStorageUtils.isBlobStoragePath(conf, destDirPath);
+
+    // Strip off the file type, if any so we don't make:
+    // 000000_0.gz -> 000000_0.gz_copy_1
+    final String fullname = sourcePath.getName();
+    final String name = FilenameUtils.getBaseName(sourcePath.getName());
+    final String type = FilenameUtils.getExtension(sourcePath.getName());
 
-    for (int counter = 1; destFs.exists(destf); counter++) {
-      destf = new Path(destf.getParent(), srcName + ("_copy_" + counter) + filetype);
+    Path destFilePath = new Path(destDirPath, fullname);
+
+    /*
+     * The loop below may perform badly when the destination file already exists and has
+     * accumulated many _copy_ files. A better approach would be to call listFiles() on the
+     * destination once and check candidate names against that complete list. However,
+     * millions of files could live in the destination directory, and under concurrent use
+     * that list could cause OOM problems.
+     *
+     * The loop below stays for now, until a better approach is found.
+     */
+    
+    int counter = 1;
+    if (!isRenameAllowed || isBlobStoragePath) {
+      while (destFs.exists(destFilePath)) {
+        destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + type);
+        counter++;
+      }
     }
-    if (isSrcLocal) {
-      // For local src file, copy to hdfs
-      destFs.copyFromLocalFile(srcf, destf);
+
+    if (isRenameAllowed) {
+      while (!destFs.rename(sourcePath, destFilePath)) {
+        destFilePath = new Path(destDirPath, name + ("_copy_" + counter) + type);
+        counter++;
+      }
+    } else if (isSrcLocal) {
+      destFs.copyFromLocalFile(sourcePath, destFilePath);
     } else {
-      //copy if across file system or encryption zones.
-      LOG.info("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different.");
-      FileUtils.copy(srcFs, srcf, destFs, destf,
-          true,    // delete source
-          false, // overwrite destination
+      FileUtils.copy(sourceFs, sourcePath, destFs, destFilePath,
+          true,   // delete source
+          false,  // overwrite destination
           conf);
     }
-    return destf;
+
+    return destFilePath;
   }
 
   // Clears the dest dir when src is sub-dir of dest.
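
For readers skimming the patch, the rewritten mvFile() reduces to the decision
flow sketched below. This is a condensed reading aid that follows the diff's
names, not the committed code; Hive's own FileUtils.copy is approximated here
with Hadoop's FileUtil.copy, and isBlobStore stands in for
BlobStorageUtils.isBlobStoragePath(conf, destDir).

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    final class MvFileFlowSketch {
      static Path move(FileSystem srcFs, Path src, FileSystem destFs,
                       Path destDir, boolean isSrcLocal, boolean isRenameAllowed,
                       boolean isBlobStore, Configuration conf) throws IOException {
        // Split "000000_0.gz" into base "000000_0" and extension ".gz" so a
        // copy becomes 000000_0_copy_1.gz rather than 000000_0.gz_copy_1.
        String full = src.getName();
        int dot = full.lastIndexOf('.');
        String base = dot >= 0 ? full.substring(0, dot) : full;
        String ext = dot >= 0 ? full.substring(dot) : "";

        Path dest = new Path(destDir, full);
        int counter = 1;
        // Blob stores (and plain copies) may overwrite silently, so find a
        // free name with exists() instead of relying on rename() to fail.
        if (!isRenameAllowed || isBlobStore) {
          for (; destFs.exists(dest); counter++) {
            dest = new Path(destDir, base + "_copy_" + counter + ext);
          }
        }
        if (isRenameAllowed) {
          // On HDFS, rename() returns false when dest exists; bump _copy_N.
          for (; !destFs.rename(src, dest); counter++) {
            dest = new Path(destDir, base + "_copy_" + counter + ext);
          }
        } else if (isSrcLocal) {
          destFs.copyFromLocalFile(src, dest);  // local file -> cluster FS
        } else {
          // Cross-filesystem / encryption-zone move: copy then delete source.
          FileUtil.copy(srcFs, src, destFs, dest, /* deleteSource */ true, conf);
        }
        return dest;
      }
    }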

