hive-commits mailing list archives

From ser...@apache.org
Subject [50/50] [abbrv] hive git commit: HIVE-14635 : establish a separate path for FSOP to write into final path (Sergey Shelukhin)
Date Mon, 29 Aug 2016 18:36:22 GMT
HIVE-14635 : establish a separate path for FSOP to write into final path (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d0f5b893
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d0f5b893
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d0f5b893

Branch: refs/heads/hive-14535
Commit: d0f5b893a8f3448882f18cca7cd2fec02c708874
Parents: c97450c
Author: Sergey Shelukhin <sershe@apache.org>
Authored: Mon Aug 29 11:34:42 2016 -0700
Committer: Sergey Shelukhin <sershe@apache.org>
Committed: Mon Aug 29 11:34:42 2016 -0700

----------------------------------------------------------------------
 .../metastore/api/hive_metastoreConstants.java  |  30 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   | 127 +++++---
 .../hadoop/hive/ql/exec/JoinOperator.java       |   1 +
 .../apache/hadoop/hive/ql/exec/MoveTask.java    | 126 ++++----
 .../apache/hadoop/hive/ql/exec/Utilities.java   |  17 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  98 +++++-
 .../hive/ql/optimizer/GenMapRedUtils.java       |   4 +
 .../optimizer/unionproc/UnionProcFactory.java   |   3 +
 .../hadoop/hive/ql/parse/GenTezProcContext.java |   2 +-
 .../hadoop/hive/ql/parse/GenTezUtils.java       |   4 +
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  | 311 +++++++++++--------
 .../ql/plan/ConditionalResolverMergeFiles.java  |   5 +
 .../hadoop/hive/ql/plan/FileSinkDesc.java       |  11 +-
 .../hadoop/hive/ql/plan/LoadFileDesc.java       |   2 +
 .../hadoop/hive/ql/plan/LoadTableDesc.java      |  24 +-
 .../apache/hadoop/hive/ql/plan/MoveWork.java    |   1 +
 .../hive/ql/exec/TestFileSinkOperator.java      |   2 +-
 ql/src/test/queries/clientpositive/mm_all.q     |  63 ++++
 ql/src/test/queries/clientpositive/mm_current.q |  11 +
 .../clientpositive/llap/mm_current.q.out        |  21 ++
 20 files changed, 567 insertions(+), 296 deletions(-)
----------------------------------------------------------------------
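
The core of the change: for "micromanaged" (MM) tables the FileSinkOperator points its write path straight at the final spec path instead of the usual scratch directories, leaving the eventual commit to be handled outside the operator (see the FSPaths changes in FileSinkOperator.java below). A minimal sketch of that path-selection idea, with illustrative names rather than the exact patch code:

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.ql.exec.Utilities;

  class MmPathSelectionSketch {
    // Returns { tmpPath, taskOutputTempPath } as chosen by the FSPaths constructor below.
    static Path[] choosePaths(Path specPath, boolean isMmTable) {
      if (!isMmTable) {
        // Regular table: write under temp dirs, then rename/move into place on commit.
        return new Path[] {
            Utilities.toTempPath(specPath),       // tmpPath
            Utilities.toTaskTempPath(specPath)    // taskOutputTempPath
        };
      }
      // MM table: write directly into the final location; no task temp path is used.
      return new Path[] { specPath, null };
    }
  }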


http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
index 8de8896..6a5f550 100644
--- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
+++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/hive_metastoreConstants.java
@@ -6,34 +6,7 @@
  */
 package org.apache.hadoop.hive.metastore.api;
 
-import org.apache.thrift.scheme.IScheme;
-import org.apache.thrift.scheme.SchemeFactory;
-import org.apache.thrift.scheme.StandardScheme;
-
-import org.apache.thrift.scheme.TupleScheme;
-import org.apache.thrift.protocol.TTupleProtocol;
-import org.apache.thrift.protocol.TProtocolException;
-import org.apache.thrift.EncodingUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.server.AbstractNonblockingServer.*;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.EnumMap;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.EnumSet;
-import java.util.Collections;
-import java.util.BitSet;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import javax.annotation.Generated;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+
 public class hive_metastoreConstants {
 
   public static final String DDL_TIME = "transient_lastDdlTime";
@@ -84,4 +57,5 @@ public class hive_metastoreConstants {
 
   public static final String TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties";
 
+  public static final String TABLE_IS_MM = "hivecommit";
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index b0c3d3f..755120f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -143,8 +143,8 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   }
 
   public class FSPaths implements Cloneable {
-    Path tmpPath;
-    Path taskOutputTempPath;
+    private Path tmpPath;
+    private Path taskOutputTempPath;
     Path[] outPaths;
     Path[] finalPaths;
     RecordWriter[] outWriters;
@@ -152,10 +152,21 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     Stat stat;
     int acidLastBucket = -1;
     int acidFileOffset = -1;
+    private boolean isMmTable;
+
+    public FSPaths(Path specPath, boolean isMmTable) {
+      this.isMmTable = isMmTable;
+      if (!isMmTable) {
+        tmpPath = Utilities.toTempPath(specPath);
+        taskOutputTempPath = Utilities.toTaskTempPath(specPath);
+      } else {
+        tmpPath = specPath;
+        taskOutputTempPath = null; // Should not be used.
+      } 
+      Utilities.LOG14535.info("new FSPaths for " + numFiles + " files, dynParts = " + bDynParts
+          + ": tmpPath " + tmpPath + ", task path " + taskOutputTempPath
+          + " (spec path " + specPath + ")", new Exception());
 
-    public FSPaths(Path specPath) {
-      tmpPath = Utilities.toTempPath(specPath);
-      taskOutputTempPath = Utilities.toTaskTempPath(specPath);
       outPaths = new Path[numFiles];
       finalPaths = new Path[numFiles];
       outWriters = new RecordWriter[numFiles];
@@ -207,10 +218,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     }
 
     private void commit(FileSystem fs) throws HiveException {
+      if (isMmTable) return;  // TODO#: need to propagate to MoveTask instead
       for (int idx = 0; idx < outPaths.length; ++idx) {
         try {
           if ((bDynParts || isSkewedStoredAsSubDirectories)
               && !fs.exists(finalPaths[idx].getParent())) {
+            Utilities.LOG14535.info("commit making path for dyn/skew: " + finalPaths[idx].getParent());
             fs.mkdirs(finalPaths[idx].getParent());
           }
           boolean needToRename = true;
@@ -229,6 +242,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
               needToRename = false;
             }
           }
+          Utilities.LOG14535.info("commit potentially moving " + outPaths[idx] + " to " + finalPaths[idx]);
           if (needToRename && outPaths[idx] != null && !fs.rename(outPaths[idx], finalPaths[idx])) {
             throw new HiveException("Unable to rename output from: " +
                 outPaths[idx] + " to: " + finalPaths[idx]);
@@ -260,6 +274,54 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     public Stat getStat() {
       return stat;
     }
+
+    public void configureDynPartPath(String dirName, String childSpecPathDynLinkedPartitions) {
+      dirName = (childSpecPathDynLinkedPartitions == null) ? dirName :
+        dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions;
+      tmpPath = new Path(tmpPath, dirName);
+      if (taskOutputTempPath != null) {
+        taskOutputTempPath = new Path(taskOutputTempPath, dirName);
+      }
+    }
+
+    public void initializeBucketPaths(int filesIdx, String taskId, boolean isNativeTable,
+        boolean isSkewedStoredAsSubDirectories) {
+      if (isNativeTable) {
+        String extension = Utilities.getFileExtension(jc, isCompressed, hiveOutputFormat);
+        if (!isMmTable) {
+          if (!bDynParts && !isSkewedStoredAsSubDirectories) {
+            finalPaths[filesIdx] = getFinalPath(taskId, parent, extension);
+          } else {
+            finalPaths[filesIdx] = getFinalPath(taskId, tmpPath, extension);
+          }
+          outPaths[filesIdx] = getTaskOutPath(taskId);
+        } else {
+          if (!bDynParts && !isSkewedStoredAsSubDirectories) {
+            finalPaths[filesIdx] = getFinalPath(taskId, specPath, extension);
+          } else {
+            // TODO# wrong!
+            finalPaths[filesIdx] = getFinalPath(taskId, specPath, extension);
+          }
+          outPaths[filesIdx] = finalPaths[filesIdx];
+        }
+        if (isInfoEnabled) {
+          LOG.info("Final Path: FS " + finalPaths[filesIdx]);
+          if (isInfoEnabled && !isMmTable) {
+            LOG.info("Writing to temp file: FS " + outPaths[filesIdx]);
+          }
+        }
+      } else {
+        finalPaths[filesIdx] = outPaths[filesIdx] = specPath;
+      }
+    }
+
+    public Path getTmpPath() {
+      return tmpPath;
+    }
+
+    public Path getTaskOutputTempPath() {
+      return taskOutputTempPath;
+    }
   } // class FSPaths
 
   private static final long serialVersionUID = 1L;
@@ -297,6 +359,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   protected boolean filesCreated = false;
 
   private void initializeSpecPath() {
+    // TODO# special case #N
     // For a query of the type:
     // insert overwrite table T1
     // select * from (subq1 union all subq2)u;
@@ -397,7 +460,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       }
 
       if (!bDynParts) {
-        fsp = new FSPaths(specPath);
+        fsp = new FSPaths(specPath, conf.isMmTable());
 
         // Create all the files - this is required because empty files need to be created for
         // empty buckets
@@ -411,6 +474,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
                                             .getVar(hconf, HIVE_TEMPORARY_TABLE_STORAGE));
       if (isTemporary && fsp != null
           && tmpStorage != StoragePolicyValue.DEFAULT) {
+        assert !conf.isMmTable(); // Not supported for temp tables.
         final Path outputPath = fsp.taskOutputTempPath;
         StoragePolicyShim shim = ShimLoader.getHadoopShims()
             .getStoragePolicyShim(fs);
@@ -557,7 +621,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       assert filesIdx == numFiles;
 
       // in recent hadoop versions, use deleteOnExit to clean tmp files.
-      if (isNativeTable && fs != null && fsp != null) {
+      if (isNativeTable && fs != null && fsp != null && !conf.isMmTable()) {
         autoDelete = fs.deleteOnExit(fsp.outPaths[0]);
       }
     } catch (Exception e) {
@@ -571,34 +635,16 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
       throws HiveException {
     try {
-      if (isNativeTable) {
-        fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, null);
-        if (isInfoEnabled) {
-          LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
-        }
-        fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId);
-        if (isInfoEnabled) {
-          LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]);
-        }
-      } else {
-        fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath;
-      }
-      // The reason to keep these instead of using
-      // OutputFormat.getRecordWriter() is that
-      // getRecordWriter does not give us enough control over the file name that
-      // we create.
-      String extension = Utilities.getFileExtension(jc, isCompressed, hiveOutputFormat);
-      if (!bDynParts && !this.isSkewedStoredAsSubDirectories) {
-        fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, parent, extension);
-      } else {
-        fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, extension);
-      }
+      fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable, isSkewedStoredAsSubDirectories);
+      Utilities.LOG14535.info("createBucketForFileIdx " + filesIdx + ": final path " + fsp.finalPaths[filesIdx]
+          + "; out path " + fsp.outPaths[filesIdx] +" (spec path " + specPath + ", tmp path "
+          + fsp.getTmpPath() + ", task " + taskId + ")", new Exception());
 
       if (isInfoEnabled) {
         LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
       }
 
-      if (isNativeTable) {
+      if (isNativeTable && !conf.isMmTable()) {
         // in recent hadoop versions, use deleteOnExit to clean tmp files.
         autoDelete = fs.deleteOnExit(fsp.outPaths[filesIdx]);
       }
@@ -828,6 +874,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   protected FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException {
     FSPaths fsp2 = valToPaths.get(lbDirName);
     if (fsp2 == null) {
+      Utilities.LOG14535.info("lookupListBucketingPaths for " + lbDirName);
       fsp2 = createNewPaths(lbDirName);
     }
     return fsp2;
@@ -841,18 +888,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
    * @throws HiveException
    */
   private FSPaths createNewPaths(String dirName) throws HiveException {
-    FSPaths fsp2 = new FSPaths(specPath);
-    if (childSpecPathDynLinkedPartitions != null) {
-      fsp2.tmpPath = new Path(fsp2.tmpPath,
-          dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions);
-      fsp2.taskOutputTempPath =
-        new Path(fsp2.taskOutputTempPath,
-            dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions);
-    } else {
-      fsp2.tmpPath = new Path(fsp2.tmpPath, dirName);
-      fsp2.taskOutputTempPath =
-        new Path(fsp2.taskOutputTempPath, dirName);
-    }
+    FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable()); // TODO# this will break
+    fsp2.configureDynPartPath(dirName, childSpecPathDynLinkedPartitions);
+    Utilities.LOG14535.info("creating new paths for " + dirName + ", childSpec " + childSpecPathDynLinkedPartitions
+        + ": tmpPath " + fsp2.getTmpPath() + ", task path " + fsp2.getTaskOutputTempPath());
     if(!conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
       createBucketFiles(fsp2);
       valToPaths.put(dirName, fsp2);
@@ -1082,7 +1121,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       // Hadoop always call close() even if an Exception was thrown in map() or
       // reduce().
       for (FSPaths fsp : valToPaths.values()) {
-        fsp.abortWriters(fs, abort, !autoDelete && isNativeTable);
+        fsp.abortWriters(fs, abort, !autoDelete && isNativeTable && !conf.isMmTable());
       }
     }
     fsp = prevFsp = null;
@@ -1193,7 +1232,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     for (Map.Entry<String, FSPaths> entry : valToPaths.entrySet()) {
       String fspKey = entry.getKey();     // DP/LB
       FSPaths fspValue = entry.getValue();
-
+      // TODO# useful code as reference, as it takes apart the crazy paths
       // for bucketed tables, hive.optimize.sort.dynamic.partition optimization
       // adds the taskId to the fspKey.
       if (conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
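
With the paths chosen this way, the per-file commit in FSPaths becomes a no-op for MM tables: the data already sits at its final path, and the TODO above notes that the real commit is meant to move to MoveTask. A condensed sketch of that behavior (illustrative signature, not the patch code):

  import java.io.IOException;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  class MmCommitSketch {
    // For MM tables nothing is renamed here; the eventual commit is deferred (per the TODO).
    static void commitOne(FileSystem fs, Path outPath, Path finalPath, boolean isMmTable)
        throws IOException {
      if (isMmTable) {
        return;
      }
      if (!fs.rename(outPath, finalPath)) {
        throw new IOException("Unable to rename output from: " + outPath + " to: " + finalPath);
      }
    }
  }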

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
index 08cc4b4..8f319ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
@@ -233,6 +233,7 @@ public class JoinOperator extends CommonJoinOperator<JoinDesc> implements Serial
         // point, updates from speculative tasks still writing to tmpPath
         // will not appear in finalPath.
         log.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
+        Utilities.LOG14535.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath + "(spec " + specPath + ")");
         Utilities.rename(fs, tmpPath, intermediatePath);
         // Step2: remove any tmp file or double-committed output files
         Utilities.removeTempOrDuplicateFiles(fs, intermediatePath);

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 546919b..14a84cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -252,6 +252,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
       if (lfd != null) {
         Path targetPath = lfd.getTargetDir();
         Path sourcePath = lfd.getSourcePath();
+        Utilities.LOG14535.info("MoveTask moving LFD " + sourcePath + " to " + targetPath);
         moveFile(sourcePath, targetPath, lfd.getIsDfsDir());
       }
 
@@ -268,6 +269,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
           if (!fs.exists(destPath.getParent())) {
             fs.mkdirs(destPath.getParent());
           }
+          Utilities.LOG14535.info("MoveTask moving LMFD " + srcPath + " to " + destPath);
           moveFile(srcPath, destPath, isDfsDir);
           i++;
         }
@@ -288,71 +290,17 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
           mesg.append(')');
         }
         String mesg_detail = " from " + tbd.getSourcePath();
+        Utilities.LOG14535.info("" + mesg.toString() + " " + mesg_detail);
         console.printInfo(mesg.toString(), mesg_detail);
         Table table = db.getTable(tbd.getTable().getTableName());
 
-        if (work.getCheckFileFormat()) {
-          // Get all files from the src directory
-          FileStatus[] dirs;
-          ArrayList<FileStatus> files;
-          FileSystem srcFs; // source filesystem
-          try {
-            srcFs = tbd.getSourcePath().getFileSystem(conf);
-            dirs = srcFs.globStatus(tbd.getSourcePath());
-            files = new ArrayList<FileStatus>();
-            for (int i = 0; (dirs != null && i < dirs.length); i++) {
-              files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER)));
-              // We only check one file, so exit the loop when we have at least
-              // one.
-              if (files.size() > 0) {
-                break;
-              }
-            }
-          } catch (IOException e) {
-            throw new HiveException(
-                "addFiles: filesystem error in check phase", e);
-          }
-
-          // handle file format check for table level
-          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
-            boolean flag = true;
-            // work.checkFileFormat is set to true only for Load Task, so assumption here is
-            // dynamic partition context is null
-            if (tbd.getDPCtx() == null) {
-              if (tbd.getPartitionSpec() == null || tbd.getPartitionSpec().isEmpty()) {
-                // Check if the file format of the file matches that of the table.
-                flag = HiveFileFormatUtils.checkInputFormat(
-                    srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
-              } else {
-                // Check if the file format of the file matches that of the partition
-                Partition oldPart = db.getPartition(table, tbd.getPartitionSpec(), false);
-                if (oldPart == null) {
-                  // this means we have just created a table and are specifying partition in the
-                  // load statement (without pre-creating the partition), in which case lets use
-                  // table input format class. inheritTableSpecs defaults to true so when a new
-                  // partition is created later it will automatically inherit input format
-                  // from table object
-                  flag = HiveFileFormatUtils.checkInputFormat(
-                      srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
-                } else {
-                  flag = HiveFileFormatUtils.checkInputFormat(
-                      srcFs, conf, oldPart.getInputFormatClass(), files);
-                }
-              }
-              if (!flag) {
-                throw new HiveException(
-                    "Wrong file format. Please check the file's format.");
-              }
-            } else {
-              LOG.warn("Skipping file format check as dpCtx is not null");
-            }
-          }
-        }
+        checkFileFormats(db, tbd, table);
 
         // Create a data container
         DataContainer dc = null;
         if (tbd.getPartitionSpec().size() == 0) {
           dc = new DataContainer(table.getTTable());
+          Utilities.LOG14535.info("loadTable called from " + tbd.getSourcePath() + " into " + tbd.getTable());
           db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getReplace(),
               work.isSrcLocal(), isSkewedStoredAsDirs(tbd),
               work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
@@ -495,10 +443,11 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
             List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),
                 tbd.getPartitionSpec());
             db.validatePartitionNameCharacters(partVals);
+            Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath() + " into " + tbd.getTable());
             db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
                 tbd.getPartitionSpec(), tbd.getReplace(),
                 tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
-                work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask());
+                work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask(), tbd.isMmTable());
             Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
 
             if (bucketCols != null || sortCols != null) {
@@ -547,6 +496,67 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
     }
   }
 
+  private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table)
+      throws HiveException {
+    if (work.getCheckFileFormat()) {
+      // Get all files from the src directory
+      FileStatus[] dirs;
+      ArrayList<FileStatus> files;
+      FileSystem srcFs; // source filesystem
+      try {
+        srcFs = tbd.getSourcePath().getFileSystem(conf);
+        dirs = srcFs.globStatus(tbd.getSourcePath());
+        files = new ArrayList<FileStatus>();
+        for (int i = 0; (dirs != null && i < dirs.length); i++) {
+          files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER)));
+          // We only check one file, so exit the loop when we have at least
+          // one.
+          if (files.size() > 0) {
+            break;
+          }
+        }
+      } catch (IOException e) {
+        throw new HiveException(
+            "addFiles: filesystem error in check phase", e);
+      }
+
+      // handle file format check for table level
+      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
+        boolean flag = true;
+        // work.checkFileFormat is set to true only for Load Task, so assumption here is
+        // dynamic partition context is null
+        if (tbd.getDPCtx() == null) {
+          if (tbd.getPartitionSpec() == null || tbd.getPartitionSpec().isEmpty()) {
+            // Check if the file format of the file matches that of the table.
+            flag = HiveFileFormatUtils.checkInputFormat(
+                srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
+          } else {
+            // Check if the file format of the file matches that of the partition
+            Partition oldPart = db.getPartition(table, tbd.getPartitionSpec(), false);
+            if (oldPart == null) {
+              // this means we have just created a table and are specifying partition in the
+              // load statement (without pre-creating the partition), in which case lets use
+              // table input format class. inheritTableSpecs defaults to true so when a new
+              // partition is created later it will automatically inherit input format
+              // from table object
+              flag = HiveFileFormatUtils.checkInputFormat(
+                  srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
+            } else {
+              flag = HiveFileFormatUtils.checkInputFormat(
+                  srcFs, conf, oldPart.getInputFormatClass(), files);
+            }
+          }
+          if (!flag) {
+            throw new HiveException(
+                "Wrong file format. Please check the file's format.");
+          }
+        } else {
+          LOG.warn("Skipping file format check as dpCtx is not null");
+        }
+      }
+    }
+  }
+
   private boolean isSkewedStoredAsDirs(LoadTableDesc tbd) {
     return (tbd.getLbCtx() == null) ? false : tbd.getLbCtx()
         .isSkewedStoredAsDir();

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index a542dc4..5bc04e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -208,6 +208,9 @@ import com.google.common.base.Preconditions;
 @SuppressWarnings("nls")
 public final class Utilities {
 
+  // TODO: remove when merging
+  public static final Logger LOG14535 = LoggerFactory.getLogger("Log14535");
+
   /**
    * The object in the reducer are composed of these top level fields.
    */
@@ -1405,6 +1408,7 @@ public final class Utilities {
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
     if (success) {
+      // TODO# specPath instead of tmpPath
       FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
           tmpPath, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
       if(statuses != null && statuses.length > 0) {
@@ -1414,17 +1418,21 @@ public final class Utilities {
         if (emptyBuckets.size() > 0) {
           createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
         }
-
         // move to the file destination
-        log.info("Moving tmp dir: " + tmpPath + " to: " + specPath);
+        Utilities.LOG14535.info("Moving tmp dir: " + tmpPath + " to: " + specPath);
         Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
       }
+      List<Path> paths = new ArrayList<>();
+      // TODO#: HERE listFilesToCommit(specPath, fs, paths);
     } else {
+      Utilities.LOG14535.info("deleting tmpPath " + tmpPath);
       fs.delete(tmpPath, true);
     }
+    Utilities.LOG14535.info("deleting taskTmpPath " + taskTmpPath);
     fs.delete(taskTmpPath, true);
   }
 
+
   /**
    * Check the existence of buckets according to bucket specification. Create empty buckets if
    * needed.
@@ -1465,6 +1473,7 @@ public final class Utilities {
     }
 
     for (Path path : paths) {
+      Utilities.LOG14535.info("creating empty bucket for " + path);
       RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
           jc, hiveOutputFormat, outputClass, isCompressed,
           tableInfo.getProperties(), path, reporter);
@@ -1576,15 +1585,19 @@ public final class Utilities {
 
     for (FileStatus one : items) {
       if (isTempPath(one)) {
+        Utilities.LOG14535.info("removeTempOrDuplicateFiles deleting " + one.getPath(), new Exception());
         if (!fs.delete(one.getPath(), true)) {
           throw new IOException("Unable to delete tmp file: " + one.getPath());
         }
       } else {
         String taskId = getPrefixedTaskIdFromFilename(one.getPath().getName());
+        Utilities.LOG14535.info("removeTempOrDuplicateFiles pondering " + one.getPath() + ", taskId " + taskId, new Exception());
+
         FileStatus otherFile = taskIdToFile.get(taskId);
         if (otherFile == null) {
           taskIdToFile.put(taskId, one);
         } else {
+          // TODO# file choice!
           // Compare the file sizes of all the attempt files for the same task, the largest win
           // any attempt files could contain partial results (due to task failures or
           // speculative runs), but the largest should be the correct one since the result
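
The "TODO# file choice" above flags the existing de-duplication rule in removeTempOrDuplicateFiles: when several attempt files share a task id, the largest one wins, since failed or speculative attempts can only leave partial output. A rough sketch of that rule, assuming the getPrefixedTaskIdFromFilename helper used above:

  import java.util.HashMap;
  import java.util.Map;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.hive.ql.exec.Utilities;

  class LargestAttemptSketch {
    // Keep one file per task id, preferring the largest attempt: partial output from a
    // failed or speculative attempt can only be smaller than the complete file.
    static Map<String, FileStatus> pickLargestPerTask(FileStatus[] items) {
      Map<String, FileStatus> taskIdToFile = new HashMap<>();
      for (FileStatus one : items) {
        String taskId = Utilities.getPrefixedTaskIdFromFilename(one.getPath().getName());
        FileStatus other = taskIdToFile.get(taskId);
        if (other == null || one.getLen() > other.getLen()) {
          taskIdToFile.put(taskId, one);
        }
      }
      return taskIdToFile;
    }
  }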

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 5f53aef..7d8c961 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1472,10 +1472,12 @@ public class Hive {
   public void loadPartition(Path loadPath, String tableName,
       Map<String, String> partSpec, boolean replace,
       boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
-      boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException {
+      boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, boolean isMmTable)
+          throws HiveException {
     Table tbl = getTable(tableName);
+    // TODO# dbl check if table is still mm for consistency
     loadPartition(loadPath, tbl, partSpec, replace, inheritTableSpecs,
-        isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask);
+        isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask, isMmTable);
   }
 
   /**
@@ -1499,10 +1501,10 @@ public class Hive {
    *          If the source directory is LOCAL
    * @param isAcid true if this is an ACID operation
    */
-  public Partition loadPartition(Path loadPath, Table tbl,
-      Map<String, String> partSpec, boolean replace,
-      boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
-      boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException {
+  public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
+      boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
+      boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, boolean isMmTable)
+          throws HiveException {
 
     Path tblDataLocationPath =  tbl.getDataLocation();
     try {
@@ -1540,17 +1542,25 @@ public class Hive {
       } else {
         newPartPath = oldPartPath;
       }
-      List<Path> newFiles = null;
-      if (replace || (oldPart == null && !isAcid)) {
-        replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(),
-            isSrcLocal);
-      } else {
-        if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null) {
-          newFiles = Collections.synchronizedList(new ArrayList<Path>());
+      List<Path> newFiles = null, mmFiles = null;
+      if (isMmTable) {
+        mmFiles = handleMicromanagedPartition(
+            loadPath, tbl, replace, oldPart, newPartPath, isAcid);
+        if (areEventsForDmlNeeded(tbl, oldPart)) {
+          newFiles = mmFiles;
         }
+      } else {
+        if (replace || (oldPart == null && !isAcid)) {
+          replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(),
+              isSrcLocal);
+        } else {
+          if (areEventsForDmlNeeded(tbl, oldPart)) {
+            newFiles = Collections.synchronizedList(new ArrayList<Path>());
+          }
 
-        FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
-        Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles);
+          FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
+          Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles);
+        }
       }
       Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath);
       alterPartitionSpecInMemory(tbl, partSpec, newTPart.getTPartition(), inheritTableSpecs, newPartPath.toString());
@@ -1621,6 +1631,58 @@ public class Hive {
     }
   }
 
+
+  private boolean areEventsForDmlNeeded(Table tbl, Partition oldPart) {
+    return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null;
+  }
+
+
+  private List<Path> handleMicromanagedPartition(Path loadPath, Table tbl, boolean replace,
+      Partition oldPart, Path newPartPath, boolean isAcid) throws HiveException {
+    Utilities.LOG14535.info("not moving " + loadPath + " to " + newPartPath);
+    if (replace) {
+      // TODO#: would need a list of new files to support. Then, old ones only would need
+      //        to be removed from MS (and FS). Also, per-partition IOW is problematic for
+      //        the prefix case.
+      throw new HiveException("Replace and MM are not supported");
+    }
+    if (isAcid) {
+      // TODO# need to make sure ACID writes to final directories. Otherwise, might need to move.
+      throw new HiveException("ACID and MM are not supported");
+    }
+    List<Path> newFiles = new ArrayList<Path>();
+    FileStatus[] srcs;
+    FileSystem srcFs;
+    try {
+      srcFs = loadPath.getFileSystem(conf);
+      srcs = srcFs.globStatus(loadPath);
+    } catch (IOException e) {
+      LOG.error("Error listing files", e);
+      throw new HiveException(e);
+    }
+    if (srcs == null) {
+      LOG.info("No sources specified: " + loadPath);
+      return newFiles;
+    }
+
+    // TODO: just like the move path, we only do one level of recursion.
+    for (FileStatus src : srcs) {
+      if (src.isDirectory()) {
+        try {
+          for (FileStatus srcFile :
+            srcFs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER)) {
+            newFiles.add(srcFile.getPath());
+          }
+        } catch (IOException e) {
+          throw new HiveException(e);
+        }
+      } else {
+        newFiles.add(src.getPath());
+      }
+    }
+    return newFiles;
+  }
+
   private void setStatsPropAndAlterPartition(boolean hasFollowingStatsTask, Table tbl,
       Partition newTPart) throws MetaException, TException {
     EnvironmentContext environmentContext = null;
@@ -1813,9 +1875,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
               LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
 
               // load the partition
+              Utilities.LOG14535.info("loadPartition called for DPP from " + partPath + " to " + tbl.getTableName());
               Partition newPartition = loadPartition(partPath, tbl, fullPartSpec,
                   replace, true, listBucketingEnabled,
-                  false, isAcid, hasFollowingStatsTask);
+                  false, isAcid, hasFollowingStatsTask, false); // TODO# here
               partitionsMap.put(fullPartSpec, newPartition);
 
               if (inPlaceEligible) {
@@ -2803,6 +2866,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
               destPath = mvFile(conf, srcP, destPath, isSrcLocal, srcFs, destFs, name, filetype);
             }
 
+            if (inheritPerms) {
+              HdfsUtils.setFullFileStatus(conf, fullDestStatus, srcGroup, destFs, destPath, false);
+            }
             if (null != newFiles) {
               newFiles.add(destPath);
             }
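
For MM tables loadPartition no longer copies or replaces anything: handleMicromanagedPartition only enumerates what already sits at the final location, one directory level deep like the move path, so DML events and stats still see the new files. A compressed sketch of that enumeration (illustrative names, error handling trimmed):

  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.common.FileUtils;

  class MmLoadSketch {
    // List the files already present under loadPath, one level deep, skipping hidden files,
    // instead of copying or moving anything.
    static List<Path> listLoadedFiles(FileSystem fs, Path loadPath) throws IOException {
      List<Path> newFiles = new ArrayList<>();
      FileStatus[] srcs = fs.globStatus(loadPath);
      if (srcs == null) {
        return newFiles; // nothing matched the load path
      }
      for (FileStatus src : srcs) {
        if (src.isDirectory()) {
          for (FileStatus f : fs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER)) {
            newFiles.add(f.getPath());
          }
        } else {
          newFiles.add(src.getPath());
        }
      }
      return newFiles;
    }
  }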

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index cea99e1..4e44d49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1268,6 +1268,7 @@ public final class GenMapRedUtils {
 
     // Create a FileSink operator
     TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
+    // TODO# special case #N - merge FS is created here
     FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts,
       conf.getBoolVar(ConfVars.COMPRESSRESULT));
     FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild(
@@ -1806,6 +1807,7 @@ public final class GenMapRedUtils {
 
       // Create the required temporary file in the HDFS location if the destination
       // path of the FileSinkOperator table is a blobstore path.
+      // TODO# HERE
       Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath());
 
       // Change all the linked file sink descriptors
@@ -1813,9 +1815,11 @@ public final class GenMapRedUtils {
         for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) {
           fsConf.setParentDir(tmpDir);
           fsConf.setDirName(new Path(tmpDir, fsConf.getDirName().getName()));
+          Utilities.LOG14535.info("createMoveTask setting tmpDir for LinkedFileSink chDir " + fsConf.getDirName() + "; new parent " + tmpDir + ", dest was " + fileSinkDesc.getDestPath());
         }
       } else {
         fileSinkDesc.setDirName(tmpDir);
+        Utilities.LOG14535.info("createMoveTask setting tmpDir for LinkedFileSink chDir " + tmpDir + "; dest was " + fileSinkDesc.getDestPath());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
index 2a7f3d4..7f7d192 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -217,11 +218,13 @@ public final class UnionProcFactory {
         // each parent
         List<FileSinkDesc> fileDescLists = new ArrayList<FileSinkDesc>();
 
+        // TODO# special case #N - unions
         for (Operator<? extends OperatorDesc> parent : parents) {
           FileSinkDesc fileSinkDesc = (FileSinkDesc) fileSinkOp.getConf().clone();
           fileSinkDesc.setDirName(new Path(parentDirName, parent.getIdentifier()));
           fileSinkDesc.setLinkedFileSink(true);
           fileSinkDesc.setParentDir(parentDirName);
+          Utilities.LOG14535.info("Created LinkedFileSink for union " + fileSinkDesc.getDirName() + "; parent " + parentDirName);
           parent.setChildOperators(null);
           Operator<? extends OperatorDesc> tmpFileSinkOp =
             OperatorFactory.getAndMakeChild(fileSinkDesc, parent.getSchema(), parent);

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
index 0c160ac..e1fc103 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
@@ -65,7 +65,7 @@ public class GenTezProcContext implements NodeProcessorCtx{
 
   public final ParseContext parseContext;
   public final HiveConf conf;
-  public final List<Task<MoveWork>> moveTask;
+  public final List<Task<MoveWork>> moveTask; // TODO#
 
   // rootTasks is the entry point for all generated tasks
   public final List<Task<? extends Serializable>> rootTasks;

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 6715dbf..f4b23e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -305,7 +305,9 @@ public class GenTezUtils {
         linked.add(desc);
 
         desc.setIndexInTezUnion(linked.size());
+        // TODO# special case #N - unions (tez)
         desc.setDirName(new Path(path, "" + desc.getIndexInTezUnion()));
+        Utilities.LOG14535.info("removing union - new desc with " + desc.getDirName() + "; parent " + path);
         desc.setLinkedFileSink(true);
         desc.setParentDir(path);
         desc.setLinkedFileSinkDesc(linked);
@@ -373,6 +375,8 @@ public class GenTezUtils {
       // If underlying data is RCFile or OrcFile, RCFileBlockMerge task or
       // OrcFileStripeMerge task would be created.
       LOG.info("using CombineHiveInputformat for the merge job");
+      Utilities.LOG14535.info("merging files from " + fileSink.getConf().getDirName() + " to " + finalName);
+      // TODO# special case #N - merge
       GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
           context.dependencyTask, context.moveTask,
           hconf, context.currentTask);

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 66589fe..c54a171 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryProperties;
@@ -6542,6 +6543,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     LoadTableDesc ltd = null;
     ListBucketingCtx lbCtx = null;
     Map<String, String> partSpec = null;
+    boolean isMmTable = false;
 
     switch (dest_type.intValue()) {
     case QBMetaData.DEST_TABLE: {
@@ -6551,70 +6553,27 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       destTableIsTemporary = dest_tab.isTemporary();
 
       // Is the user trying to insert into a external tables
-      if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
-          (dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE))) {
-        throw new SemanticException(
-            ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName()));
-      }
+      checkExternalTable(dest_tab);
 
       partSpec = qbm.getPartSpecForAlias(dest);
       dest_path = dest_tab.getPath();
 
-      // If the query here is an INSERT_INTO and the target is an immutable table,
-      // verify that our destination is empty before proceeding
-      if (dest_tab.isImmutable() &&
-          qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),dest_tab.getTableName())){
-        try {
-          FileSystem fs = dest_path.getFileSystem(conf);
-          if (! MetaStoreUtils.isDirEmpty(fs,dest_path)){
-            LOG.warn("Attempted write into an immutable table : "
-                + dest_tab.getTableName() + " : " + dest_path);
-            throw new SemanticException(
-                ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(dest_tab.getTableName()));
-          }
-        } catch (IOException ioe) {
-            LOG.warn("Error while trying to determine if immutable table has any data : "
-                + dest_tab.getTableName() + " : " + dest_path);
-          throw new SemanticException(ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(ioe.getMessage()));
-        }
-      }
+      checkImmutableTable(qb, dest_tab, dest_path, false);
 
-      // check for partition
-      List<FieldSchema> parts = dest_tab.getPartitionKeys();
-      if (parts != null && parts.size() > 0) { // table is partitioned
-        if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition
-          throw new SemanticException(generateErrorMessage(
-              qb.getParseInfo().getDestForClause(dest),
-              ErrorMsg.NEED_PARTITION_ERROR.getMsg()));
-        }
-        dpCtx = qbm.getDPCtx(dest);
-        if (dpCtx == null) {
-          dest_tab.validatePartColumnNames(partSpec, false);
-          dpCtx = new DynamicPartitionCtx(dest_tab, partSpec,
-              conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
-              conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
-          qbm.setDPCtx(dest, dpCtx);
-        }
-
-        if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING)) { // allow DP
-          throw new SemanticException(generateErrorMessage(
-              qb.getParseInfo().getDestForClause(dest),
-              ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()));
-        }
-        if (dpCtx.getSPPath() != null) {
-          dest_path = new Path(dest_tab.getPath(), dpCtx.getSPPath());
-        }
-        if ((dest_tab.getNumBuckets() > 0)) {
-          dpCtx.setNumBuckets(dest_tab.getNumBuckets());
-        }
+      // Check for dynamic partitions.
+      dpCtx = checkDynPart(qb, qbm, dest_tab, partSpec, dest);
+      if (dpCtx != null && dpCtx.getSPPath() != null) {
+        dest_path = new Path(dest_tab.getPath(), dpCtx.getSPPath());
       }
 
       boolean isNonNativeTable = dest_tab.isNonNative();
-      if (isNonNativeTable) {
+      isMmTable = isMmTable(dest_tab);
+      if (isNonNativeTable || isMmTable) {
         queryTmpdir = dest_path;
       } else {
         queryTmpdir = ctx.getTempDirForPath(dest_path);
       }
+      Utilities.LOG14535.info("createFS for table specifying " + queryTmpdir + " from " + dest_path);
       if (dpCtx != null) {
         // set the root of the temporary path where dynamic partition columns will populate
         dpCtx.setRootPath(queryTmpdir);
@@ -6641,9 +6600,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
           acidOp = getAcidType(table_desc.getOutputFileFormatClass());
           checkAcidConstraints(qb, table_desc, dest_tab);
         }
-        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp);
-        ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
-            dest_tab.getTableName()));
+        boolean isReplace = !qb.getParseInfo().isInsertIntoTable(
+            dest_tab.getDbName(), dest_tab.getTableName());
+        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, isMmTable);
         ltd.setLbCtx(lbCtx);
         loadTableWork.add(ltd);
       } else {
@@ -6652,42 +6611,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         setStatsForNonNativeTable(dest_tab);
       }
 
-      WriteEntity output = null;
-
-      // Here only register the whole table for post-exec hook if no DP present
-      // in the case of DP, we will register WriteEntity in MoveTask when the
-      // list of dynamically created partitions are known.
-      if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
-        output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable));
-        if (!outputs.add(output)) {
-          throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
-              .getMsg(dest_tab.getTableName()));
-        }
-      }
-      if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
-        // No static partition specified
-        if (dpCtx.getNumSPCols() == 0) {
-          output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable), false);
-          outputs.add(output);
-        }
-        // part of the partition specified
-        // Create a DummyPartition in this case. Since, the metastore does not store partial
-        // partitions currently, we need to store dummy partitions
-        else {
-          try {
-            String ppath = dpCtx.getSPPath();
-            ppath = ppath.substring(0, ppath.length() - 1);
-            DummyPartition p =
-                new DummyPartition(dest_tab, dest_tab.getDbName()
-                    + "@" + dest_tab.getTableName() + "@" + ppath,
-                    partSpec);
-            output = new WriteEntity(p, WriteEntity.WriteType.INSERT, false);
-            outputs.add(output);
-          } catch (HiveException e) {
-            throw new SemanticException(e.getMessage(), e);
-          }
-        }
-      }
+      WriteEntity output = generateTableWriteEntity(
+          dest_tab, partSpec, ltd, dpCtx, isNonNativeTable);
 
       ctx.getLoadTableOutputMap().put(ltd, output);
       break;
@@ -6697,40 +6622,22 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       dest_part = qbm.getDestPartitionForAlias(dest);
       dest_tab = dest_part.getTable();
       destTableIsAcid = AcidUtils.isAcidTable(dest_tab);
-      if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
-          dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE)) {
-        throw new SemanticException(
-            ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName()));
-      }
+
+      checkExternalTable(dest_tab);
 
       Path tabPath = dest_tab.getPath();
       Path partPath = dest_part.getDataLocation();
 
-      // If the query here is an INSERT_INTO and the target is an immutable table,
-      // verify that our destination is empty before proceeding
-      if (dest_tab.isImmutable() &&
-          qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),dest_tab.getTableName())){
-        try {
-          FileSystem fs = partPath.getFileSystem(conf);
-          if (! MetaStoreUtils.isDirEmpty(fs,partPath)){
-            LOG.warn("Attempted write into an immutable table partition : "
-                + dest_tab.getTableName() + " : " + partPath);
-            throw new SemanticException(
-                ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(dest_tab.getTableName()));
-          }
-        } catch (IOException ioe) {
-            LOG.warn("Error while trying to determine if immutable table partition has any data : "
-                + dest_tab.getTableName() + " : " + partPath);
-          throw new SemanticException(ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(ioe.getMessage()));
-        }
-      }
+      checkImmutableTable(qb, dest_tab, partPath, true);
 
       // if the table is in a different dfs than the partition,
       // replace the partition's dfs with the table's dfs.
       dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
           .getAuthority(), partPath.toUri().getPath());
 
-      queryTmpdir = ctx.getTempDirForPath(dest_path);
+      isMmTable = isMmTable(dest_tab);
+      queryTmpdir = isMmTable ? dest_path : ctx.getTempDirForPath(dest_path);
+      Utilities.LOG14535.info("createFS for partition specifying " + queryTmpdir + " from " + dest_path);
       table_desc = Utilities.getTableDesc(dest_tab);
 
       // Add sorting/bucketing if needed
@@ -6946,6 +6853,54 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       genPartnCols(dest, input, qb, table_desc, dest_tab, rsCtx);
     }
 
+    FileSinkDesc fileSinkDesc = createFileSinkDesc(table_desc, dest_part,
+        dest_path, currentTableId, destTableIsAcid, destTableIsTemporary,
+        destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
+        canBeMerged, isMmTable);
+
+    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
+        fileSinkDesc, fsRS, input), inputRR);
+
+    handleLineage(ltd, output);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
+          + dest_path + " row schema: " + inputRR.toString());
+    }
+
+    FileSinkOperator fso = (FileSinkOperator) output;
+    fso.getConf().setTable(dest_tab);
+    fsopToTable.put(fso, dest_tab);
+    // the following code is used to collect column stats when
+    // hive.stats.autogather=true
+    // and it is an insert overwrite or insert into table
+    if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
+        && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)
+        && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) {
+      if (dest_type.intValue() == QBMetaData.DEST_TABLE) {
+        genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo()
+            .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
+      } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) {
+        genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb
+            .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
+
+      }
+    }
+    return output;
+  }
+
+  private static boolean isMmTable(Table table) {
+    // TODO: perhaps it should be a 3rd value for 'transactional'?
+    String value = table.getProperty(hive_metastoreConstants.TABLE_IS_MM);
+    return value != null && value.equalsIgnoreCase("true");
+  }
+
+  private FileSinkDesc createFileSinkDesc(TableDesc table_desc,
+      Partition dest_part, Path dest_path, int currentTableId,
+      boolean destTableIsAcid, boolean destTableIsTemporary,
+      boolean destTableIsMaterialization, Path queryTmpdir,
+      SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx,
+      RowSchema fsRS, boolean canBeMerged, boolean isMmTable) throws SemanticException {
     FileSinkDesc fileSinkDesc = new FileSinkDesc(
       queryTmpdir,
       table_desc,
@@ -6957,7 +6912,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       rsCtx.getTotalFiles(),
       rsCtx.getPartnCols(),
       dpCtx,
-      dest_path);
+      dest_path,
+      isMmTable);
 
     fileSinkDesc.setHiveServerQuery(SessionState.get().isHiveServerQuery());
     // If this is an insert, update, or delete on an ACID table then mark that so the
@@ -7001,10 +6957,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     } else if (dpCtx != null) {
       fileSinkDesc.setStaticSpec(dpCtx.getSPPath());
     }
+    return fileSinkDesc;
+  }
 
-    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
-        fileSinkDesc, fsRS, input), inputRR);
-
+  private void handleLineage(LoadTableDesc ltd, Operator output)
+      throws SemanticException {
     if (ltd != null && SessionState.get() != null) {
       SessionState.get().getLineageState()
           .mapDirToFop(ltd.getSourcePath(), (FileSinkOperator) output);
@@ -7022,33 +6979,111 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       SessionState.get().getLineageState()
               .mapDirToFop(tlocation, (FileSinkOperator) output);
     }
+  }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
-          + dest_path + " row schema: " + inputRR.toString());
+  private WriteEntity generateTableWriteEntity(Table dest_tab,
+      Map<String, String> partSpec, LoadTableDesc ltd,
+      DynamicPartitionCtx dpCtx, boolean isNonNativeTable)
+      throws SemanticException {
+    WriteEntity output = null;
+
+    // Register only the whole table for the post-exec hook if no dynamic partitioning
+    // is present. With DP, we register the WriteEntity in MoveTask once the list of
+    // dynamically created partitions is known.
+    if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
+      output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable));
+      if (!outputs.add(output)) {
+        throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
+            .getMsg(dest_tab.getTableName()));
+      }
     }
+    if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
+      // No static partition specified
+      if (dpCtx.getNumSPCols() == 0) {
+        output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable), false);
+        outputs.add(output);
+      }
+      // Only part of the partition is specified: create a DummyPartition in this case.
+      // Since the metastore does not currently store partial partitions, we need to
+      // store dummy partitions.
+      else {
+        try {
+          String ppath = dpCtx.getSPPath();
+          ppath = ppath.substring(0, ppath.length() - 1);
+          DummyPartition p =
+              new DummyPartition(dest_tab, dest_tab.getDbName()
+                  + "@" + dest_tab.getTableName() + "@" + ppath,
+                  partSpec);
+          output = new WriteEntity(p, WriteEntity.WriteType.INSERT, false);
+          outputs.add(output);
+        } catch (HiveException e) {
+          throw new SemanticException(e.getMessage(), e);
+        }
+      }
+    }
+    return output;
+  }
 
-    FileSinkOperator fso = (FileSinkOperator) output;
-    fso.getConf().setTable(dest_tab);
-    fsopToTable.put(fso, dest_tab);
-    // the following code is used to collect column stats when
-    // hive.stats.autogather=true
-    // and it is an insert overwrite or insert into table
-    if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
-        && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)
-        && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) {
-      if (dest_type.intValue() == QBMetaData.DEST_TABLE) {
-        genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo()
-            .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
-      } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) {
-        genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb
-            .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
+  private void checkExternalTable(Table dest_tab) throws SemanticException {
+    if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_INSERT_INTO_EXTERNAL_TABLES)) &&
+        (dest_tab.getTableType().equals(TableType.EXTERNAL_TABLE))) {
+      throw new SemanticException(
+          ErrorMsg.INSERT_EXTERNAL_TABLE.getMsg(dest_tab.getTableName()));
+    }
+  }
 
+  private void checkImmutableTable(QB qb, Table dest_tab, Path dest_path, boolean isPart)
+      throws SemanticException {
+    // If the query here is an INSERT_INTO and the target is an immutable table,
+    // verify that our destination is empty before proceeding
+    if (!dest_tab.isImmutable() || !qb.getParseInfo().isInsertIntoTable(
+        dest_tab.getDbName(), dest_tab.getTableName())) {
+      return;
+    }
+    try {
+      FileSystem fs = dest_path.getFileSystem(conf);
+      if (!MetaStoreUtils.isDirEmpty(fs, dest_path)) {
+        LOG.warn("Attempted write into an immutable table : "
+            + dest_tab.getTableName() + " : " + dest_path);
+        throw new SemanticException(
+            ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(dest_tab.getTableName()));
       }
+    } catch (IOException ioe) {
+        LOG.warn("Error while trying to determine if immutable table "
+            + (isPart ? "partition " : "") + "has any data : "  + dest_tab.getTableName()
+            + " : " + dest_path);
+      throw new SemanticException(ErrorMsg.INSERT_INTO_IMMUTABLE_TABLE.getMsg(ioe.getMessage()));
     }
-    return output;
   }
 
+  private DynamicPartitionCtx checkDynPart(QB qb, QBMetaData qbm, Table dest_tab,
+      Map<String, String> partSpec, String dest) throws SemanticException {
+    List<FieldSchema> parts = dest_tab.getPartitionKeys();
+    if (parts == null || parts.isEmpty()) return null; // table is not partitioned
+    if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition
+      throw new SemanticException(generateErrorMessage(qb.getParseInfo().getDestForClause(dest),
+          ErrorMsg.NEED_PARTITION_ERROR.getMsg()));
+    }
+    DynamicPartitionCtx dpCtx = qbm.getDPCtx(dest);
+    if (dpCtx == null) {
+      dest_tab.validatePartColumnNames(partSpec, false);
+      dpCtx = new DynamicPartitionCtx(dest_tab, partSpec,
+          conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
+          conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
+      qbm.setDPCtx(dest, dpCtx);
+    }
+
+    if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING)) { // DP must be enabled
+      throw new SemanticException(generateErrorMessage(qb.getParseInfo().getDestForClause(dest),
+          ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()));
+    }
+    if (dest_tab.getNumBuckets() > 0) {
+      dpCtx.setNumBuckets(dest_tab.getNumBuckets());
+    }
+    return dpCtx;
+  }
+
   private void genAutoColumnStatsGatheringPipeline(QB qb, TableDesc table_desc,
       Map<String, String> partSpec, Operator curr, boolean isInsertInto) throws SemanticException {
     String tableName = table_desc.getTableName();
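
The new SemanticAnalyzer.isMmTable(Table) helper above keys the MM write path off a table property rather than the table type. Below is a minimal standalone sketch of the same check; the value of hive_metastoreConstants.TABLE_IS_MM is not shown in this hunk, so the 'hivecommit' key set by the new mm_all.q / mm_current.q tests is assumed here, and a plain map stands in for the Hive Table object.

    import java.util.HashMap;
    import java.util.Map;

    // Standalone sketch of the property check in SemanticAnalyzer.isMmTable(Table),
    // using a plain map in place of org.apache.hadoop.hive.ql.metadata.Table.
    public class MmTableCheckSketch {
      // Assumption: stands in for hive_metastoreConstants.TABLE_IS_MM; the new
      // .q tests set 'hivecommit'='true' in tblproperties.
      private static final String TABLE_IS_MM = "hivecommit";

      static boolean isMmTable(Map<String, String> tblProps) {
        String value = tblProps.get(TABLE_IS_MM);
        return value != null && value.equalsIgnoreCase("true");
      }

      public static void main(String[] args) {
        Map<String, String> props = new HashMap<String, String>();
        props.put(TABLE_IS_MM, "true");
        System.out.println(isMmTable(props));                         // true
        System.out.println(isMmTable(new HashMap<String, String>())); // false
      }
    }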

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
index 68b0ad9..ffc9c3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 
 /**
  * Conditional task resolution interface. This is invoked at run time to get the
@@ -243,6 +244,7 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
     Path path = ptpi.keySet().iterator().next();
     PartitionDesc partDesc = ptpi.get(path);
     TableDesc tblDesc = partDesc.getTableDesc();
+    Utilities.LOG14535.info("merge resolver removing " + path);
     work.removePathToPartitionInfo(path); // the root path is not useful anymore
 
     // cleanup pathToAliases
@@ -264,9 +266,12 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
         totalSz += len;
         PartitionDesc pDesc = (dpCtx != null) ? generateDPFullPartSpec(dpCtx, status, tblDesc, i)
             : partDesc;
+        Utilities.LOG14535.info("merge resolver will merge " + status[i].getPath());
         work.resolveDynamicPartitionStoredAsSubDirsMerge(conf, status[i].getPath(), tblDesc,
             aliases, pDesc);
       } else {
+        Utilities.LOG14535.info("merge resolver will move " + status[i].getPath());
+
         toMove.add(status[i].getPath());
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index ce0e0a8..6ae7fa8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -97,6 +97,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   private transient Table table;
   private Path destPath;
   private boolean isHiveServerQuery;
+  private boolean isMmTable;
 
   public FileSinkDesc() {
   }
@@ -107,7 +108,8 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
       final boolean compressed, final int destTableId, final boolean multiFileSpray,
       final boolean canBeMerged, final int numFiles, final int totalFiles,
-      final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath) {
+      final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath,
+      boolean isMmTable) {
 
     this.dirName = dirName;
     this.tableInfo = tableInfo;
@@ -121,6 +123,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
     this.dpCtx = dpCtx;
     this.dpSortState = DPSortState.NONE;
     this.destPath = destPath;
+    this.isMmTable = isMmTable;
   }
 
   public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
@@ -142,7 +145,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   public Object clone() throws CloneNotSupportedException {
     FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
         destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
-        partitionCols, dpCtx, destPath);
+        partitionCols, dpCtx, destPath, isMmTable);
     ret.setCompressCodec(compressCodec);
     ret.setCompressType(compressType);
     ret.setGatherStats(gatherStats);
@@ -249,6 +252,10 @@ public class FileSinkDesc extends AbstractOperatorDesc {
     this.temporary = temporary;
   }
 
+  public boolean isMmTable() {
+    return isMmTable;
+  }
+
   public boolean isMaterialization() {
     return materialization;
   }
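
For reference, a small sketch of the widened FileSinkDesc constructor introduced in this hunk, modeled on the TestFileSinkOperator change further down; it assumes the ql module and its Hadoop dependencies are on the classpath, and the path and descriptors below are placeholders, not values from the patch.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
    import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
    import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
    import org.apache.hadoop.hive.ql.plan.TableDesc;

    // Sketch of the widened constructor; mirrors the updated call in
    // TestFileSinkOperator. Path and descriptors are placeholders.
    public class FileSinkDescMmSketch {
      public static void main(String[] args) throws Exception {
        Path basePath = new Path("/tmp/fsop_sketch");      // hypothetical temp path
        TableDesc tableDesc = new TableDesc();             // empty placeholder descriptor
        Map<String, String> partColMap = new HashMap<String, String>();
        partColMap.put("part", null);
        DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100);

        FileSinkDesc desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false,
            1, 1, new ArrayList<ExprNodeDesc>(), dpCtx, null, /* isMmTable */ true);

        // clone() in this hunk now carries the flag over as well.
        FileSinkDesc copy = (FileSinkDesc) desc.clone();
        System.out.println(desc.isMmTable() + " " + copy.isMmTable()); // true true
      }
    }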

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
index df153a2..5e4e1fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.PTFUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 
 /**
  * LoadFileDesc.
@@ -55,6 +56,7 @@ public class LoadFileDesc extends LoadDesc implements Serializable {
       final boolean isDfsDir, final String columns, final String columnTypes) {
 
     super(sourcePath);
+    Utilities.LOG14535.info("creating LFD from " + sourcePath + " to " + targetDir, new Exception());
     this.targetDir = targetDir;
     this.isDfsDir = isDfsDir;
     this.columns = columns;

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index 771a919..1ac831d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -23,6 +23,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
@@ -41,18 +42,20 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
   // Need to remember whether this is an acid compliant operation, and if so whether it is an
   // insert, update, or delete.
   private AcidUtils.Operation writeType;
+  private boolean isMmTable;
 
   // TODO: the below seems like they should just be combined into partitionDesc
   private org.apache.hadoop.hive.ql.plan.TableDesc table;
   private Map<String, String> partitionSpec; // NOTE: this partitionSpec has to be ordered map
 
-  public LoadTableDesc(final Path sourcePath,
+  private LoadTableDesc(final Path sourcePath,
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
       final Map<String, String> partitionSpec,
       final boolean replace,
       final AcidUtils.Operation writeType) {
     super(sourcePath);
-    init(table, partitionSpec, replace, writeType);
+    Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to " + table.getTableName(), new Exception());
+    init(table, partitionSpec, replace, writeType, false);
   }
 
   /**
@@ -91,13 +94,16 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
   public LoadTableDesc(final Path sourcePath,
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
       final DynamicPartitionCtx dpCtx,
-      final AcidUtils.Operation writeType) {
+      final AcidUtils.Operation writeType,
+      boolean isReplace,
+      boolean isMmTable) {
     super(sourcePath);
+    Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + table.getTableName(), new Exception());
     this.dpCtx = dpCtx;
     if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) {
-      init(table, dpCtx.getPartSpec(), true, writeType);
+      init(table, dpCtx.getPartSpec(), isReplace, writeType, isMmTable);
     } else {
-      init(table, new LinkedHashMap<String, String>(), true, writeType);
+      init(table, new LinkedHashMap<String, String>(), isReplace, writeType, isMmTable);
     }
   }
 
@@ -105,11 +111,12 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
       final Map<String, String> partitionSpec,
       final boolean replace,
-      AcidUtils.Operation writeType) {
+      AcidUtils.Operation writeType, boolean isMmTable) {
     this.table = table;
     this.partitionSpec = partitionSpec;
     this.replace = replace;
     this.writeType = writeType;
+    this.isMmTable = isMmTable;
   }
 
   @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -135,6 +142,11 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
     return replace;
   }
 
+  @Explain(displayName = "micromanaged table")
+  public boolean isMmTable() {
+    return isMmTable;
+  }
+
   public void setReplace(boolean replace) {
     this.replace = replace;
   }
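
A simplified stand-in (not the Hive class) sketching what the reworked dynamic-partition constructor of LoadTableDesc now lets callers control: replace semantics are passed in as isReplace instead of being hard-coded to true, alongside the new isMmTable flag.

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Simplified stand-in: shows how the DP constructor now threads isReplace and
    // isMmTable through to init(...) instead of hard-coding replace=true as before.
    public class LoadTableDescSketch {
      private final Map<String, String> partitionSpec;
      private final boolean replace;
      private final boolean isMmTable;

      LoadTableDescSketch(Map<String, String> dpPartSpec, boolean isReplace, boolean isMmTable) {
        // Mirrors init(table, partSpec, replace, writeType, isMmTable) in LoadTableDesc.
        this.partitionSpec = (dpPartSpec != null) ? dpPartSpec : new LinkedHashMap<String, String>();
        this.replace = isReplace;
        this.isMmTable = isMmTable;
      }

      boolean getReplace() { return replace; }
      boolean isMmTable() { return isMmTable; }

      public static void main(String[] args) {
        // An INSERT INTO on an MM table would now pass isReplace=false, isMmTable=true,
        // where the old constructor always behaved like isReplace=true.
        LoadTableDescSketch insertInto = new LoadTableDescSketch(null, false, true);
        System.out.println(insertInto.getReplace() + " " + insertInto.isMmTable()); // false true
      }
    }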

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 9f498c7..227b0d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
  */
 @Explain(displayName = "Move Operator", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
 public class MoveWork implements Serializable {
+  // TODO# all the places where MoveWork is created need to be handled.
   private static final long serialVersionUID = 1L;
   private LoadTableDesc loadTableWork;
   private LoadFileDesc loadFileWork;

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index a8d7c9c..1c27873 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -285,7 +285,7 @@ public class TestFileSinkOperator {
       partColMap.put(PARTCOL_NAME, null);
       DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100);
       //todo: does this need the finalDestination?
-      desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null);
+      desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null, false);
     } else {
       desc = new FileSinkDesc(basePath, tableDesc, false);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/test/queries/clientpositive/mm_all.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_all.q b/ql/src/test/queries/clientpositive/mm_all.q
new file mode 100644
index 0000000..aaf8d48
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/mm_all.q
@@ -0,0 +1,63 @@
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.fetch.task.conversion=none;
+
+drop table simple_mm;
+drop table partunion_mm;
+drop table merge_mm;
+drop table ctas_mm;
+drop table T1;
+drop table T2;
+drop table skew_mm;
+
+
+create table simple_mm(key int) partitioned by (key_mm int)  tblproperties ('hivecommit'='true');
+insert into table simple_mm partition(key_mm='455') select key from src limit 3;
+
+create table ctas_mm tblproperties ('hivecommit'='true') as select * from src limit 3;
+
+create table partunion_mm(id_mm int) partitioned by (key_mm int)  tblproperties ('hivecommit'='true');
+
+
+insert into table partunion_mm partition(key_mm)
+select temps.* from (
+select key as key_mm, key from ctas_mm 
+union all 
+select key as key_mm, key from simple_mm ) temps;
+
+set hive.merge.mapredfiles=true;
+set hive.merge.sparkfiles=true;
+set hive.merge.tezfiles=true;
+
+CREATE TABLE merge_mm (key INT, value STRING) 
+    PARTITIONED BY (ds STRING, part STRING) STORED AS ORC tblproperties ('hivecommit'='true');
+
+EXPLAIN
+INSERT OVERWRITE TABLE merge_mm PARTITION (ds='123', part)
+        SELECT key, value, PMOD(HASH(key), 2) as part
+        FROM src;
+
+INSERT OVERWRITE TABLE merge_mm PARTITION (ds='123', part)
+        SELECT key, value, PMOD(HASH(key), 2) as part
+        FROM src;
+
+
+set hive.optimize.skewjoin.compiletime = true;
+-- the test case is wrong?
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1;
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3)) STORED AS TEXTFILE;
+LOAD DATA LOCAL INPATH '../../data/files/T2.txt' INTO TABLE T2;
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+create table skew_mm(k1 string, k2 string, k3 string, k4 string) SKEWED BY (key) ON ((2)) tblproperties ('hivecommit'='true');
+INSERT OVERWRITE TABLE skew_mm
+SELECT a.key as k1, a.val as k2, b.key as k3, b.val as k4 FROM T1 a JOIN T2 b ON a.key = b.key;
+
+-- TODO load, acid, etc

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/test/queries/clientpositive/mm_current.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_current.q b/ql/src/test/queries/clientpositive/mm_current.q
new file mode 100644
index 0000000..882096b
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/mm_current.q
@@ -0,0 +1,11 @@
+set hive.mapred.mode=nonstrict;
+set hive.explain.user=false;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.fetch.task.conversion=none;
+
+drop table simple_mm;
+
+  
+create table simple_mm(key int) partitioned by (key_mm int) tblproperties ('hivecommit'='true');
+insert into table simple_mm partition(key_mm='455') select key from src limit 3;
+

http://git-wip-us.apache.org/repos/asf/hive/blob/d0f5b893/ql/src/test/results/clientpositive/llap/mm_current.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_current.q.out b/ql/src/test/results/clientpositive/llap/mm_current.q.out
new file mode 100644
index 0000000..129bb13
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/mm_current.q.out
@@ -0,0 +1,21 @@
+PREHOOK: query: drop table simple123
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table simple123
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table simple123(key int) partitioned by (key123 int) tblproperties ('hivecommit'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@simple123
+POSTHOOK: query: create table simple123(key int) partitioned by (key123 int) tblproperties ('hivecommit'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@simple123
+PREHOOK: query: insert into table simple123 partition(key123='455') select key from src limit 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@simple123@key123=455
+POSTHOOK: query: insert into table simple123 partition(key123='455') select key from src limit 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@simple123@key123=455
+POSTHOOK: Lineage: simple123 PARTITION(key123=455).key EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]

