hive-commits mailing list archives

From: ser...@apache.org
Subject: hive git commit: HIVE-17856 : MM tables - IOW is not ACID compliant (Steve Yeom, reviewed by Sergey Shelukhin and Eugene Koifman)
Date: Sat, 02 Dec 2017 02:41:13 GMT
Repository: hive
Updated Branches:
  refs/heads/master 3df202f29 -> 5f12cb844


HIVE-17856 : MM tables - IOW is not ACID compliant (Steve Yeom, reviewed by Sergey Shelukhin and Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5f12cb84
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5f12cb84
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5f12cb84

Branch: refs/heads/master
Commit: 5f12cb84406e50a4f939bf394d8514cd2257d52e
Parents: 3df202f
Author: sergey <sershe@apache.org>
Authored: Fri Dec 1 18:40:54 2017 -0800
Committer: sergey <sershe@apache.org>
Committed: Fri Dec 1 18:40:54 2017 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/JavaUtils.java    |  32 +-
 .../hive/ql/exec/AbstractFileMergeOperator.java |   4 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |   6 +-
 .../hadoop/hive/ql/exec/JoinOperator.java       |   2 +-
 .../apache/hadoop/hive/ql/exec/Utilities.java   |  67 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |  26 +
 .../hadoop/hive/ql/io/HiveInputFormat.java      |   9 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  50 +-
 .../hadoop/hive/ql/parse/QBParseInfo.java       |  16 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |  26 +-
 .../hive/ql/TestTxnCommandsForMmTable.java      | 614 +++++++++++++++++++
 .../hive/ql/TestTxnCommandsForOrcMmTable.java   |  42 ++
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java |  13 +-
 .../hadoop/hive/ql/exec/TestUtilities.java      |   2 +-
 .../results/clientpositive/llap/mm_all.q.out    |  30 +-
 ql/src/test/results/clientpositive/mm_all.q.out |  30 +-
 16 files changed, 830 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5f12cb84/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
index f4ebd3b..8b1bbaa 100644
--- a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
  */
 public final class JavaUtils {
 
+  public static final String BASE_PREFIX = "base";
   public static final String DELTA_PREFIX = "delta";
   public static final String DELTA_DIGITS = "%07d";
   public static final int DELTA_DIGITS_LEN = 7;
@@ -167,8 +168,8 @@ public final class JavaUtils {
 
   public static Long extractTxnId(Path file) {
     String fileName = file.getName();
-    String[] parts = fileName.split("_", 4);  // e.g. delta_0000001_0000001_0000
-    if (parts.length < 4 || !DELTA_PREFIX.equals(parts[0])) {
+    String[] parts = fileName.split("_", 4);  // e.g. delta_0000001_0000001_0000 or base_0000022
+    if (parts.length < 2 || !(DELTA_PREFIX.equals(parts[0]) || BASE_PREFIX.equals(parts[0]))) {
       LOG.debug("Cannot extract transaction ID for a MM table: " + file
           + " (" + Arrays.toString(parts) + ")");
       return null;
@@ -185,20 +186,31 @@ public final class JavaUtils {
   }
 
   public static class IdPathFilter implements PathFilter {
-    private final String mmDirName;
+    private String mmDirName;
     private final boolean isMatch, isIgnoreTemp, isPrefix;
+
     public IdPathFilter(long writeId, int stmtId, boolean isMatch) {
-      this(writeId, stmtId, isMatch, false);
+      this(writeId, stmtId, isMatch, false, false);
     }
     public IdPathFilter(long writeId, int stmtId, boolean isMatch, boolean isIgnoreTemp) {
-      String mmDirName = DELTA_PREFIX + "_" + String.format(DELTA_DIGITS, writeId) + "_" +
-          String.format(DELTA_DIGITS, writeId) + "_";
-      if (stmtId >= 0) {
-        mmDirName += String.format(STATEMENT_DIGITS, stmtId);
-        isPrefix = false;
+      this(writeId, stmtId, isMatch, isIgnoreTemp, false);
+    }
+    public IdPathFilter(long writeId, int stmtId, boolean isMatch, boolean isIgnoreTemp, boolean isBaseDir) {
+      String mmDirName = null;
+      if (!isBaseDir) {
+        mmDirName = DELTA_PREFIX + "_" + String.format(DELTA_DIGITS, writeId) + "_" +
+                String.format(DELTA_DIGITS, writeId) + "_";
+        if (stmtId >= 0) {
+          mmDirName += String.format(STATEMENT_DIGITS, stmtId);
+          isPrefix = false;
+        } else {
+          isPrefix = true;
+        }
       } else {
-        isPrefix = true;
+        mmDirName = BASE_PREFIX + "_" + String.format(DELTA_DIGITS, writeId);
+        isPrefix = false;
       }
+
       this.mmDirName = mmDirName;
       this.isMatch = isMatch;
       this.isIgnoreTemp = isIgnoreTemp;
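
The two MM directory-name shapes that the updated extractTxnId() and IdPathFilter handle can be summarized in a small standalone sketch. The format constants mirror the code above (DELTA_DIGITS is "%07d"); the 4-digit statement suffix is an assumption based on the "delta_0000001_0000001_0000" example in the comment:

    // Illustrative sketch only, not part of the commit.
    public class MmDirNameSketch {
      static final String DELTA_DIGITS = "%07d";     // zero-padded write ID, as in JavaUtils
      static final String STATEMENT_DIGITS = "%04d"; // assumed width of the statement suffix

      static String deltaName(long writeId, int stmtId) {
        return "delta_" + String.format(DELTA_DIGITS, writeId) + "_"
            + String.format(DELTA_DIGITS, writeId) + "_" + String.format(STATEMENT_DIGITS, stmtId);
      }

      static String baseName(long writeId) {
        return "base_" + String.format(DELTA_DIGITS, writeId); // no statement suffix for a base
      }

      public static void main(String[] args) {
        System.out.println(deltaName(1, 0)); // delta_0000001_0000001_0000
        System.out.println(baseName(22));    // base_0000022
      }
    }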

http://git-wip-us.apache.org/repos/asf/hive/blob/5f12cb84/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index b163a1e..3a0c289 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -260,7 +260,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
           // There's always just one file that we have merged.
          // The union/DP/etc. should already be accounted for in the path.
           Utilities.writeMmCommitManifest(Lists.newArrayList(outPath),
-              tmpPath.getParent(), fs, taskId, conf.getTxnId(), conf.getStmtId(), null);
+              tmpPath.getParent(), fs, taskId, conf.getTxnId(), conf.getStmtId(), null, false);
           LOG.info("Merged into " + finalPath + "(" + fss.getLen() + " bytes).");
         }
       }
@@ -340,7 +340,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
        // We don't expect missing buckets from merge (actually there should be no buckets),
         // so just pass null as bucketing context. Union suffix should also be accounted for.
         Utilities.handleMmTableFinalPath(outputDir.getParent(), null, hconf, success,
-            dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false);
+            dpLevels, lbLevels, null, mmWriteId, stmtId, reporter, isMmTable, false, false);
       }
 
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/5f12cb84/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index b4989f1..219d1ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -337,7 +337,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           }
           outPaths[filesIdx] = getTaskOutPath(taskId);
         } else {
-          String subdirPath = AcidUtils.deltaSubdir(txnId, txnId, stmtId);
+          String subdirPath = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(), txnId, txnId, stmtId);
           if (unionPath != null) {
             // Create the union directory inside the MM directory.
             subdirPath += Path.SEPARATOR + unionPath;
@@ -1324,7 +1324,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       }
       if (conf.isMmTable()) {
         Utilities.writeMmCommitManifest(
-            commitPaths, specPath, fs, taskId, conf.getTransactionId(), conf.getStatementId(), unionPath);
+            commitPaths, specPath, fs, taskId, conf.getTransactionId(), conf.getStatementId(), unionPath, conf.getInsertOverwrite());
       }
       // Only publish stats if this operator's flag was set to gather stats
       if (conf.isGatherStats()) {
@@ -1383,7 +1383,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
               conf.getTableInfo(), numBuckets, conf.getCompressed());
           Utilities.handleMmTableFinalPath(specPath, unionSuffix, hconf, success,
               dpLevels, lbLevels, mbc, conf.getTransactionId(), conf.getStatementId(), reporter,
-              conf.isMmTable(), conf.isMmCtas());
+              conf.isMmTable(), conf.isMmCtas(), conf.getInsertOverwrite());
         }
       }
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/5f12cb84/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
index 65b2f87..8732348 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
@@ -234,7 +234,7 @@ public class JoinOperator extends CommonJoinOperator<JoinDesc> implements Serial
         Utilities.FILE_OP_LOGGER.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath + "(spec " + specPath + ")");
         Utilities.rename(fs, tmpPath, intermediatePath);
         // Step2: remove any tmp file or double-committed output files
-        Utilities.removeTempOrDuplicateFiles(fs, intermediatePath);
+        Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, false);
         // Step3: move to the file destination
         Utilities.FILE_OP_LOGGER.info("Moving tmp dir: " + intermediatePath + " to: " + specPath);
         Utilities.renameOrMoveFiles(fs, intermediatePath, specPath);

http://git-wip-us.apache.org/repos/asf/hive/blob/5f12cb84/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index d7397e0..f7850fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1501,7 +1501,7 @@ public final class Utilities {
         perfLogger.PerfLogBegin("FileSinkOperator", "RemoveTempOrDuplicateFiles");
         // remove any tmp file or double-committed output files
         List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(
-            fs, statuses, dpCtx, conf, hconf, filesKept);
+            fs, statuses, dpCtx, conf, hconf, filesKept, false);
         perfLogger.PerfLogEnd("FileSinkOperator", "RemoveTempOrDuplicateFiles");
         // create empty buckets if necessary
         if (!emptyBuckets.isEmpty()) {
@@ -1592,23 +1592,23 @@ public final class Utilities {
   /**
    * Remove all temporary files and duplicate (double-committed) files from a given directory.
    */
-  public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws IOException {
-    removeTempOrDuplicateFiles(fs, path, null,null,null);
+  public static void removeTempOrDuplicateFiles(FileSystem fs, Path path, boolean isBaseDir) throws IOException {
+    removeTempOrDuplicateFiles(fs, path, null,null,null, isBaseDir);
   }
 
   public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, Path path,
-      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
+      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, boolean isBaseDir) throws IOException {
     if (path  == null) {
       return null;
     }
     FileStatus[] stats = HiveStatsUtils.getFileStatusRecurse(path,
         ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
-    return removeTempOrDuplicateFiles(fs, stats, dpCtx, conf, hconf);
+    return removeTempOrDuplicateFiles(fs, stats, dpCtx, conf, hconf, isBaseDir);
   }
 
   public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
-      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf) throws IOException {
-    return removeTempOrDuplicateFiles(fs, fileStats, dpCtx, conf, hconf, null);
+      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, boolean isBaseDir) throws IOException {
+    return removeTempOrDuplicateFiles(fs, fileStats, dpCtx, conf, hconf, null, isBaseDir);
   }
 
   /**
@@ -1617,12 +1617,12 @@ public final class Utilities {
    * @return a list of path names corresponding to should-be-created empty buckets.
    */
   public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
-      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, Set<Path> filesKept)
+      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Configuration hconf, Set<Path> filesKept, boolean isBaseDir)
           throws IOException {
     int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(),
         numBuckets = (conf != null && conf.getTable() != null) ? conf.getTable().getNumBuckets() : 0;
     return removeTempOrDuplicateFiles(
-        fs, fileStats, null, dpLevels, numBuckets, hconf, null, 0, false, filesKept);
+        fs, fileStats, null, dpLevels, numBuckets, hconf, null, 0, false, filesKept, isBaseDir);
   }
 
   private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException {
@@ -1641,7 +1641,7 @@ public final class Utilities {
 
   public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
       String unionSuffix, int dpLevels, int numBuckets, Configuration hconf, Long txnId,
-      int stmtId, boolean isMmTable, Set<Path> filesKept) throws IOException {
+      int stmtId, boolean isMmTable, Set<Path> filesKept, boolean isBaseDir) throws IOException {
     if (fileStats == null) {
       return null;
     }
@@ -1660,7 +1660,7 @@ public final class Utilities {
 
         if (isMmTable) {
           Path mmDir = parts[i].getPath();
-          if (!mmDir.getName().equals(AcidUtils.deltaSubdir(txnId, txnId, stmtId))) {
+          if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, txnId, txnId, stmtId))) {
             throw new IOException("Unexpected non-MM directory name " + mmDir);
           }
 
@@ -1668,6 +1668,9 @@ public final class Utilities {
 
           if (!StringUtils.isEmpty(unionSuffix)) {
             path = new Path(path, unionSuffix);
+            if (!fs.exists(path)) {
+              continue;
+            }
           }
         }
 
@@ -1684,7 +1687,7 @@ public final class Utilities {
       if (fileStats.length == 0) {
         return result;
       }
-      Path mmDir = extractNonDpMmDir(txnId, stmtId, items);
+      Path mmDir = extractNonDpMmDir(txnId, stmtId, items, isBaseDir);
       taskIDToFile = removeTempOrDuplicateFilesNonMm(
           fs.listStatus(new Path(mmDir, unionSuffix)), fs);
       if (filesKept != null && taskIDToFile != null) {
@@ -1702,7 +1705,7 @@ public final class Utilities {
           addFilesToPathSet(taskIDToFile.values(), filesKept);
         }
       } else {
-        Path mmDir = extractNonDpMmDir(txnId, stmtId, items);
+        Path mmDir = extractNonDpMmDir(txnId, stmtId, items, isBaseDir);
         taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs);
         if (filesKept != null && taskIDToFile != null) {
           addFilesToPathSet(taskIDToFile.values(), filesKept);
@@ -1714,12 +1717,12 @@ public final class Utilities {
     return result;
   }
 
-  private static Path extractNonDpMmDir(Long txnId, int stmtId, FileStatus[] items) throws IOException {
+  private static Path extractNonDpMmDir(Long txnId, int stmtId, FileStatus[] items, boolean isBaseDir) throws IOException {
     if (items.length > 1) {
       throw new IOException("Unexpected directories for non-DP MM: " + Arrays.toString(items));
     }
     Path mmDir = items[0].getPath();
-    if (!mmDir.getName().equals(AcidUtils.deltaSubdir(txnId, txnId, stmtId))) {
+    if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, txnId, txnId, stmtId))) {
       throw new IOException("Unexpected non-MM directory " + mmDir);
     }
       Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in MM directory {}", mmDir);
@@ -4056,7 +4059,7 @@ public final class Utilities {
   }
 
   public static Path[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels,
-      int lbLevels, PathFilter filter, long txnId, int stmtId, Configuration conf)
+      int lbLevels, PathFilter filter, long txnId, int stmtId, Configuration conf, boolean isBaseDir)
           throws IOException {
     int skipLevels = dpLevels + lbLevels;
     if (filter == null) {
@@ -4071,7 +4074,7 @@ public final class Utilities {
         || (HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3) && isS3(fs))) {
       return getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter);
     }
-    return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, txnId, stmtId);
+    return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, txnId, stmtId, isBaseDir);
   }
 
   private static boolean isS3(FileSystem fs) {
@@ -4148,7 +4151,7 @@ public final class Utilities {
   }
 
   private static Path[] getMmDirectoryCandidatesGlobStatus(FileSystem fs,
-      Path path, int skipLevels, PathFilter filter, long txnId, int stmtId) throws IOException {
+      Path path, int skipLevels, PathFilter filter, long txnId, int stmtId, boolean isBaseDir) throws IOException {
     StringBuilder sb = new StringBuilder(path.toUri().getPath());
     for (int i = 0; i < skipLevels; i++) {
       sb.append(Path.SEPARATOR).append('*');
@@ -4158,7 +4161,7 @@ public final class Utilities {
       // sb.append(Path.SEPARATOR).append(AcidUtils.deltaSubdir(txnId, txnId)).append("_*");
       throw new AssertionError("GlobStatus should not be called without a statement ID");
     } else {
-      sb.append(Path.SEPARATOR).append(AcidUtils.deltaSubdir(txnId, txnId, stmtId));
+      sb.append(Path.SEPARATOR).append(AcidUtils.baseOrDeltaSubdir(isBaseDir, txnId, txnId, stmtId));
     }
     Path pathPattern = new Path(path, sb.toString());
     return statusToPath(fs.globStatus(pathPattern, filter));
@@ -4166,9 +4169,9 @@ public final class Utilities {
 
   private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir,
                                           int dpLevels, int lbLevels, JavaUtils.IdPathFilter filter,
-                                          long txnId, int stmtId, Configuration conf) throws IOException {
+                                          long txnId, int stmtId, Configuration conf, boolean isBaseDir) throws IOException {
     Path[] files = getMmDirectoryCandidates(
-        fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, conf);
+        fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, conf, isBaseDir);
     if (files != null) {
       for (Path path : files) {
         Utilities.FILE_OP_LOGGER.info("Deleting {} on failure", path);
@@ -4181,12 +4184,12 @@ public final class Utilities {
 
 
   public static void writeMmCommitManifest(List<Path> commitPaths, Path specPath, FileSystem fs,
-      String taskId, Long txnId, int stmtId, String unionSuffix) throws HiveException {
+      String taskId, Long txnId, int stmtId, String unionSuffix, boolean isInsertOverwrite) throws HiveException {
     if (commitPaths.isEmpty()) {
       return;
     }
     // We assume one FSOP per task (per specPath), so we create it in specPath.
-    Path manifestPath = getManifestDir(specPath, txnId, stmtId, unionSuffix);
+    Path manifestPath = getManifestDir(specPath, txnId, stmtId, unionSuffix, isInsertOverwrite);
     manifestPath = new Path(manifestPath, taskId + MANIFEST_EXTENSION);
     Utilities.FILE_OP_LOGGER.info("Writing manifest to {} with {}", manifestPath, commitPaths);
     try {
@@ -4205,8 +4208,10 @@ public final class Utilities {
     }
   }
 
-  private static Path getManifestDir(Path specPath, long txnId, int stmtId, String unionSuffix) {
-    Path manifestPath = new Path(specPath, "_tmp." + AcidUtils.deltaSubdir(txnId, txnId, stmtId));
+  private static Path getManifestDir(Path specPath, long txnId, int stmtId, String unionSuffix, boolean isInsertOverwrite) {
+    Path manifestPath = new Path(specPath, "_tmp." + 
+      AcidUtils.baseOrDeltaSubdir(isInsertOverwrite, txnId, txnId, stmtId));
+
     return (unionSuffix == null) ? manifestPath : new Path(manifestPath, unionSuffix);
   }
 
@@ -4223,13 +4228,13 @@ public final class Utilities {
 
   public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Configuration hconf,
       boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long txnId, int stmtId,
-      Reporter reporter, boolean isMmTable, boolean isMmCtas) throws IOException, HiveException {
+      Reporter reporter, boolean isMmTable, boolean isMmCtas, boolean isInsertOverwrite) throws IOException, HiveException {
     FileSystem fs = specPath.getFileSystem(hconf);
-    Path manifestDir = getManifestDir(specPath, txnId, stmtId, unionSuffix);
+    Path manifestDir = getManifestDir(specPath, txnId, stmtId, unionSuffix, isInsertOverwrite);
     if (!success) {
       JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(txnId, stmtId, true);
       tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels,
-          filter, txnId, stmtId, hconf);
+          filter, txnId, stmtId, hconf, isInsertOverwrite);
       return;
     }
 
@@ -4252,13 +4257,13 @@ public final class Utilities {
     }
 
     Utilities.FILE_OP_LOGGER.debug("Looking for files in: {}", specPath);
-    JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(txnId, stmtId, true);
+    JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(txnId, stmtId, true, false, isInsertOverwrite);
     if (isMmCtas && !fs.exists(specPath)) {
       Utilities.FILE_OP_LOGGER.info("Creating table directory for CTAS with no output at {}", specPath);
       FileUtils.mkdir(fs, specPath, hconf);
     }
     Path[] files = getMmDirectoryCandidates(
-        fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, hconf);
+        fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, hconf, isInsertOverwrite);
     ArrayList<Path> mmDirectories = new ArrayList<>();
     if (files != null) {
       for (Path path : files) {
@@ -4319,7 +4324,7 @@ public final class Utilities {
     }
     List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, finalResults,
         unionSuffix, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, txnId, stmtId,
-            isMmTable, null);
+            isMmTable, null, isInsertOverwrite);
     // create empty buckets if necessary
     if (!emptyBuckets.isEmpty()) {
       assert mbc != null;
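
One consequence of the getManifestDir() change above: commit manifests for an insert-overwrite are staged under a "_tmp."-prefixed base directory rather than a delta one. For a hypothetical write ID 22, statement 0, and no union suffix, the manifest locations would be:

    INSERT:           <specPath>/_tmp.delta_0000022_0000022_0000/<taskId><MANIFEST_EXTENSION>
    INSERT OVERWRITE: <specPath>/_tmp.base_0000022/<taskId><MANIFEST_EXTENSION>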

http://git-wip-us.apache.org/repos/asf/hive/blob/5f12cb84/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 9ab028d..a85713b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -223,6 +223,19 @@ public class AcidUtils {
   public static String baseDir(long txnId) {
     return BASE_PREFIX + String.format(DELTA_DIGITS, txnId);
   }
+
+  /**
+   * Return a base or delta directory string
+   * according to the given "baseDirRequired".
+   */
+  public static String baseOrDeltaSubdir(boolean baseDirRequired, long min, long max, int statementId) {
+    if (!baseDirRequired) {
+       return deltaSubdir(min, max, statementId);
+    } else {
+       return baseDir(min);
+    }
+  }
+
   /**
    * Create a filename for a bucket file.
    * @param directory the partition directory
@@ -1248,6 +1261,19 @@ public class AcidUtils {
   public static boolean isFullAcidTable(Table table) {
     return isAcidTable(table) && !AcidUtils.isInsertOnlyTable(table);
   }
+  
+  public static boolean isFullAcidTable(CreateTableDesc td) {
+    if (td == null || td.getTblProps() == null) {
+      return false;
+    }
+    String tableIsTransactional = td.getTblProps().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    if (tableIsTransactional == null) {
+      tableIsTransactional = td.getTblProps().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
+    }
+    return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true") &&
+      !AcidUtils.isInsertOnlyTable(td.getTblProps());
+  }
+  
 
   /**
    * Sets the acidOperationalProperties in the configuration object argument.
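
A minimal usage sketch for the new helper (the write ID 22 and statement 0 are hypothetical; baseDir() above appends no separator, so AcidUtils.BASE_PREFIX evidently carries the trailing underscore):

    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class BaseOrDeltaSubdirDemo {
      public static void main(String[] args) {
        // Delta name for a plain insert:
        System.out.println(AcidUtils.baseOrDeltaSubdir(false, 22L, 22L, 0)); // delta_0000022_0000022_0000
        // Base name for an insert-overwrite; "min" supplies the write ID, statementId is ignored:
        System.out.println(AcidUtils.baseOrDeltaSubdir(true, 22L, 22L, 0));  // base_0000022
      }
    }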

http://git-wip-us.apache.org/repos/asf/hive/blob/5f12cb84/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 819c2a2..1beb839 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -552,7 +552,14 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
         } else if (!hadAcidState) {
           AcidUtils.Directory dirInfo = AcidUtils.getAcidState(currDir, conf, validTxnList, Ref.from(false), true, null);
           hadAcidState = true;
-          // TODO [MM gap]: for IOW, we also need to count in base dir, if any
+
+          // Find the base, created for IOW.
+          Path base = dirInfo.getBaseDirectory();
+          if (base != null) {
+            finalPaths.add(base);
+          }
+
+          // Find the parsed delta files.
           for (AcidUtils.ParsedDelta delta : dirInfo.getCurrentDirectories()) {
             Utilities.FILE_OP_LOGGER.debug("Adding input " + delta.getPath());
             finalPaths.add(delta.getPath());
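
The effect on split generation for an MM table: getAcidState() returns at most one base (the latest overwrite snapshot) plus only the deltas that postdate it, and both now feed finalPaths. For a hypothetical table directory after an overwrite at write ID 22 followed by one insert:

    warehouse/mmtbl/
      delta_0000019_0000019_0000/   <- predates the base, so not in getCurrentDirectories(); skipped
      base_0000022/                 <- added via dirInfo.getBaseDirectory()
      delta_0000023_0000023_0000/   <- added via dirInfo.getCurrentDirectories()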

http://git-wip-us.apache.org/repos/asf/hive/blob/5f12cb84/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 9f2c6d8..f33855d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1779,12 +1779,6 @@ public class Hive {
           Utilities.FILE_OP_LOGGER.trace("maybe deleting stuff from " + oldPartPath
               + " (new " + newPartPath + ") for replace");
         }
-        if ((loadFileType == LoadFileType.REPLACE_ALL) && oldPartPath != null) {
-          boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
-          deleteOldPathForReplace(newPartPath, oldPartPath, getConf(), isAutoPurge,
-              new JavaUtils.IdPathFilter(txnId, stmtId, false, true), true,
-              tbl.isStoredAsSubDirectories() ? tbl.getSkewedColNames().size() : 0);
-        }
       } else {
         // Either a non-MM query, or a load into MM table from an external source.
         PathFilter filter = FileUtils.HIDDEN_FILES_PATH_FILTER;
@@ -1807,7 +1801,7 @@ public class Hive {
           // base_x.  (there is Insert Overwrite and Load Data Overwrite)
           boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
           replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(),
-              isSrcLocal, isAutoPurge, newFiles, filter, isMmTableWrite);
+              isSrcLocal, isAutoPurge, newFiles, filter, isMmTableWrite?true:false);
         } else {
           FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
           copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcidIUDoperation,
@@ -2126,7 +2120,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
         //       where this is used; we always want to load everything; also the only case where
         //       we have multiple statements anyway is union.
         Path[] leafStatus = Utilities.getMmDirectoryCandidates(
-            fs, loadPath, numDP, numLB, null, txnId, -1, conf);
+            fs, loadPath, numDP, numLB, null, txnId, -1, conf, false);
         for (Path p : leafStatus) {
           Path dpPath = p.getParent(); // Skip the MM directory that we have found.
           for (int i = 0; i < numLB; ++i) {
@@ -2330,13 +2324,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
     // Note: this assumes both paths are qualified; which they are, currently.
     if (isMmTable && loadPath.equals(tbl.getPath())) {
       Utilities.FILE_OP_LOGGER.debug("not moving " + loadPath + " to " + tbl.getPath());
-      if (loadFileType == LoadFileType.REPLACE_ALL) {
-        Path tableDest = tbl.getPath();
-        boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
-        deleteOldPathForReplace(tableDest, tableDest, sessionConf, isAutopurge,
-            new JavaUtils.IdPathFilter(txnId, stmtId, false, true), true,
-            tbl.isStoredAsSubDirectories() ? tbl.getSkewedColNames().size() : 0);
-      }
       newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId);
     } else {
       // Either a non-MM query, or a load into MM table from an external source.
@@ -2360,7 +2347,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
         //todo:  should probably do the same for MM IOW
         boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
         replaceFiles(tblPath, loadPath, destPath, tblPath,
-            sessionConf, isSrcLocal, isAutopurge, newFiles, filter, isMmTable);
+            sessionConf, isSrcLocal, isAutopurge, newFiles, filter, isMmTable?true:false);
       } else {
         try {
           FileSystem fs = tbl.getDataLocation().getFileSystem(sessionConf);
@@ -3885,7 +3872,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    */
   protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
           boolean isSrcLocal, boolean purge, List<Path> newFiles, PathFilter deletePathFilter,
-          boolean isMmTable) throws HiveException {
+          boolean isMmTableOverwrite) throws HiveException {
     try {
 
       FileSystem destFs = destf.getFileSystem(conf);
@@ -3906,7 +3893,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       if (oldPath != null) {
         // Note: we assume lbLevels is 0 here. Same as old code for non-MM.
         //       For MM tables, this can only be a LOAD command. Does LOAD even support LB?
-        deleteOldPathForReplace(destf, oldPath, conf, purge, deletePathFilter, isMmTable, 0);
+        deleteOldPathForReplace(destf, oldPath, conf, purge, deletePathFilter, isMmTableOverwrite, 0);
       }
 
       // first call FileUtils.mkdir to make sure that destf directory exists, if not, it creates
@@ -3952,7 +3939,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
   }
 
   private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf, boolean purge,
-      PathFilter pathFilter, boolean isMmTable, int lbLevels) throws HiveException {
+      PathFilter pathFilter, boolean isMmTableOverwrite, int lbLevels) throws HiveException {
     Utilities.FILE_OP_LOGGER.debug("Deleting old paths for replace in " + destPath
         + " and old path " + oldPath);
     boolean isOldPathUnderDestf = false;
@@ -3964,32 +3951,13 @@ private void constructOneLBLocationMap(FileStatus fSta,
       // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is
       // not the destf or its subdir?
       isOldPathUnderDestf = isSubDir(oldPath, destPath, oldFs, destFs, false);
-      if (isOldPathUnderDestf || isMmTable) {
-        if (lbLevels == 0 || !isMmTable) {
+      if (isOldPathUnderDestf || isMmTableOverwrite) {
+        if (lbLevels == 0 || !isMmTableOverwrite) {
           cleanUpOneDirectoryForReplace(oldPath, oldFs, pathFilter, conf, purge);
-        } else {
-          // We need to clean up different MM IDs from each LB directory separately.
-          // Avoid temporary directories in the immediate table/part dir.
-          // Note: we could just find directories with any MM directories inside?
-          //       the rest doesn't have to be cleaned up. Play it safe.
-          String mask = "[^._]*";
-          for (int i = 0; i < lbLevels - 1; ++i) {
-            mask += Path.SEPARATOR + "*";
-          }
-          Path glob = new Path(oldPath, mask);
-          FileStatus[] lbDirs = oldFs.globStatus(glob);
-          for (FileStatus lbDir : lbDirs) {
-            Path lbPath = lbDir.getPath();
-            if (!lbDir.isDirectory()) {
-              throw new HiveException("Unexpected path during overwrite: " + lbPath);
-            }
-            Utilities.FILE_OP_LOGGER.info("Cleaning up LB directory " + lbPath);
-            cleanUpOneDirectoryForReplace(lbPath, oldFs, pathFilter, conf, purge);
-          }
         }
       }
     } catch (IOException e) {
-      if (isOldPathUnderDestf || isMmTable) {
+      if (isOldPathUnderDestf || isMmTableOverwrite) {
         // if oldPath is a subdir of destf but it could not be cleaned
         throw new HiveException("Directory " + oldPath.toString()
             + " could not be cleaned up.", e);
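
Note the behavioral shift in the two blocks removed above: for an MM insert-overwrite, Hive no longer deletes the old delta directories at load time. The overwrite instead materializes as a new base_<writeId> that supersedes them, and the superseded deltas are left for the compaction Cleaner (the tests below verify exactly this sequence). Schematically, for a hypothetical table overwritten at write ID 22:

    before IOW:    delta_0000019_0000019_0000  delta_0000020_0000020_0000
    after IOW:     delta_0000019_...  delta_0000020_...  base_0000022   (old deltas retained)
    after Cleaner: base_0000022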

http://git-wip-us.apache.org/repos/asf/hive/blob/5f12cb84/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
index ab71073..f0083f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
@@ -64,6 +64,7 @@ public class QBParseInfo {
   private final Set<String> destCubes;
   private final Set<String> destGroupingSets;
   private final Map<String, ASTNode> destToHaving;
+  private final Map<String, Boolean> destToOpType;
   // insertIntoTables/insertOverwriteTables map a table's fullName to its ast;
   private final Map<String, ASTNode> insertIntoTables;
   private final Map<String, ASTNode> insertOverwriteTables;
@@ -135,6 +136,7 @@ public class QBParseInfo {
     destToSortby = new HashMap<String, ASTNode>();
     destToOrderby = new HashMap<String, ASTNode>();
     destToLimit = new HashMap<String, SimpleEntry<Integer, Integer>>();
+    destToOpType = new HashMap<>();
     insertIntoTables = new HashMap<String, ASTNode>();
     insertOverwriteTables = new HashMap<String, ASTNode>();
     destRollups = new HashSet<String>();
@@ -155,7 +157,7 @@ public class QBParseInfo {
 
   }
 
-  /*
+/*
    * If a QB is such that the aggregation expressions need to be handled by
    * the Windowing PTF; we invoke this function to clear the AggExprs on the dest.
    */
@@ -180,6 +182,18 @@ public class QBParseInfo {
   public void addInsertIntoTable(String fullName, ASTNode ast) {
     insertIntoTables.put(fullName.toLowerCase(), ast);
   }
+
+  public void setDestToOpType(String clause, boolean value) {
+    destToOpType.put(clause, value);
+  }
+
+  public boolean isDestToOpTypeInsertOverwrite(String clause) {
+    if (destToOpType.containsKey(clause)) {
+      return destToOpType.get(clause);
+    } else {
+      return false;
+    }
+  }
 
   /**
    * See also {@link #getInsertOverwriteTables()}

http://git-wip-us.apache.org/repos/asf/hive/blob/5f12cb84/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index b323ede..5dd3583 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -1506,6 +1506,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
               String fullTableName = getUnescapedName((ASTNode) ast.getChild(0).getChild(0),
                   SessionState.get().getCurrentDatabase());
               qbp.getInsertOverwriteTables().put(fullTableName.toLowerCase(), ast);
+              qbp.setDestToOpType(ctx_1.dest, true);
             }
           }
         }
@@ -6781,7 +6782,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     Integer dest_type = qbm.getDestTypeForAlias(dest);
 
     Table dest_tab = null; // destination table if any
-    boolean destTableIsAcid = false; // should the destination table be written to using ACID
+    boolean destTableIsAcid = false;     // true for full ACID table and MM table
+    boolean destTableIsFullAcid = false; // should the destination table be written to using ACID
     boolean destTableIsTemporary = false;
     boolean destTableIsMaterialization = false;
     Partition dest_part = null;// destination partition if any
@@ -6802,7 +6804,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     case QBMetaData.DEST_TABLE: {
 
       dest_tab = qbm.getDestTableForAlias(dest);
-      destTableIsAcid = AcidUtils.isFullAcidTable(dest_tab);
+      destTableIsAcid = AcidUtils.isAcidTable(dest_tab);
+      destTableIsFullAcid = AcidUtils.isFullAcidTable(dest_tab);
       destTableIsTemporary = dest_tab.isTemporary();
 
       // Is the user trying to insert into an external table?
@@ -6852,7 +6855,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       // NOTE: specify Dynamic partitions in dest_tab for WriteEntity
       if (!isNonNativeTable) {
         AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
-        if (destTableIsAcid) {
+        if (destTableIsFullAcid) {
           acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
           checkAcidConstraints(qb, table_desc, dest_tab);
         }
@@ -6895,7 +6898,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
       dest_part = qbm.getDestPartitionForAlias(dest);
       dest_tab = dest_part.getTable();
-      destTableIsAcid = AcidUtils.isFullAcidTable(dest_tab);
+      destTableIsAcid = AcidUtils.isAcidTable(dest_tab);
+      destTableIsFullAcid = AcidUtils.isFullAcidTable(dest_tab);
 
       checkExternalTable(dest_tab);
 
@@ -6928,7 +6932,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
           dest_part.getSkewedColValues(), dest_part.getSkewedColValueLocationMaps(),
           dest_part.isStoredAsSubDirectories(), conf);
       AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
-      if (destTableIsAcid) {
+      if (destTableIsFullAcid) {
         acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
         checkAcidConstraints(qb, table_desc, dest_tab);
       }
@@ -6945,7 +6949,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
       // deltas and base and leave them up to the cleaner to clean up
       LoadFileType loadType = (!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
-              dest_tab.getTableName()) && !destTableIsAcid)
+              dest_tab.getTableName()) && !destTableIsAcid) // Both full-ACID and MM tables are excluded.
               ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING;
       ltd.setLoadFileType(loadType);
       ltd.setLbCtx(lbCtx);
@@ -7019,6 +7023,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       }
 
       destTableIsAcid = tblDesc != null && AcidUtils.isAcidTable(tblDesc);
+      destTableIsFullAcid = tblDesc != null && AcidUtils.isFullAcidTable(tblDesc);
 
       boolean isDestTempFile = true;
       if (!ctx.isMRTmpFileURI(dest_path.toUri().toString())) {
@@ -7109,7 +7114,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         (dest_tab.getSortCols() != null && dest_tab.getSortCols().size() > 0)));
 
     // If this table is working with ACID semantics, turn off merging
-    canBeMerged &= !destTableIsAcid;
+    canBeMerged &= !destTableIsFullAcid;
 
     // Generate the partition columns from the parent input
     if (dest_type.intValue() == QBMetaData.DEST_TABLE
@@ -7120,7 +7125,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, table_desc, dest_part,
         dest_path, currentTableId, destTableIsAcid, destTableIsTemporary,
         destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
-        canBeMerged, dest_tab, txnId, isMmCtas, dest_type);
+        canBeMerged, dest_tab, txnId, isMmCtas, dest_type, qb);
     if (isMmCtas) {
       // Add FSD so that the LoadTask compilation could fix up its path to avoid the move.
       tableDesc.setWriter(fileSinkDesc);
@@ -7231,7 +7236,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       boolean destTableIsMaterialization, Path queryTmpdir,
       SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx,
       RowSchema fsRS, boolean canBeMerged, Table dest_tab, Long mmWriteId, boolean isMmCtas,
-      Integer dest_type) throws SemanticException {
+      Integer dest_type, QB qb) throws SemanticException {
     boolean isInsertOverwrite = false;
     switch (dest_type) {
       case QBMetaData.DEST_PARTITION:
@@ -7240,7 +7245,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         //INSERT [OVERWRITE] path
         String destTableFullName = dest_tab.getCompleteName().replace('@', '.');
         Map<String, ASTNode> iowMap = qb.getParseInfo().getInsertOverwriteTables();
-        if (iowMap.containsKey(destTableFullName)) {
+        if (iowMap.containsKey(destTableFullName) && 
+          qb.getParseInfo().isDestToOpTypeInsertOverwrite(dest)) {
           isInsertOverwrite = true;
         }
         break;
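
The added isDestToOpTypeInsertOverwrite(dest) check tightens the earlier table-name-only test: getInsertOverwriteTables() is keyed by table name, while destToOpType is keyed by the destination clause, so in a multi-insert statement that both overwrites and appends to the same table, only the overwrite branch gets isInsertOverwrite = true. An illustrative HiveQL query of the kind this distinguishes (table names are hypothetical):

    FROM src
    INSERT OVERWRITE TABLE t PARTITION (p='a') SELECT key, value
    INSERT INTO TABLE t PARTITION (p='b') SELECT key, value;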

http://git-wip-us.apache.org/repos/asf/hive/blob/5f12cb84/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
new file mode 100644
index 0000000..8e84202
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
@@ -0,0 +1,614 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.txn.AcidCompactionHistoryService;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService;
+import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
+import org.apache.hadoop.hive.ql.txn.compactor.Initiator;
+import org.apache.hadoop.hive.ql.txn.compactor.Worker;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests here are for micro-managed tables:
+ * specifically INSERT OVERWRITE statements and Major/Minor Compactions.
+ */
+public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
+  static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommandsForMmTable.class);
+  protected static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+    File.separator + TestTxnCommands.class.getCanonicalName()
+    + "-" + System.currentTimeMillis()
+  ).getPath().replaceAll("\\\\", "/");
+  protected static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+  @Override
+  String getTestDataDir() {
+    return TEST_DATA_DIR;
+  }
+
+  enum TableExtended {
+    NONACIDPART("nonAcidPart", "p"),
+    MMTBL("mmTbl"),
+    MMTBL2("mmTbl2"),
+    MMTBLPART("mmTblPart","p");
+
+    final String name;
+    final String partitionColumns;
+    @Override
+    public String toString() {
+      return name;
+    }
+    String getPartitionColumns() {
+      return partitionColumns;
+    }
+    TableExtended(String name) {
+      this(name, null);
+    }
+    TableExtended(String name, String partitionColumns) {
+      this.name = name;
+      this.partitionColumns = partitionColumns;
+    }
+  }
+  
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUpInternal();
+    setUpInternalExtended(false);
+  }
+  
+  void setUpInternalExtended(boolean isOrcFormat) throws Exception {
+    hiveConf.setBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING, true);
+    hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
+    hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "true");
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+
+    runStatementOnDriver("create table " + TableExtended.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')");
+    if (!isOrcFormat) {
+      runStatementOnDriver("create table " + TableExtended.MMTBL + "(a int, b int) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
+      runStatementOnDriver("create table " + TableExtended.MMTBL2 + "(a int, b int) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
+      runStatementOnDriver("create table " + TableExtended.MMTBLPART + "(a int, b int) partitioned by (p string) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
+    } else {
+      runStatementOnDriver("create table " + TableExtended.MMTBL + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
+      runStatementOnDriver("create table " + TableExtended.MMTBL2 + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
+      runStatementOnDriver("create table " + TableExtended.MMTBLPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
+    }
+  }
+  protected void dropTables() throws Exception {
+    super.dropTables();
+    for(TestTxnCommandsForMmTable.TableExtended t : TestTxnCommandsForMmTable.TableExtended.values()) {
+      runStatementOnDriver("drop table if exists " + t);
+    }
+  }
+  /**
+   * Test compaction for Micro-managed table
+   * 1. Regular compaction shouldn't impact any valid subdirectories of MM tables
+   * 2. Compactions will only remove subdirectories for aborted transactions of MM tables, if any
+   * @throws Exception
+   */
+  @Test
+  public void testMmTableCompaction() throws Exception {
+    // 1. Insert some rows into MM table
+    runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)");
+    runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)");
+    // There should be 2 delta directories
+    verifyDirAndResult(2);
+
+    // 2. Perform a MINOR compaction. Since nothing was aborted, subdirs should stay.
+    runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MINOR'");
+    runWorker(hiveConf);
+    verifyDirAndResult(2);
+
+    // 3. Let a transaction be aborted
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(5,6)");
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+    // There should be 3 delta directories. The new one is the aborted one.
+    verifyDirAndResult(3);
+
+    // 4. Perform a MINOR compaction again. This time it will remove the subdir for aborted transaction.
+    runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MINOR'");
+    runWorker(hiveConf);
+    // The worker should remove the subdir for aborted transaction
+    verifyDirAndResult(2);
+
+    // 5. Run Cleaner. Shouldn't impact anything.
+    runCleaner(hiveConf);
+    verifyDirAndResult(2);
+  }
+
+  /**
+   * Test a scenario, on a micro-managed table, where an IOW comes in
+   * after a MAJOR compaction, and then a MINOR compaction is initiated.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testInsertOverwriteForMmTable() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert two rows to an MM table
+    runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)");
+    runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 delta dirs in the location
+    Assert.assertEquals(2, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
+    }
+
+    // 2. Perform a major compaction.
+    runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 delta dirs.
+    Assert.assertEquals(2, status.length);
+    boolean sawBase = false;
+    int deltaCount = 0;
+    for (int i = 0; i < status.length; i++) {
+      String dirName = status[i].getPath().getName();
+      if (dirName.matches("delta_.*")) {
+        deltaCount++;
+      } else {
+        sawBase = true;
+        Assert.assertTrue(dirName.matches("base_.*"));
+      }
+    }
+    Assert.assertEquals(2, deltaCount);
+    Assert.assertFalse(sawBase);
+    // Verify query result
+    int [][] resultData = new int[][] {{1,2},{3,4}};
+    List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
+    Assert.assertEquals(stringifyValues(resultData), rs);
+
+    // 3. INSERT OVERWRITE
+    // Prepare data for the source table
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(5,6),(7,8)");
+    // Insert overwrite MM table from source table
+    runStatementOnDriver("insert overwrite table " + TableExtended.MMTBL + " select a,b from " + Table.NONACIDORCTBL);
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 delta dirs, plus 1 base dir in the location
+    Assert.assertEquals(3, status.length);
+    int baseCount = 0;
+    deltaCount = 0;
+    for (int i = 0; i < status.length; i++) {
+      String dirName = status[i].getPath().getName();
+      if (dirName.matches("delta_.*")) {
+        deltaCount++;
+      } else {
+        baseCount++;
+      }
+    }
+    Assert.assertEquals(2, deltaCount);
+    Assert.assertEquals(1, baseCount);
+
+    // Verify query result
+    resultData = new int[][] {{5,6},{7,8}};
+    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
+    Assert.assertEquals(stringifyValues(resultData), rs);
+
+    // 4. Perform a minor compaction. Nothing should change.
+    // Both deltas and the base dir should have the same name.
+    // Re-verify directory layout and query result by using the same logic as above
+    runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MINOR'");
+    runWorker(hiveConf);
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 delta dirs, plus 1 base dir in the location
+    Assert.assertEquals(3, status.length);
+    baseCount = 0;
+    deltaCount = 0;
+    for (int i = 0; i < status.length; i++) {
+      String dirName = status[i].getPath().getName();
+      if (dirName.matches("delta_.*")) {
+        deltaCount++;
+      } else {
+        Assert.assertTrue(dirName.matches("base_.*"));
+        baseCount++;
+      }
+    }
+    Assert.assertEquals(2, deltaCount);
+    Assert.assertEquals(1, baseCount);
+
+    // Verify query result
+    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
+    Assert.assertEquals(stringifyValues(resultData), rs);
+
+    // 5. Run Cleaner. It should remove the 2 delta dirs.
+    runCleaner(hiveConf);
+    // There should be only 1 directory left: base_xxxxxxx.
+    // The delta dirs should have been cleaned up.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    Assert.assertEquals(1, status.length);
+    Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+    // Verify query result
+    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
+    Assert.assertEquals(stringifyValues(resultData), rs);
+  }
+
+  /**
+   * Test a scenario, on a partitioned micro-managed table, where an IOW comes in
+   * before a MAJOR compaction happens.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testInsertOverwriteForPartitionedMmTable() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert two rows to a partitioned MM table.
+    int[][] valuesOdd = {{5,6},{7,8}};
+    int[][] valuesEven = {{2,1},{4,3}};
+    runStatementOnDriver("insert into " + TableExtended.MMTBLPART + " PARTITION(p='odd') " + makeValuesClause(valuesOdd));
+    runStatementOnDriver("insert into " + TableExtended.MMTBLPART + " PARTITION(p='even') " + makeValuesClause(valuesEven));
+
+    // Verify dirs
+    String[] pStrings = {"/p=odd", "/p=even"};
+
+    for(int i=0; i < pStrings.length; i++) {
+      status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+          (TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[i]), FileUtils.STAGING_DIR_PATH_FILTER);
+      // There should be 1 delta dir per partition location
+      Assert.assertEquals(1, status.length);
+      Assert.assertTrue(status[0].getPath().getName().matches("delta_.*"));
+    }
+
+    // 2. INSERT OVERWRITE
+    // Prepare data for the source table
+    int[][] newValsOdd = {{5,5},{11,11}};
+    int[][] newValsEven = {{2,2}};
+
+    runStatementOnDriver("insert into " + TableExtended.NONACIDPART + " PARTITION(p='odd') " + makeValuesClause(newValsOdd));
+    runStatementOnDriver("insert into " + TableExtended.NONACIDPART + " PARTITION(p='even') " + makeValuesClause(newValsEven));
+
+    // Insert overwrite MM table from source table
+    List<String> rs = null;
+    String s = "insert overwrite table " + TableExtended.MMTBLPART + " PARTITION(p='odd') " +
+      " select a,b from " + TableExtended.NONACIDPART + " where " + TableExtended.NONACIDPART + ".p='odd'";
+    rs = runStatementOnDriver("explain formatted " + s);
+    LOG.info("Explain formatted: " + rs.toString());
+    runStatementOnDriver(s);
+
+    s = "insert overwrite table " + TableExtended.MMTBLPART + " PARTITION(p='even') " +
+        " select a,b from " + TableExtended.NONACIDPART + " where " + TableExtended.NONACIDPART + ".p='even'";
+    runStatementOnDriver(s);
+
+    // Verify resulting dirs.
+    boolean sawBase = false;
+    String[] baseDirs = {"", ""};
+    int deltaCount = 0;
+    for(int h=0; h < pStrings.length; h++) {
+      status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+          (TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER);
+      // There should be 1 delta dir, plus a base dir in the location
+      Assert.assertEquals(2, status.length);
+      for (int i = 0; i < status.length; i++) {
+        String dirName = status[i].getPath().getName();
+        if (dirName.matches("delta_.*")) {
+          deltaCount++;
+        } else {
+          sawBase = true;
+          baseDirs[h] = dirName;
+          Assert.assertTrue(baseDirs[h].matches("base_.*"));
+        }
+      }
+      Assert.assertEquals(1, deltaCount);
+      Assert.assertTrue(sawBase);
+      deltaCount = 0;
+      sawBase = false;
+    }
+
+    // Verify query result
+    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " where p='even' order by a,b");
+    int [][] rExpectedEven = new int[][] {{2,2}};
+    Assert.assertEquals(stringifyValues(rExpectedEven), rs);
+
+    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " where p='odd' order by a,b");
+    int [][] rExpectedOdd  = new int[][] {{5,5},{11,11}};
+    Assert.assertEquals(stringifyValues(rExpectedOdd), rs);
+
+    // 3. Perform a major compaction. Nothing should change:
+    // in each partition, the delta dir and the base dir should keep the same names.
+    // Re-verify the directory layout and query result using the same logic as above.
+    runStatementOnDriver("alter table "+ TableExtended.MMTBLPART + " PARTITION(p='odd') " + " compact 'MAJOR'" );
+    runWorker(hiveConf);
+    runStatementOnDriver("alter table "+ TableExtended.MMTBLPART + " PARTITION(p='even') " + " compact 'MAJOR'" );
+    runWorker(hiveConf);
+
+    for(int h=0; h < pStrings.length; h++) {
+      status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+          (TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER);
+      // There should be 1 delta dir, plus a base dir in the location
+      Assert.assertEquals(2, status.length);
+      sawBase = false;
+      deltaCount = 0;
+      for (int i = 0; i < status.length; i++) {
+        String dirName = status[i].getPath().getName();
+        if (dirName.matches("delta_.*")) {
+          deltaCount++;
+        } else {
+          sawBase = true;
+          Assert.assertTrue("BASE ERROR: " + dirName, dirName.matches("base_.*"));
+          Assert.assertEquals(baseDirs[h], dirName);
+        }
+      }
+      Assert.assertEquals(1, deltaCount);
+      Assert.assertTrue(sawBase);
+      deltaCount = 0;
+      sawBase = false;
+    }
+
+    // Verify query result
+    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " order by a,b");
+    int[][] rExpected = new int[][] {{2,2},{5,5},{11,11}};
+    Assert.assertEquals(stringifyValues(rExpected), rs);
+
+    // 4. Run Cleaner. It should remove the 2 delta dirs.
+    runCleaner(hiveConf);
+
+    // There should be only 1 directory left: base_xxxxxxx.
+    // The delta dirs should have been cleaned up.
+    for(int h=0; h < pStrings.length; h++) {
+      status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+          (TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER);
+      Assert.assertEquals(1, status.length);
+      Assert.assertTrue(status[0].getPath().getName().matches("base_.*"));
+      Assert.assertEquals(baseDirs[h], status[0].getPath().getName());
+    }
+    // Verify query result
+    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " order by a,b");
+    Assert.assertEquals(stringifyValues(rExpected), rs);
+  }
+
+  /**
+   * Tests a scenario on a dynamically partitioned micro-managed (MM) table where an
+   * INSERT OVERWRITE (IOW) comes in before a MAJOR compaction happens.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testInsertOverwriteWithDynamicPartition() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert two rows to a partitioned MM table.
+    int[][] valuesOdd = {{5,6},{7,8}};
+    int[][] valuesEven = {{2,1},{4,3}};
+    runStatementOnDriver("insert into " + TableExtended.MMTBLPART + " PARTITION(p='odd') " + makeValuesClause(valuesOdd));
+    runStatementOnDriver("insert into " + TableExtended.MMTBLPART + " PARTITION(p='even') " + makeValuesClause(valuesEven));
+
+    // Verify dirs
+    String[] pStrings = {"/p=odd", "/p=even"};
+
+    for(int i=0; i < pStrings.length; i++) {
+      status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+          (TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[i]), FileUtils.STAGING_DIR_PATH_FILTER);
+      // There should be 1 delta dir per partition location
+      Assert.assertEquals(1, status.length);
+      Assert.assertTrue(status[0].getPath().getName().matches("delta_.*"));
+    }
+
+    // 2. INSERT OVERWRITE
+    // Prepare data for the source table
+    int[][] newValsOdd = {{5,5},{11,11}};
+    int[][] newValsEven = {{2,2}};
+
+    runStatementOnDriver("insert into " + TableExtended.NONACIDPART + " PARTITION(p='odd') " + makeValuesClause(newValsOdd));
+    runStatementOnDriver("insert into " + TableExtended.NONACIDPART + " PARTITION(p='even') " + makeValuesClause(newValsEven));
+
+    runStatementOnDriver("insert overwrite table " + TableExtended.MMTBLPART + " partition(p) select a,b,p from " + TableExtended.NONACIDPART);
+
+    // Verify resulting dirs.
+    boolean sawBase = false;
+    String[] baseDirs = {"", ""};
+    int deltaCount = 0;
+    for(int h=0; h < pStrings.length; h++) {
+      status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+          (TableExtended.MMTBLPART).toString().toLowerCase() + pStrings[h]), FileUtils.STAGING_DIR_PATH_FILTER);
+      // There should be 1 delta dir, plus a base dir in the location
+      Assert.assertEquals(2, status.length);
+
+      for (int i = 0; i < status.length; i++) {
+        String dirName = status[i].getPath().getName();
+        if (dirName.matches("delta_.*")) {
+          deltaCount++;
+        } else {
+          sawBase = true;
+          baseDirs[h] = dirName;
+          Assert.assertTrue(baseDirs[h].matches("base_.*"));
+        }
+      }
+      Assert.assertEquals(1, deltaCount);
+      Assert.assertTrue(sawBase);
+      deltaCount = 0;
+      sawBase = false;
+    }
+
+    // Verify query result
+    List<String> rs = null;
+    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " where p='even' order by a,b");
+    int [][] rExpectedEven = new int[][] {{2,2}};
+    Assert.assertEquals(stringifyValues(rExpectedEven), rs);
+
+    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " where p='odd' order by a,b");
+    int [][] rExpectedOdd  = new int[][] {{5,5},{11,11}};
+    Assert.assertEquals(stringifyValues(rExpectedOdd), rs);
+
+    // Verify query result
+    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBLPART + " order by a,b");
+    int[][] rExpected = new int[][] {{2,2},{5,5},{11,11}};
+    Assert.assertEquals(stringifyValues(rExpected), rs);
+  }
+
+  @Test
+  public void testInsertOverwriteWithUnionAll() throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] status;
+
+    // 1. Insert two rows to an MM table
+    runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)");
+    runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)");
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 delta dirs in the location
+    Assert.assertEquals(2, status.length);
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
+    }
+
+    // 2. INSERT OVERWRITE
+    // Prepare data for the source table
+    int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}};
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + TestTxnCommands2.makeValuesClause(values));
+
+    runStatementOnDriver("insert overwrite table " + TableExtended.MMTBL + " select a,b from " + Table.NONACIDORCTBL + " where a between 1 and 3 union all select a,b from " + Table.NONACIDORCTBL + " where a between 5 and 7");
+
+    // Verify resulting dirs.
+    boolean sawBase = false;
+    String baseDir = "";
+    int deltaCount = 0;
+
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+      (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be 2 delta dirs, plus a base dir in the location
+    Assert.assertEquals(3, status.length);
+
+    for (int i = 0; i < status.length; i++) {
+      String dirName = status[i].getPath().getName();
+      if (dirName.matches("delta_.*")) {
+        deltaCount++;
+      } else {
+        sawBase = true;
+        baseDir = dirName;
+        Assert.assertTrue(baseDir.matches("base_.*"));
+      }
+    }
+    Assert.assertEquals(2, deltaCount);
+    Assert.assertTrue(sawBase);
+
+    List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
+    int[][] rExpected = new int[][] {{1,2},{2,4},{5,6},{6,8}};
+    Assert.assertEquals(stringifyValues(rExpected), rs);
+
+    // 3. Perform a major compaction.
+    runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+
+    // 4. Run Cleaner. It should remove the 2 delta dirs.
+    runCleaner(hiveConf);
+
+    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
+    Assert.assertEquals(stringifyValues(rExpected), rs);
+
+    // Verify resulting dirs.
+    status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+      (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    // There should be one base dir in the location
+    Assert.assertEquals(1, status.length);
+
+    sawBase = false;
+    deltaCount = 0;
+    for (int i = 0; i < status.length; i++) {
+      String dirName = status[i].getPath().getName();
+      if (dirName.matches("delta_.*")) {
+        deltaCount++;
+      } else {
+        sawBase = true;
+        baseDir = dirName;
+        Assert.assertTrue(baseDir.matches("base_.*"));
+      }
+    }
+    Assert.assertEquals(0, deltaCount);
+    Assert.assertTrue(sawBase);
+
+    rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
+    Assert.assertEquals(stringifyValues(rExpected), rs);
+  }
+
+  private void verifyDirAndResult(int expectedDeltas) throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    // Verify the content of subdirs
+    FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (TableExtended.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    int sawDeltaTimes = 0;
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
+      sawDeltaTimes++;
+      FileStatus[] files = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+      Assert.assertEquals(1, files.length);
+      Assert.assertEquals("000000_0", files[0].getPath().getName());
+    }
+    Assert.assertEquals(expectedDeltas, sawDeltaTimes);
+
+    // Verify query result
+    int [][] resultData = new int[][] {{1,2}, {3,4}};
+    List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
+    Assert.assertEquals(stringifyValues(resultData), rs);
+  }
+}

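The assertions in these tests repeatedly classify directory names against the delta_.* and base_.* patterns: MM inserts create delta_<txnId>_<txnId>(_<stmtId>) directories, while INSERT OVERWRITE and compaction produce a base_<txnId> directory. A minimal, self-contained sketch of that classification logic (a hypothetical helper for illustration; not part of this patch):

import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only; not part of this patch.
public class MmDirNameDemo {

  // MM writes create delta_<txnId>_<txnId>(_<stmtId>) directories.
  static boolean isDelta(String dirName) {
    return dirName.matches("delta_.*");
  }

  // INSERT OVERWRITE and compaction produce a base_<txnId> directory.
  static boolean isBase(String dirName) {
    return dirName.matches("base_.*");
  }

  public static void main(String[] args) {
    List<String> dirs = Arrays.asList(
        "delta_0000001_0000001_0000", "delta_0000002_0000002_0000", "base_0000003");
    long deltaCount = dirs.stream().filter(MmDirNameDemo::isDelta).count();
    long baseCount = dirs.stream().filter(MmDirNameDemo::isBase).count();
    // Mirrors the layout asserted after an IOW: 2 delta dirs plus 1 base dir.
    System.out.println(deltaCount + " delta dirs, " + baseCount + " base dir");
  }
}
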
http://git-wip-us.apache.org/repos/asf/hive/blob/5f12cb84/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForOrcMmTable.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForOrcMmTable.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForOrcMmTable.java
new file mode 100644
index 0000000..1a70175
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForOrcMmTable.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Same as TestTxnCommandsForMmTable, but runs the tests against ORC-backed
+ * micro-managed (MM) tables: setUp calls setUpInternalExtended(true) to enable
+ * the ORC variant of the test tables.
+ */
+public class TestTxnCommandsForOrcMmTable extends TestTxnCommandsForMmTable {
+
+  public TestTxnCommandsForOrcMmTable() {
+    super();
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    setUpInternal();
+    setUpInternalExtended(true);
+  }
+}

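The new subclass reuses the whole MM test suite and only swaps the table storage format at setup time. A generic sketch of this subclass-per-format pattern (class and method names below are invented for illustration and are not Hive's actual test API):

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

// Hypothetical sketch; names are invented and are not Hive's actual test API.
abstract class FormatAwareTestBase {
  protected String storageFormat = "textfile";

  // Analogous in spirit to setUpInternalExtended(boolean): selects the format
  // used when the test tables are created.
  protected void selectFormat(boolean orc) {
    storageFormat = orc ? "orc" : "textfile";
  }

  @Test
  public void testDdlUsesSelectedFormat() {
    String ddl = "create table t(a int, b int) stored as " + storageFormat;
    Assert.assertTrue(ddl.endsWith(storageFormat));
  }
}

public class OrcFormatVariantTest extends FormatAwareTestBase {
  @Before
  public void setUp() {
    selectFormat(true); // every inherited test now runs against ORC-backed tables
  }
}
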
http://git-wip-us.apache.org/repos/asf/hive/blob/5f12cb84/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 6a2164f..32a2489 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -70,7 +71,7 @@ public abstract class TxnCommandsBaseForTests {
   public void setUp() throws Exception {
     setUpInternal();
   }
-  void setUpInternal() throws Exception {
+  protected void setUpInternal() throws Exception {
     hiveConf = new HiveConf(this.getClass());
     hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
@@ -101,7 +102,7 @@ public abstract class TxnCommandsBaseForTests {
     runStatementOnDriver("create temporary  table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + "(a int, b int) stored as orc");
   }
-  private void dropTables() throws Exception {
+  protected void dropTables() throws Exception {
     for(TxnCommandsBaseForTests.Table t : TxnCommandsBaseForTests.Table.values()) {
       runStatementOnDriver("drop table if exists " + t);
     }
@@ -134,6 +135,14 @@ public abstract class TxnCommandsBaseForTests {
   String makeValuesClause(int[][] rows) {
     return TestTxnCommands2.makeValuesClause(rows);
   }
+  
+  void runWorker(HiveConf hiveConf) throws MetaException {
+    TestTxnCommands2.runWorker(hiveConf);
+  }
+
+  void runCleaner(HiveConf hiveConf) throws MetaException {
+    TestTxnCommands2.runCleaner(hiveConf);
+  }
 
   List<String> runStatementOnDriver(String stmt) throws Exception {
     CommandProcessorResponse cpr = d.run(stmt);

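The new runWorker/runCleaner helpers delegate to TestTxnCommands2, which runs one pass of the compactor's Worker and Cleaner on the calling thread so a test can assert on the resulting directory layout deterministically. A generic, self-contained sketch of that single-pass pattern (hypothetical classes, not Hive's actual compactor API):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; not Hive's actual compactor classes.
public class OnePassLoopDemo {

  static class CompactionLoop implements Runnable {
    private final AtomicBoolean stop;
    private final Queue<String> requests;

    CompactionLoop(AtomicBoolean stop, Queue<String> requests) {
      this.stop = stop;
      this.requests = requests;
    }

    @Override
    public void run() {
      do {
        String req = requests.poll();
        if (req != null) {
          System.out.println("processing compaction request: " + req);
        }
      } while (!stop.get()); // stop flag pre-set to true => exactly one pass
    }
  }

  public static void main(String[] args) {
    Queue<String> queue = new ArrayDeque<>();
    queue.add("default.mmtbl MINOR");
    // Running on the calling thread keeps the test synchronous and deterministic.
    new CompactionLoop(new AtomicBoolean(true), queue).run();
  }
}
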
http://git-wip-us.apache.org/repos/asf/hive/blob/5f12cb84/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index 9a22c54..61f5d1a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -218,7 +218,7 @@ public class TestUtilities {
     Path tempDirPath = setupTempDirWithSingleOutputFile(hconf);
     FileSinkDesc conf = getFileSinkDesc(tempDirPath);
 
-    List<Path> paths = Utilities.removeTempOrDuplicateFiles(localFs, tempDirPath, dpCtx, conf, hconf);
+    List<Path> paths = Utilities.removeTempOrDuplicateFiles(localFs, tempDirPath, dpCtx, conf, hconf, false);
 
     String expectedScheme = tempDirPath.toUri().getScheme();
     String expectedAuthority = tempDirPath.toUri().getAuthority();

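The only change in this hunk is a new boolean argument to Utilities.removeTempOrDuplicateFiles, passed as false here to preserve the previous behavior; the parameter's name is not visible in the hunk. As a general technique (not what this patch does to the call site), such a signature change can also be made without breaking existing callers via a delegating overload, sketched below with invented names:

import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the delegating-overload pattern; names are invented
// for illustration and are not Hive's actual Utilities API.
public class OverloadDemo {

  // Original entry point keeps its old behavior by delegating with the flag off.
  static List<String> removeTempOrDuplicateFiles(String dir) {
    return removeTempOrDuplicateFiles(dir, false);
  }

  // New entry point: the extra flag gates the new (here: MM-aware) code path.
  static List<String> removeTempOrDuplicateFiles(String dir, boolean mmAware) {
    if (mmAware) {
      return Collections.emptyList(); // placeholder for the new behavior
    }
    return Collections.singletonList(dir + "/000000_0"); // legacy behavior
  }

  public static void main(String[] args) {
    System.out.println(removeTempOrDuplicateFiles("/tmp/scratch"));
  }
}
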
http://git-wip-us.apache.org/repos/asf/hive/blob/5f12cb84/ql/src/test/results/clientpositive/llap/mm_all.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_all.q.out b/ql/src/test/results/clientpositive/llap/mm_all.q.out
index cfbe659..03c1293 100644
--- a/ql/src/test/results/clientpositive/llap/mm_all.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_all.q.out
@@ -1365,6 +1365,12 @@ POSTHOOK: query: select * from multi0_2_mm order by key, key2
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@multi0_2_mm
 #### A masked pattern was here ####
+455	97
+455	98
+456	0
+456	10
+457	100
+457	103
 PREHOOK: query: from intermediate
 insert into table multi0_1_mm select p, key
 insert overwrite table multi0_2_mm select key, p
@@ -1417,6 +1423,12 @@ POSTHOOK: query: select * from multi0_2_mm order by key, key2
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@multi0_2_mm
 #### A masked pattern was here ####
+0	456
+10	456
+97	455
+98	455
+100	457
+103	457
 PREHOOK: query: drop table multi0_1_mm
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@multi0_1_mm
@@ -1541,17 +1553,11 @@ POSTHOOK: Input: default@multi1_mm@p=2
 100	457	2
 103	457	1
 103	457	2
-455	97	1
 455	97	2
-455	98	1
 455	98	2
-456	0	1
 456	0	2
-456	10	1
 456	10	2
-457	100	1
 457	100	2
-457	103	1
 457	103	2
 PREHOOK: query: from intermediate
 insert into table multi1_mm partition(p) select p, key, p
@@ -1621,22 +1627,16 @@ POSTHOOK: Input: default@multi1_mm@p=457
 103	457	1
 103	457	1
 103	457	2
-455	97	1
 455	97	2
 455	97	455
-455	98	1
 455	98	2
 455	98	455
-456	0	1
 456	0	2
 456	0	456
-456	10	1
 456	10	2
 456	10	456
-457	100	1
 457	100	2
 457	100	457
-457	103	1
 457	103	2
 457	103	457
 PREHOOK: query: from intermediate
@@ -1705,27 +1705,21 @@ POSTHOOK: Input: default@multi1_mm@p=457
 103	457	1
 103	457	2
 455	97	1
-455	97	1
 455	97	2
 455	97	455
 455	98	1
-455	98	1
 455	98	2
 455	98	455
 456	0	1
-456	0	1
 456	0	2
 456	0	456
 456	10	1
-456	10	1
 456	10	2
 456	10	456
 457	100	1
-457	100	1
 457	100	2
 457	100	457
 457	103	1
-457	103	1
 457	103	2
 457	103	457
 PREHOOK: query: drop table multi1_mm

http://git-wip-us.apache.org/repos/asf/hive/blob/5f12cb84/ql/src/test/results/clientpositive/mm_all.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/mm_all.q.out b/ql/src/test/results/clientpositive/mm_all.q.out
index 5ad5957..490c67f 100644
--- a/ql/src/test/results/clientpositive/mm_all.q.out
+++ b/ql/src/test/results/clientpositive/mm_all.q.out
@@ -1384,6 +1384,12 @@ POSTHOOK: query: select * from multi0_2_mm order by key, key2
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@multi0_2_mm
 #### A masked pattern was here ####
+455	97
+455	98
+456	0
+456	10
+457	100
+457	103
 PREHOOK: query: from intermediate
 insert into table multi0_1_mm select p, key
 insert overwrite table multi0_2_mm select key, p
@@ -1436,6 +1442,12 @@ POSTHOOK: query: select * from multi0_2_mm order by key, key2
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@multi0_2_mm
 #### A masked pattern was here ####
+0	456
+10	456
+97	455
+98	455
+100	457
+103	457
 PREHOOK: query: drop table multi0_1_mm
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@multi0_1_mm
@@ -1560,17 +1572,11 @@ POSTHOOK: Input: default@multi1_mm@p=2
 100	457	2
 103	457	1
 103	457	2
-455	97	1
 455	97	2
-455	98	1
 455	98	2
-456	0	1
 456	0	2
-456	10	1
 456	10	2
-457	100	1
 457	100	2
-457	103	1
 457	103	2
 PREHOOK: query: from intermediate
 insert into table multi1_mm partition(p) select p, key, p
@@ -1640,22 +1646,16 @@ POSTHOOK: Input: default@multi1_mm@p=457
 103	457	1
 103	457	1
 103	457	2
-455	97	1
 455	97	2
 455	97	455
-455	98	1
 455	98	2
 455	98	455
-456	0	1
 456	0	2
 456	0	456
-456	10	1
 456	10	2
 456	10	456
-457	100	1
 457	100	2
 457	100	457
-457	103	1
 457	103	2
 457	103	457
 PREHOOK: query: from intermediate
@@ -1724,27 +1724,21 @@ POSTHOOK: Input: default@multi1_mm@p=457
 103	457	1
 103	457	2
 455	97	1
-455	97	1
 455	97	2
 455	97	455
 455	98	1
-455	98	1
 455	98	2
 455	98	455
 456	0	1
-456	0	1
 456	0	2
 456	0	456
 456	10	1
-456	10	1
 456	10	2
 456	10	456
 457	100	1
-457	100	1
 457	100	2
 457	100	457
 457	103	1
-457	103	1
 457	103	2
 457	103	457
 PREHOOK: query: drop table multi1_mm

