hive-commits mailing list archives

From w...@apache.org
Subject [41/50] [abbrv] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)
Date Mon, 08 May 2017 22:17:58 GMT
http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 3ad1733,4d727ba..8febcc0
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@@ -38,12 -23,10 +38,13 @@@ import org.apache.hadoop.fs.FSDataOutpu
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.fs.PathFilter;
  import org.apache.hadoop.hive.common.FileUtils;
 +import org.apache.hadoop.hive.common.HiveStatsUtils;
  import org.apache.hadoop.hive.common.StatsSetupConst;
 +import org.apache.hadoop.hive.common.ValidWriteIds;
  import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.hadoop.hive.conf.HiveConfUtil;
  import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
  import org.apache.hadoop.hive.ql.CompilationOpContext;
  import org.apache.hadoop.hive.ql.ErrorMsg;
@@@ -172,24 -149,13 +172,24 @@@ public class FileSinkOperator extends T
      Path[] finalPaths;
      RecordWriter[] outWriters;
      RecordUpdater[] updaters;
 -    Stat stat;
 +    private Stat stat;
      int acidLastBucket = -1;
      int acidFileOffset = -1;
 +    private boolean isMmTable;
 +
 +    public FSPaths(Path specPath, boolean isMmTable) {
 +      this.isMmTable = isMmTable;
 +      if (!isMmTable) {
 +        tmpPath = Utilities.toTempPath(specPath);
 +        taskOutputTempPath = Utilities.toTaskTempPath(specPath);
 +      } else {
 +        tmpPath = specPath;
 +        taskOutputTempPath = null; // Should not be used.
-       } 
++      }
 +      Utilities.LOG14535.info("new FSPaths for " + numFiles + " files, dynParts = " + bDynParts
 +          + ": tmpPath " + tmpPath + ", task path " + taskOutputTempPath
 +          + " (spec path " + specPath + ")"/*, new Exception()*/);
  
 -    public FSPaths(Path specPath) {
 -      tmpPath = Utilities.toTempPath(specPath);
 -      taskOutputTempPath = Utilities.toTaskTempPath(specPath);
        outPaths = new Path[numFiles];
        finalPaths = new Path[numFiles];
        outWriters = new RecordWriter[numFiles];
@@@ -240,38 -206,30 +240,38 @@@
        }
      }
  
 -    private void commit(FileSystem fs) throws HiveException {
 +    private void commit(FileSystem fs, List<Path> commitPaths) throws HiveException {
        for (int idx = 0; idx < outPaths.length; ++idx) {
          try {
 -          if ((bDynParts || isSkewedStoredAsSubDirectories)
 -              && !fs.exists(finalPaths[idx].getParent())) {
 -            fs.mkdirs(finalPaths[idx].getParent());
 -          }
 -          boolean needToRename = true;
 -          if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
 -              conf.getWriteType() == AcidUtils.Operation.DELETE) {
 -            // If we're updating or deleting there may be no file to close.  This can happen
 -            // because the where clause strained out all of the records for a given bucket.  So
 -            // before attempting the rename below, check if our file exists.  If it doesn't,
 -            // then skip the rename.  If it does try it.  We could just blindly try the rename
 -            // and avoid the extra stat, but that would mask other errors.
 -            try {
 -              if (outPaths[idx] != null) {
 -                FileStatus stat = fs.getFileStatus(outPaths[idx]);
 -              }
 -            } catch (FileNotFoundException fnfe) {
 -              needToRename = false;
 -            }
 -          }
 -          if (needToRename && outPaths[idx] != null && !fs.rename(outPaths[idx], finalPaths[idx])) {
 +          commitOneOutPath(idx, fs, commitPaths);
 +        } catch (IOException e) {
 +          throw new HiveException("Unable to commit output from: " +
 +              outPaths[idx] + " to: " + finalPaths[idx], e);
 +        }
 +      }
 +    }
 +
 +    private void commitOneOutPath(int idx, FileSystem fs, List<Path> commitPaths)
 +        throws IOException, HiveException {
 +      if ((bDynParts || isSkewedStoredAsSubDirectories)
 +          && !fs.exists(finalPaths[idx].getParent())) {
 +        Utilities.LOG14535.info("commit making path for dyn/skew: " + finalPaths[idx].getParent());
-         FileUtils.mkdir(fs, finalPaths[idx].getParent(), inheritPerms, hconf);
++        FileUtils.mkdir(fs, finalPaths[idx].getParent(), hconf);
 +      }
 +      // If we're updating or deleting there may be no file to close.  This can happen
 +      // because the where clause strained out all of the records for a given bucket.  So
 +      // before attempting the rename below, check if our file exists.  If it doesn't,
 +      // then skip the rename.  If it does try it.  We could just blindly try the rename
 +      // and avoid the extra stat, but that would mask other errors.
 +      Operation acidOp = conf.getWriteType();
 +      boolean needToRename = outPaths[idx] != null && ((acidOp != Operation.UPDATE
 +          && acidOp != Operation.DELETE) || fs.exists(outPaths[idx]));
 +      if (needToRename && outPaths[idx] != null) {
 +        Utilities.LOG14535.info("committing " + outPaths[idx] + " to " + finalPaths[idx] + " (" + isMmTable + ")");
 +        if (isMmTable) {
 +          assert outPaths[idx].equals(finalPaths[idx]);
 +          commitPaths.add(outPaths[idx]);
 +        } else if (!fs.rename(outPaths[idx], finalPaths[idx])) {
              FileStatus fileStatus = FileUtils.getFileStatusOrNull(fs, finalPaths[idx]);
              if (fileStatus != null) {
                LOG.warn("Target path " + finalPaths[idx] + " with a size " + fileStatus.getLen() + " exists. Trying to delete it.");
@@@ -735,15 -613,9 +731,15 @@@
        Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
        // only create bucket files only if no dynamic partitions,
        // buckets of dynamic partitions will be created for each newly created partition
 -      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
 +      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID ||
 +          conf.getWriteType() == AcidUtils.Operation.INSERT_ONLY) {
 +        Path outPath = fsp.outPaths[filesIdx];
 +        if ((conf.getWriteType() == AcidUtils.Operation.INSERT_ONLY || conf.isMmTable())
-             && inheritPerms && !FileUtils.mkdir(fs, outPath.getParent(), inheritPerms, hconf)) {
++            && !FileUtils.mkdir(fs, outPath.getParent(), hconf)) {
 +          LOG.warn("Unable to create directory with inheritPerms: " + outPath);
 +        }
          fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(),
 -            outputClass, conf, fsp.outPaths[filesIdx], reporter);
 +            outputClass, conf, outPath, reporter);
          // If the record writer provides stats, get it from there instead of the serde
          statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof
              StatsProvidingRecordWriter;
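
The FSPaths changes above boil down to one decision at commit time: for an MM (insert-only) table the writer has already written into its final location, so commit only records the path for a later metadata commit, while a regular table still renames its temp output into place. A minimal sketch of that branch using only the Hadoop FileSystem API; the class and method names below are illustrative, not Hive's:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch of the commit decision in the hunk above (illustrative names).
    final class MmCommitSketch {
      static void commitOne(FileSystem fs, Path outPath, Path finalPath,
          boolean isMmTable, List<Path> commitPaths) throws IOException {
        if (isMmTable) {
          // MM output is written in place; just collect the path for the write-ID commit.
          assert outPath.equals(finalPath);
          commitPaths.add(outPath);
        } else if (!fs.rename(outPath, finalPath)) {
          throw new IOException("Unable to commit " + outPath + " to " + finalPath);
        }
      }
    }

Skipping the rename is what keeps MM writes cheap: the write-ID prefix on the directory name marks the files until the metastore-side commit (see commitMmTableWrite in the Hive.java diff below) makes them visible.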

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 29b72a0,f329b51..acf7404
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@@ -410,234 -547,6 +407,234 @@@ public class MoveTask extends Task<Move
        return (1);
      }
    }
 +
 +  private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd,
 +      TaskInformation ti) throws HiveException, IOException, InvalidOperationException {
 +    List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),  tbd.getPartitionSpec());
 +    db.validatePartitionNameCharacters(partVals);
 +    Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath()
 +        + " into " + tbd.getTable().getTableName());
 +    boolean isCommitMmWrite = tbd.isCommitMmWrite();
-     db.loadSinglePartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
++    db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
 +        tbd.getPartitionSpec(), tbd.getReplace(),
 +        tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
 +        (work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&
 +         work.getLoadTableWork().getWriteType() != AcidUtils.Operation.INSERT_ONLY),
 +        hasFollowingStatsTask(), tbd.getMmWriteId(), isCommitMmWrite);
 +    Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
 +
 +    // See the comment inside updatePartitionBucketSortColumns.
 +    if (!tbd.isMmTable() && (ti.bucketCols != null || ti.sortCols != null)) {
 +      updatePartitionBucketSortColumns(db, table, partn, ti.bucketCols,
 +          ti.numBuckets, ti.sortCols);
 +    }
 +
 +    DataContainer dc = new DataContainer(table.getTTable(), partn.getTPartition());
 +    // add this partition to post-execution hook
 +    if (work.getOutputs() != null) {
 +      DDLTask.addIfAbsentByName(new WriteEntity(partn,
 +        getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
 +    }
 +    return dc;
 +  }
 +
 +  private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd,
 +      TaskInformation ti, DynamicPartitionCtx dpCtx) throws HiveException,
 +      IOException, InvalidOperationException {
 +    DataContainer dc;
 +    List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, dpCtx);
 +
 +    console.printInfo(System.getProperty("line.separator"));
 +    long startTime = System.currentTimeMillis();
 +    // load the list of DP partitions and return the list of partition specs
 +    // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
 +    // to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
 +    // After that check the number of DPs created to not exceed the limit and
 +    // iterate over it and call loadPartition() here.
 +    // The reason we don't do inside HIVE-1361 is the latter is large and we
 +    // want to isolate any potential issue it may introduce.
 +    if (tbd.isMmTable() && !tbd.isCommitMmWrite()) {
 +      throw new HiveException("Only single-partition LoadTableDesc can skip commiting write ID");
 +    }
 +    Map<Map<String, String>, Partition> dp =
 +      db.loadDynamicPartitions(
 +        tbd.getSourcePath(),
 +        tbd.getTable().getTableName(),
 +        tbd.getPartitionSpec(),
 +        tbd.getReplace(),
 +        dpCtx.getNumDPCols(),
 +        (tbd.getLbCtx() == null) ? 0 : tbd.getLbCtx().calculateListBucketingLevel(),
 +        work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&
 +            work.getLoadTableWork().getWriteType() != AcidUtils.Operation.INSERT_ONLY,
 +        SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(),
 +        work.getLoadTableWork().getWriteType(),
 +        tbd.getMmWriteId());
 +
 +    // publish DP columns to its subscribers
 +    if (dps != null && dps.size() > 0) {
 +      pushFeed(FeedType.DYNAMIC_PARTITIONS, dp.values());
 +    }
 +
 +    String loadTime = "\t Time taken to load dynamic partitions: "  +
 +        (System.currentTimeMillis() - startTime)/1000.0 + " seconds";
 +    console.printInfo(loadTime);
 +    LOG.info(loadTime);
 +
 +    if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
 +      throw new HiveException("This query creates no partitions." +
 +          " To turn off this error, set hive.error.on.empty.partition=false.");
 +    }
 +
 +    startTime = System.currentTimeMillis();
 +    // for each partition spec, get the partition
 +    // and put it to WriteEntity for post-exec hook
 +    for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
 +      Partition partn = entry.getValue();
 +
 +      // See the comment inside updatePartitionBucketSortColumns.
 +      if (!tbd.isMmTable() && (ti.bucketCols != null || ti.sortCols != null)) {
 +        updatePartitionBucketSortColumns(
 +            db, table, partn, ti.bucketCols, ti.numBuckets, ti.sortCols);
 +      }
 +
 +      WriteEntity enty = new WriteEntity(partn,
 +        getWriteType(tbd, work.getLoadTableWork().getWriteType()));
 +      if (work.getOutputs() != null) {
 +        DDLTask.addIfAbsentByName(enty, work.getOutputs());
 +      }
 +      // Need to update the queryPlan's output as well so that post-exec hook get executed.
 +      // This is only needed for dynamic partitioning since for SP the the WriteEntity is
 +      // constructed at compile time and the queryPlan already contains that.
 +      // For DP, WriteEntity creation is deferred at this stage so we need to update
 +      // queryPlan here.
 +      if (queryPlan.getOutputs() == null) {
 +        queryPlan.setOutputs(new LinkedHashSet<WriteEntity>());
 +      }
 +      queryPlan.getOutputs().add(enty);
 +
 +      // update columnar lineage for each partition
 +      dc = new DataContainer(table.getTTable(), partn.getTPartition());
 +
 +      // Don't set lineage on delete as we don't have all the columns
 +      if (SessionState.get() != null &&
 +          work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE &&
 +          work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) {
 +        SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
 +            table.getCols());
 +      }
 +      LOG.info("\tLoading partition " + entry.getKey());
 +    }
 +    console.printInfo("\t Time taken for adding to write entity : " +
 +        (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
 +    dc = null; // reset data container to prevent it being added again.
 +    return dc;
 +  }
 +
 +  private void inferTaskInformation(TaskInformation ti) {
 +    // Find the first ancestor of this MoveTask which is some form of map reduce task
 +    // (Either standard, local, or a merge)
 +    while (ti.task.getParentTasks() != null && ti.task.getParentTasks().size() == 1) {
 +      ti.task = (Task)ti.task.getParentTasks().get(0);
 +      // If it was a merge task or a local map reduce task, nothing can be inferred
 +      if (ti.task instanceof MergeFileTask || ti.task instanceof MapredLocalTask) {
 +        break;
 +      }
 +
 +      // If it's a standard map reduce task, check what, if anything, it inferred about
 +      // the directory this move task is moving
 +      if (ti.task instanceof MapRedTask) {
 +        MapredWork work = (MapredWork)ti.task.getWork();
 +        MapWork mapWork = work.getMapWork();
 +        ti.bucketCols = mapWork.getBucketedColsByDirectory().get(ti.path);
 +        ti.sortCols = mapWork.getSortedColsByDirectory().get(ti.path);
 +        if (work.getReduceWork() != null) {
 +          ti.numBuckets = work.getReduceWork().getNumReduceTasks();
 +        }
 +
 +        if (ti.bucketCols != null || ti.sortCols != null) {
 +          // This must be a final map reduce task (the task containing the file sink
 +          // operator that writes the final output)
 +          assert work.isFinalMapRed();
 +        }
 +        break;
 +      }
 +
 +      // If it's a move task, get the path the files were moved from, this is what any
 +      // preceding map reduce task inferred information about, and moving does not invalidate
 +      // those assumptions
 +      // This can happen when a conditional merge is added before the final MoveTask, but the
 +      // condition for merging is not met, see GenMRFileSink1.
 +      if (ti.task instanceof MoveTask) {
 +        MoveTask mt = (MoveTask)ti.task;
 +        if (mt.getWork().getLoadFileWork() != null) {
 +          ti.path = mt.getWork().getLoadFileWork().getSourcePath().toUri().toString();
 +        }
 +      }
 +    }
 +  }
 +
 +  private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table)
 +      throws HiveException {
 +    if (work.getCheckFileFormat()) {
 +      // Get all files from the src directory
 +      FileStatus[] dirs;
 +      ArrayList<FileStatus> files;
 +      FileSystem srcFs; // source filesystem
 +      try {
 +        srcFs = tbd.getSourcePath().getFileSystem(conf);
 +        dirs = srcFs.globStatus(tbd.getSourcePath());
 +        files = new ArrayList<FileStatus>();
 +        for (int i = 0; (dirs != null && i < dirs.length); i++) {
 +          files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER)));
 +          // We only check one file, so exit the loop when we have at least
 +          // one.
 +          if (files.size() > 0) {
 +            break;
 +          }
 +        }
 +      } catch (IOException e) {
 +        throw new HiveException(
 +            "addFiles: filesystem error in check phase", e);
 +      }
 +
 +      // handle file format check for table level
 +      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
 +        boolean flag = true;
 +        // work.checkFileFormat is set to true only for Load Task, so assumption here is
 +        // dynamic partition context is null
 +        if (tbd.getDPCtx() == null) {
 +          if (tbd.getPartitionSpec() == null || tbd.getPartitionSpec().isEmpty()) {
 +            // Check if the file format of the file matches that of the table.
 +            flag = HiveFileFormatUtils.checkInputFormat(
 +                srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
 +          } else {
 +            // Check if the file format of the file matches that of the partition
 +            Partition oldPart = db.getPartition(table, tbd.getPartitionSpec(), false);
 +            if (oldPart == null) {
 +              // this means we have just created a table and are specifying partition in the
 +              // load statement (without pre-creating the partition), in which case lets use
 +              // table input format class. inheritTableSpecs defaults to true so when a new
 +              // partition is created later it will automatically inherit input format
 +              // from table object
 +              flag = HiveFileFormatUtils.checkInputFormat(
 +                  srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
 +            } else {
 +              flag = HiveFileFormatUtils.checkInputFormat(
 +                  srcFs, conf, oldPart.getInputFormatClass(), files);
 +            }
 +          }
 +          if (!flag) {
 +            throw new HiveException(
 +                "Wrong file format. Please check the file's format.");
 +          }
 +        } else {
 +          LOG.warn("Skipping file format check as dpCtx is not null");
 +        }
 +      }
 +    }
 +  }
 +
 +
    /**
     * so to make sure we crate WriteEntity with the right WriteType.  This is (at this point) only
     * for consistency since LockManager (which is the only thing that pays attention to WriteType)
@@@ -771,4 -674,4 +768,4 @@@
    public String getName() {
      return "MOVE";
    }
--}
++}
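
checkFileFormats above needs only a single sample file to validate the declared input format, so the listing loop stops at the first matched directory that contains a non-hidden file. A stripped-down sketch of that sampling step (FileUtils.HIDDEN_FILES_PATH_FILTER is the Hive constant used in the hunk; the wrapper class is illustrative):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.FileUtils;

    final class SampleFileSketch {
      // Return files from the first non-empty matched directory; one file is enough for the check.
      static List<FileStatus> sampleFiles(FileSystem srcFs, Path srcPath) throws IOException {
        List<FileStatus> files = new ArrayList<>();
        FileStatus[] dirs = srcFs.globStatus(srcPath);
        for (int i = 0; dirs != null && i < dirs.length; i++) {
          files.addAll(Arrays.asList(
              srcFs.listStatus(dirs[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER)));
          if (!files.isEmpty()) {
            break;
          }
        }
        return files;
      }
    }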

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 5b5ddc3,9036d9e..777c119
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@@ -23,59 -23,8 +23,60 @@@ import com.google.common.annotations.Vi
  import com.google.common.base.Preconditions;
  import com.google.common.collect.Lists;
  import com.google.common.collect.Sets;
 +import java.beans.DefaultPersistenceDelegate;
 +import java.beans.Encoder;
 +import java.beans.Expression;
 +import java.beans.Statement;
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataInput;
 +import java.io.EOFException;
 +import java.io.File;
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.OutputStream;
 +import java.io.Serializable;
 +import java.net.URI;
 +import java.net.URL;
 +import java.net.URLClassLoader;
 +import java.net.URLDecoder;
 +import java.sql.Connection;
 +import java.sql.DriverManager;
 +import java.sql.PreparedStatement;
 +import java.sql.SQLException;
 +import java.sql.SQLFeatureNotSupportedException;
 +import java.sql.SQLTransientException;
 +import java.text.SimpleDateFormat;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Calendar;
 +import java.util.Collection;
 +import java.util.Enumeration;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.LinkedHashMap;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Properties;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.LinkedBlockingQueue;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +import java.util.zip.Deflater;
 +import java.util.zip.DeflaterOutputStream;
 +import java.util.zip.InflaterInputStream;
  import com.google.common.util.concurrent.ThreadFactoryBuilder;
+ 
  import org.apache.commons.codec.binary.Base64;
  import org.apache.commons.lang.StringUtils;
  import org.apache.commons.lang.WordUtils;
@@@ -201,9 -145,7 +204,8 @@@ import org.apache.hadoop.mapred.RecordR
  import org.apache.hadoop.mapred.Reporter;
  import org.apache.hadoop.mapred.SequenceFileInputFormat;
  import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 +import org.apache.hadoop.mapred.TextInputFormat;
  import org.apache.hadoop.util.Progressable;
- import org.apache.hadoop.util.Shell;
  import org.apache.hive.common.util.ACLConfigurationParser;
  import org.apache.hive.common.util.ReflectionUtil;
  import org.slf4j.Logger;
@@@ -3162,28 -3021,20 +3164,32 @@@ public final class Utilities 
  
      Set<Path> pathsProcessed = new HashSet<Path>();
      List<Path> pathsToAdd = new LinkedList<Path>();
+     LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
      // AliasToWork contains all the aliases
 -    for (String alias : work.getAliasToWork().keySet()) {
 +    Collection<String> aliasToWork = work.getAliasToWork().keySet();
 +    if (!skipDummy) {
 +      // ConcurrentModification otherwise if adding dummy.
 +      aliasToWork = new ArrayList<>(aliasToWork);
 +    }
 +    for (String alias : aliasToWork) {
        LOG.info("Processing alias " + alias);
  
        // The alias may not have any path
 +      Collection<Map.Entry<Path, ArrayList<String>>> pathToAliases =
 +          work.getPathToAliases().entrySet();
 +      if (!skipDummy) {
 +        // ConcurrentModification otherwise if adding dummy.
 +        pathToAliases = new ArrayList<>(pathToAliases);
 +      }
        boolean isEmptyTable = true;
        boolean hasLogged = false;
 -      // Note: this copies the list because createDummyFileForEmptyPartition may modify the map.
 -      for (Path file : new LinkedList<Path>(work.getPathToAliases().keySet())) {
 +      Path path = null;
 +      for (Map.Entry<Path, ArrayList<String>> e : pathToAliases) {
+         if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+           throw new IOException("Operation is Canceled. ");
+ 
 -        List<String> aliases = work.getPathToAliases().get(file);
 +        Path file = e.getKey();
 +        List<String> aliases = e.getValue();
          if (aliases.contains(alias)) {
            if (file != null) {
              isEmptyTable = false;
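
The getInputPaths changes above snapshot both the alias set and the path-to-alias entries before iterating, because createDummyFileForEmptyPartition can add entries to those maps mid-loop and would otherwise trigger a ConcurrentModificationException; the cancellation check added in the same loop is sketched after the CombineHiveInputFormat hunk below. A generic sketch of the copy-before-iterate pattern (types reduced to plain JDK collections; the method is illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.fs.Path;

    final class SnapshotIterationSketch {
      // Copy the entry set up front so the loop body may safely add new paths to the map.
      static List<Path> pathsForAlias(Map<Path, List<String>> pathToAliases, String alias) {
        List<Path> result = new ArrayList<>();
        for (Map.Entry<Path, List<String>> e : new ArrayList<>(pathToAliases.entrySet())) {
          if (e.getValue().contains(alias)) {
            result.add(e.getKey());
          }
        }
        return result;
      }
    }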

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index 0b1ac4b,9a7e9d9..b018adb
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@@ -44,7 -43,8 +45,9 @@@ import org.apache.hadoop.conf.Configura
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.PathFilter;
 +import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+ import org.apache.hadoop.hive.ql.Driver.DriverState;
+ import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
  import org.apache.hadoop.hive.ql.exec.Operator;
  import org.apache.hadoop.hive.ql.exec.Utilities;
  import org.apache.hadoop.hive.ql.log.PerfLogger;
@@@ -363,9 -351,13 +366,13 @@@ public class CombineHiveInputFormat<K e
      Map<CombinePathInputFormat, CombineFilter> poolMap =
        new HashMap<CombinePathInputFormat, CombineFilter>();
      Set<Path> poolSet = new HashSet<Path>();
+     LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
  
      for (Path path : paths) {
+       if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+         throw new IOException("Operation is Canceled. ");
+ 
 -      PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
 +      PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively(
            pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap());
        TableDesc tableDesc = part.getTableDesc();
        if ((tableDesc != null) && tableDesc.isNonNative()) {
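
Both this hunk and the Utilities one above add the same guard: a long split-generation loop re-checks the driver state on every path so a cancelled query stops promptly instead of finishing the whole scan. A sketch of the idea with an AtomicBoolean standing in for Hive's LockedDriverState/DriverState pair:

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.hadoop.fs.Path;

    final class CancellableScanSketch {
      // Fail fast if the query was cancelled while iterating over many input paths.
      static void scan(Iterable<Path> paths, AtomicBoolean cancelled) throws IOException {
        for (Path path : paths) {
          if (cancelled.get()) {
            throw new IOException("Operation is Canceled.");
          }
          // ... per-path pool/split handling would go here ...
        }
      }
    }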

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index d255265,cdf2c40..01e8a48
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@@ -240,8 -370,8 +370,8 @@@ public final class DbTxnManager extend
            // This is a file or something we don't hold locks for.
            continue;
        }
-       if(t != null && AcidUtils.isFullAcidTable(t)) {
-         compBuilder.setIsAcid(true);
+       if(t != null) {
 -        compBuilder.setIsAcid(AcidUtils.isAcidTable(t));
++        compBuilder.setIsAcid(AcidUtils.isFullAcidTable(t));
        }
        LockComponent comp = compBuilder.build();
        LOG.debug("Adding lock component to lock request " + comp.toString());
@@@ -270,10 -426,9 +426,9 @@@
            break;
  
          case INSERT:
-           t = getTable(output);
+           assert t != null;
 -          if(AcidUtils.isAcidTable(t)) {
 +          if(AcidUtils.isFullAcidTable(t)) {
              compBuilder.setShared();
-             compBuilder.setIsAcid(true);
            }
            else {
              if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE)) {
@@@ -307,34 -459,11 +459,11 @@@
          default:
            throw new RuntimeException("Unknown write type " +
                output.getWriteType().toString());
- 
        }
-       switch (output.getType()) {
-         case DATABASE:
-           compBuilder.setDbName(output.getDatabase().getName());
-           break;
- 
-         case TABLE:
-         case DUMMYPARTITION:   // in case of dynamic partitioning lock the table
-           t = output.getTable();
-           compBuilder.setDbName(t.getDbName());
-           compBuilder.setTableName(t.getTableName());
-           break;
- 
-         case PARTITION:
-           compBuilder.setPartitionName(output.getPartition().getName());
-           t = output.getPartition().getTable();
-           compBuilder.setDbName(t.getDbName());
-           compBuilder.setTableName(t.getTableName());
-           break;
- 
-         default:
-           // This is a file or something we don't hold locks for.
-           continue;
-       }
-       if(t != null && AcidUtils.isFullAcidTable(t)) {
-         compBuilder.setIsAcid(true);
+       if(t != null) {
 -        compBuilder.setIsAcid(AcidUtils.isAcidTable(t));
++        compBuilder.setIsAcid(AcidUtils.isFullAcidTable(t));
        }
+ 
        compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
        LockComponent comp = compBuilder.build();
        LOG.debug("Adding lock component to lock request " + comp.toString());

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index ea87cb4,5b908e8..6498199
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@@ -1578,30 -1601,26 +1608,42 @@@ public class Hive 
      return getDatabase(currentDb);
    }
  
-   public void loadSinglePartition(Path loadPath, String tableName,
+   /**
+    * @param loadPath
+    * @param tableName
+    * @param partSpec
+    * @param replace
+    * @param inheritTableSpecs
+    * @param isSkewedStoreAsSubdir
+    * @param isSrcLocal
+    * @param isAcid
+    * @param hasFollowingStatsTask
+    * @return
+    * @throws HiveException
+    */
+   public void loadPartition(Path loadPath, String tableName,
 -      Map<String, String> partSpec, boolean replace,
 -      boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
 -      boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException {
 +      Map<String, String> partSpec, boolean replace, boolean inheritTableSpecs,
 +      boolean isSkewedStoreAsSubdir,  boolean isSrcLocal, boolean isAcid,
 +      boolean hasFollowingStatsTask, Long mmWriteId, boolean isCommitMmWrite)
 +          throws HiveException {
      Table tbl = getTable(tableName);
 +    boolean isMmTableWrite = (mmWriteId != null);
 +    Preconditions.checkState(isMmTableWrite == MetaStoreUtils.isInsertOnlyTable(tbl.getParameters()));
      loadPartition(loadPath, tbl, partSpec, replace, inheritTableSpecs,
 -        isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask);
 +        isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask, mmWriteId);
 +    if (isMmTableWrite && isCommitMmWrite) {
 +      // The assumption behind committing here is that this partition is the only one outputted.
 +      commitMmTableWrite(tbl, mmWriteId);
 +    }
 +  }
 +
- 
 +  public void commitMmTableWrite(Table tbl, Long mmWriteId)
 +      throws HiveException {
 +    try {
 +      getMSC().finalizeTableWrite(tbl.getDbName(), tbl.getTableName(), mmWriteId, true);
 +    } catch (TException e) {
 +      throw new HiveException(e);
 +    }
    }
  
    /**
@@@ -1623,14 -1642,20 +1665,19 @@@
     *          location/inputformat/outputformat/serde details from table spec
     * @param isSrcLocal
     *          If the source directory is LOCAL
-    * @param isAcid true if this is an ACID operation
+    * @param isAcid
+    *          true if this is an ACID operation
+    * @param hasFollowingStatsTask
+    *          true if there is a following task which updates the stats, so, this method need not update.
+    * @return Partition object being loaded with data
     */
 -  public Partition loadPartition(Path loadPath, Table tbl,
 -      Map<String, String> partSpec, boolean replace,
 -      boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
 -      boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) throws HiveException {
 -
 +  public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
 +      boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
 +      boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, Long mmWriteId)
 +          throws HiveException {
      Path tblDataLocationPath =  tbl.getDataLocation();
      try {
+       // Get the partition object if it already exists
        Partition oldPart = getPartition(tbl, partSpec, false);
        /**
         * Move files before creating the partition since down stream processes
@@@ -1668,41 -1693,21 +1715,45 @@@
        List<Path> newFiles = null;
        PerfLogger perfLogger = SessionState.getPerfLogger();
        perfLogger.PerfLogBegin("MoveTask", "FileMoves");
 -
+       // If config is set, table is not temporary and partition being inserted exists, capture
+       // the list of files added. For not yet existing partitions (insert overwrite to new partition
+       // or dynamic partition inserts), the add partition event will capture the list of files added.
+       if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && (null != oldPart)) {
+         newFiles = Collections.synchronizedList(new ArrayList<Path>());
+       }
 -
 -      if (replace || (oldPart == null && !isAcid)) {
 -        boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
 -        replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(),
 -            isSrcLocal, isAutoPurge, newFiles);
 +      // TODO: this assumes both paths are qualified; which they are, currently.
 +      if (mmWriteId != null && loadPath.equals(newPartPath)) {
 +        // MM insert query, move itself is a no-op.
 +        Utilities.LOG14535.info("not moving " + loadPath + " to " + newPartPath + " (MM)");
 +        assert !isAcid;
 +        if (areEventsForDmlNeeded(tbl, oldPart)) {
 +          newFiles = listFilesCreatedByQuery(loadPath, mmWriteId);
 +        }
 +        Utilities.LOG14535.info("maybe deleting stuff from " + oldPartPath + " (new " + newPartPath + ") for replace");
 +        if (replace && oldPartPath != null) {
-           deleteOldPathForReplace(newPartPath, oldPartPath, getConf(),
++          boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
++          deleteOldPathForReplace(newPartPath, oldPartPath, getConf(), isAutoPurge,
 +              new ValidWriteIds.IdPathFilter(mmWriteId, false, true), mmWriteId != null,
 +              tbl.isStoredAsSubDirectories() ? tbl.getSkewedColNames().size() : 0);
 +        }
        } else {
 -        FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
 -        Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles);
 +        // Either a non-MM query, or a load into MM table from an external source.
 +        PathFilter filter = FileUtils.HIDDEN_FILES_PATH_FILTER;
 +        Path destPath = newPartPath;
 +        if (mmWriteId != null) {
 +          // We will load into MM directory, and delete from the parent if needed.
 +          destPath = new Path(destPath, ValidWriteIds.getMmFilePrefix(mmWriteId));
 +          filter = replace ? new ValidWriteIds.IdPathFilter(mmWriteId, false, true) : filter;
 +        }
 +        Utilities.LOG14535.info("moving " + loadPath + " to " + destPath);
 +        if (replace || (oldPart == null && !isAcid)) {
++          boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
 +          replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(),
-               isSrcLocal, filter, mmWriteId != null);
++              isSrcLocal, isAutoPurge, newFiles, filter, mmWriteId != null);
 +        } else {
-           if (areEventsForDmlNeeded(tbl, oldPart)) {
-             newFiles = Collections.synchronizedList(new ArrayList<Path>());
-           }
- 
 +          FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
 +          Hive.copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcid, newFiles);
 +        }
        }
        perfLogger.PerfLogEnd("MoveTask", "FileMoves");
        Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, partSpec, newPartPath);
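
The rewritten move logic in the hunk above splits loadPartition into three cases: an MM insert whose loadPath already is the final partition directory (no move, optionally clean other write IDs on replace), a replace or first load routed into the MM write-ID subdirectory, and the plain copy path. Two small helpers condense the target/filter selection; ValidWriteIds and FileUtils are the Hive classes used in the hunk, everything else is illustrative:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.hive.common.FileUtils;
    import org.apache.hadoop.hive.common.ValidWriteIds;

    final class MmLoadTargetSketch {
      // Non-MM loads target the partition directory itself; MM loads target a write-ID subdirectory.
      static Path chooseDestination(Path partPath, Long mmWriteId) {
        return mmWriteId == null
            ? partPath
            : new Path(partPath, ValidWriteIds.getMmFilePrefix(mmWriteId));
      }

      // Filter handed to the replace/delete step; same IdPathFilter arguments as in the hunk above.
      static PathFilter chooseDeleteFilter(Long mmWriteId, boolean replace) {
        return (mmWriteId != null && replace)
            ? new ValidWriteIds.IdPathFilter(mmWriteId, false, true)
            : FileUtils.HIDDEN_FILES_PATH_FILTER;
      }
    }
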
@@@ -1774,54 -1783,6 +1829,54 @@@
      }
    }
  
 +
 +  private boolean areEventsForDmlNeeded(Table tbl, Partition oldPart) {
 +    return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null;
 +  }
 +
 +  private List<Path> listFilesCreatedByQuery(Path loadPath, long mmWriteId) throws HiveException {
 +    List<Path> newFiles = new ArrayList<Path>();
 +    final String filePrefix = ValidWriteIds.getMmFilePrefix(mmWriteId);
 +    FileStatus[] srcs;
 +    FileSystem srcFs;
 +    try {
 +      srcFs = loadPath.getFileSystem(conf);
 +      srcs = srcFs.listStatus(loadPath);
 +    } catch (IOException e) {
 +      LOG.error("Error listing files", e);
 +      throw new HiveException(e);
 +    }
 +    if (srcs == null) {
 +      LOG.info("No sources specified: " + loadPath);
 +      return newFiles;
 +    }
 +    PathFilter subdirFilter = null;
-  
++
 +    // TODO: just like the move path, we only do one level of recursion.
 +    for (FileStatus src : srcs) {
 +      if (src.isDirectory()) {
 +        if (subdirFilter == null) {
 +          subdirFilter = new PathFilter() {
 +            @Override
 +            public boolean accept(Path path) {
 +              return path.getName().startsWith(filePrefix);
 +            }
 +          };
 +        }
 +        try {
 +          for (FileStatus srcFile : srcFs.listStatus(src.getPath(), subdirFilter)) {
 +            newFiles.add(srcFile.getPath());
 +          }
 +        } catch (IOException e) {
 +          throw new HiveException(e);
 +        }
 +      } else if (src.getPath().getName().startsWith(filePrefix)) {
 +        newFiles.add(src.getPath());
 +      }
 +    }
 +    return newFiles;
 +  }
 +
    private void setStatsPropAndAlterPartition(boolean hasFollowingStatsTask, Table tbl,
        Partition newTPart) throws MetaException, TException {
      EnvironmentContext environmentContext = null;
@@@ -2153,36 -2067,17 +2208,38 @@@ private void constructOneLBLocationMap(
      if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
        newFiles = Collections.synchronizedList(new ArrayList<Path>());
      }
 -    if (replace) {
 -      Path tableDest = tbl.getPath();
 -      boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
 -      replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal, isAutopurge, newFiles);
 +    // TODO: this assumes both paths are qualified; which they are, currently.
 +    if (mmWriteId != null && loadPath.equals(tbl.getPath())) {
 +      Utilities.LOG14535.info("not moving " + loadPath + " to " + tbl.getPath());
 +      if (replace) {
 +        Path tableDest = tbl.getPath();
-         deleteOldPathForReplace(tableDest, tableDest, sessionConf,
++        boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
++        deleteOldPathForReplace(tableDest, tableDest, sessionConf, isAutopurge,
 +            new ValidWriteIds.IdPathFilter(mmWriteId, false, true), mmWriteId != null,
 +            tbl.isStoredAsSubDirectories() ? tbl.getSkewedColNames().size() : 0);
 +      }
 +      newFiles = listFilesCreatedByQuery(loadPath, mmWriteId);
      } else {
 -      FileSystem fs;
 -      try {
 -        fs = tbl.getDataLocation().getFileSystem(sessionConf);
 -        copyFiles(sessionConf, loadPath, tbl.getPath(), fs, isSrcLocal, isAcid, newFiles);
 -      } catch (IOException e) {
 -        throw new HiveException("addFiles: filesystem error in check phase", e);
 +      // Either a non-MM query, or a load into MM table from an external source.
 +      Path tblPath = tbl.getPath(), destPath = tblPath;
 +      PathFilter filter = FileUtils.HIDDEN_FILES_PATH_FILTER;
 +      if (mmWriteId != null) {
 +        // We will load into MM directory, and delete from the parent if needed.
 +        destPath = new Path(destPath, ValidWriteIds.getMmFilePrefix(mmWriteId));
 +        filter = replace ? new ValidWriteIds.IdPathFilter(mmWriteId, false, true) : filter;
 +      }
 +      Utilities.LOG14535.info("moving " + loadPath + " to " + tblPath + " (replace = " + replace + ")");
 +      if (replace) {
++        boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
 +        replaceFiles(tblPath, loadPath, destPath, tblPath,
-             sessionConf, isSrcLocal, filter, mmWriteId != null);
++            sessionConf, isSrcLocal, isAutopurge, newFiles, filter, mmWriteId != null);
 +      } else {
 +        try {
 +          FileSystem fs = tbl.getDataLocation().getFileSystem(sessionConf);
 +          copyFiles(sessionConf, loadPath, destPath, fs, isSrcLocal, isAcid, newFiles);
 +        } catch (IOException e) {
 +          throw new HiveException("addFiles: filesystem error in check phase", e);
 +        }
        }
      }
      if (!this.getConf().getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
@@@ -2217,11 -2112,7 +2274,11 @@@
        throw new HiveException(e);
      }
  
 +    if (mmWriteId != null) {
 +      commitMmTableWrite(tbl, mmWriteId);
 +    }
 +
-     fireInsertEvent(tbl, null, newFiles);
+     fireInsertEvent(tbl, null, replace, newFiles);
    }
  
    /**
@@@ -3270,21 -3182,17 +3347,18 @@@
        throw new HiveException(e.getMessage(), e);
      }
  
-     //needed for perm inheritance.
-     final boolean inheritPerms = HiveConf.getBoolVar(conf,
-         HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
      HdfsUtils.HadoopFileStatus destStatus = null;
  
 -    // If source path is a subdirectory of the destination path:
 +    // If source path is a subdirectory of the destination path (or the other way around):
      //   ex: INSERT OVERWRITE DIRECTORY 'target/warehouse/dest4.out' SELECT src.value WHERE src.key >= 300;
      //   where the staging directory is a subdirectory of the destination directory
      // (1) Do not delete the dest dir before doing the move operation.
      // (2) It is assumed that subdir and dir are in same encryption zone.
      // (3) Move individual files from scr dir to dest dir.
 -    boolean destIsSubDir = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal);
 +    boolean srcIsSubDirOfDest = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal),
 +        destIsSubDirOfSrc = isSubDir(destf, srcf, destFs, srcFs, false);
      try {
-       if (inheritPerms || replace) {
+       if (replace) {
          try{
            destStatus = new HdfsUtils.HadoopFileStatus(conf, destFs, destf);
            //if destf is an existing directory:
@@@ -3571,11 -3455,15 +3625,16 @@@
     * @param oldPath
     *          The directory where the old data location, need to be cleaned up.  Most of time, will be the same
     *          as destf, unless its across FileSystem boundaries.
+    * @param purge
+    *          When set to true files which needs to be deleted are not moved to Trash
     * @param isSrcLocal
     *          If the source directory is LOCAL
+    * @param newFiles
+    *          Output the list of new files replaced in the destination path
     */
    protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
-           boolean isSrcLocal, PathFilter deletePathFilter, boolean isMmTable) throws HiveException {
 -          boolean isSrcLocal, boolean purge, List<Path> newFiles) throws HiveException {
++          boolean isSrcLocal, boolean purge, List<Path> newFiles, PathFilter deletePathFilter,
++          boolean isMmTable) throws HiveException {
      try {
  
        FileSystem destFs = destf.getFileSystem(conf);
@@@ -3594,9 -3482,37 +3653,9 @@@
        }
  
        if (oldPath != null) {
 -        boolean oldPathDeleted = false;
 -        boolean isOldPathUnderDestf = false;
 -        FileStatus[] statuses = null;
 -        try {
 -          FileSystem oldFs = oldPath.getFileSystem(conf);
 -          statuses = oldFs.listStatus(oldPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
 -          // Do not delete oldPath if:
 -          //  - destf is subdir of oldPath
 -          isOldPathUnderDestf = isSubDir(oldPath, destf, oldFs, destFs, false);
 -          if (isOldPathUnderDestf) {
 -            // if oldPath is destf or its subdir, its should definitely be deleted, otherwise its
 -            // existing content might result in incorrect (extra) data.
 -            // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is
 -            // not the destf or its subdir?
 -            oldPathDeleted = trashFiles(oldFs, statuses, conf, purge);
 -          }
 -        } catch (IOException e) {
 -          if (isOldPathUnderDestf) {
 -            // if oldPath is a subdir of destf but it could not be cleaned
 -            throw new HiveException("Directory " + oldPath.toString()
 -                + " could not be cleaned up.", e);
 -          } else {
 -            //swallow the exception since it won't affect the final result
 -            LOG.warn("Directory " + oldPath.toString() + " cannot be cleaned: " + e, e);
 -          }
 -        }
 -        if (statuses != null && statuses.length > 0) {
 -          if (isOldPathUnderDestf && !oldPathDeleted) {
 -            throw new HiveException("Destination directory " + destf + " has not be cleaned up.");
 -          }
 -        }
 +        // TODO: we assume lbLevels is 0 here. Same as old code for non-MM.
 +        //       For MM tables, this can only be a LOAD command. Does LOAD even support LB?
-         deleteOldPathForReplace(destf, oldPath, conf, deletePathFilter, isMmTable, 0);
++        deleteOldPathForReplace(destf, oldPath, conf, purge, deletePathFilter, isMmTable, 0);
        }
  
        // first call FileUtils.mkdir to make sure that destf directory exists, if not, it creates
@@@ -3631,69 -3557,6 +3700,69 @@@
      }
    }
  
-   private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf,
++  private void deleteOldPathForReplace(Path destPath, Path oldPath, HiveConf conf, boolean purge,
 +      PathFilter pathFilter, boolean isMmTable, int lbLevels) throws HiveException {
 +    Utilities.LOG14535.info("Deleting old paths for replace in " + destPath + " and old path " + oldPath);
 +    boolean isOldPathUnderDestf = false;
 +    try {
 +      FileSystem oldFs = oldPath.getFileSystem(conf);
 +      FileSystem destFs = destPath.getFileSystem(conf);
 +      // if oldPath is destf or its subdir, its should definitely be deleted, otherwise its
 +      // existing content might result in incorrect (extra) data.
 +      // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is
 +      // not the destf or its subdir?
 +      isOldPathUnderDestf = isSubDir(oldPath, destPath, oldFs, destFs, false);
 +      if (isOldPathUnderDestf || isMmTable) {
 +        if (lbLevels == 0 || !isMmTable) {
-           cleanUpOneDirectoryForReplace(oldPath, oldFs, pathFilter, conf);
++          cleanUpOneDirectoryForReplace(oldPath, oldFs, pathFilter, conf, purge);
 +        } else {
 +          // We need to clean up different MM IDs from each LB directory separately.
 +          // Avoid temporary directories in the immediate table/part dir.
 +          // TODO: we could just find directories with any MM directories inside?
 +          //       the rest doesn't have to be cleaned up.
 +          String mask = "[^._]*";
 +          for (int i = 0; i < lbLevels - 1; ++i) {
 +            mask += Path.SEPARATOR + "*";
 +          }
 +          Path glob = new Path(oldPath, mask);
 +          FileStatus[] lbDirs = oldFs.globStatus(glob);
 +          for (FileStatus lbDir : lbDirs) {
 +            Path lbPath = lbDir.getPath();
 +            if (!lbDir.isDirectory()) {
 +              throw new HiveException("Unexpected path during overwrite: " + lbPath);
 +            }
 +            Utilities.LOG14535.info("Cleaning up LB directory " + lbPath);
-             cleanUpOneDirectoryForReplace(lbPath, oldFs, pathFilter, conf);
++            cleanUpOneDirectoryForReplace(lbPath, oldFs, pathFilter, conf, purge);
 +          }
 +        }
 +      }
 +    } catch (IOException e) {
 +      if (isOldPathUnderDestf || isMmTable) {
 +        // if oldPath is a subdir of destf but it could not be cleaned
 +        throw new HiveException("Directory " + oldPath.toString()
 +            + " could not be cleaned up.", e);
 +      } else {
 +        //swallow the exception since it won't affect the final result
 +        LOG.warn("Directory " + oldPath.toString() + " cannot be cleaned: " + e, e);
 +      }
 +    }
 +  }
 +
 +
 +  private void cleanUpOneDirectoryForReplace(Path path, FileSystem fs,
-       PathFilter pathFilter, HiveConf conf) throws IOException, HiveException {
++      PathFilter pathFilter, HiveConf conf, boolean purge) throws IOException, HiveException {
 +    FileStatus[] statuses = fs.listStatus(path, pathFilter);
 +    if (statuses == null || statuses.length == 0) return;
 +    String s = "Deleting files under " + path + " for replace: ";
 +    for (FileStatus file : statuses) {
 +      s += file.getPath().getName() + ", ";
 +    }
 +    Utilities.LOG14535.info(s);
-     if (!trashFiles(fs, statuses, conf)) {
++    if (!trashFiles(fs, statuses, conf, purge)) {
 +      throw new HiveException("Old path " + path + " has not been cleaned up.");
 +    }
 +  }
 +
  
    /**
     * Trashes or deletes all files under a directory. Leaves the directory as is.
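
The new cleanup path above (deleteOldPathForReplace / cleanUpOneDirectoryForReplace) only removes files matched by the delete filter and treats any survivor as an error. A simplified stand-in for the per-directory step, using FileSystem.delete where Hive routes through its trashFiles helper with the purge flag:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    final class ReplaceCleanupSketch {
      // Delete everything under 'dir' that the filter selects; fail loudly if anything survives.
      static void cleanUpOneDirectory(FileSystem fs, Path dir, PathFilter filter) throws IOException {
        FileStatus[] statuses = fs.listStatus(dir, filter);
        if (statuses == null || statuses.length == 0) {
          return;
        }
        for (FileStatus file : statuses) {
          // Hive trashes or purges these via trashFiles; a plain delete keeps the sketch short.
          if (!fs.delete(file.getPath(), true)) {
            throw new IOException("Old path " + file.getPath() + " has not been cleaned up.");
          }
        }
      }
    }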

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index f4fe6ac,dc86942..99a7392
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@@ -63,9 -58,9 +62,11 @@@ import org.apache.hadoop.hive.ql.metada
  import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
  import org.apache.hadoop.hive.ql.metadata.Table;
+ import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
  import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 +import org.apache.hadoop.hive.ql.plan.CopyWork;
 +import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
+ import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
  import org.apache.hadoop.hive.ql.plan.DDLWork;
  import org.apache.hadoop.hive.ql.plan.DropTableDesc;
  import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@@ -217,8 -211,12 +218,13 @@@ public class ImportSemanticAnalyzer ext
  
      // Create table associated with the import
      // Executed if relevant, and used to contain all the other details about the table if not.
-     CreateTableDesc tblDesc = getBaseCreateTableDescFromTable(dbname,rv.getTable());
+     ImportTableDesc tblDesc;
+     try {
+       tblDesc = getBaseCreateTableDescFromTable(dbname, rv.getTable());
+     } catch (Exception e) {
+       throw new HiveException(e);
+     }
 +    boolean isSourceMm = MetaStoreUtils.isInsertOnlyTable(tblDesc.getTblProps());
  
      if ((replicationSpec!= null) && replicationSpec.isInReplicationScope()){
        tblDesc.setReplicationSpec(replicationSpec);
@@@ -297,18 -292,7 +303,22 @@@
        tableExists = true;
      }
  
 -    if (!replicationSpec.isInReplicationScope()){
 +    Long mmWriteId = null;
 +    if (table != null && MetaStoreUtils.isInsertOnlyTable(table.getParameters())) {
 +      mmWriteId = x.getHive().getNextTableWriteId(table.getDbName(), table.getTableName());
 +    } else if (table == null && isSourceMm) {
 +      // We could import everything as is - directories and IDs, but that won't work with ACID
 +      // txn ids in future. So, let's import everything into the new MM directory with ID == 0.
 +      mmWriteId = 0l;
 +    }
++    //todo due to master merge on May 4, tblDesc has been changed from CreateTableDesc to ImportTableDesc
++    // which may result in Import test failure
++    /*
 +    if (mmWriteId != null) {
 +      tblDesc.setInitialMmWriteId(mmWriteId);
 +    }
++    */
 +    if (!replicationSpec.isInReplicationScope()) {
        createRegularImportTasks(
            tblDesc, partitionDescs,
            isPartSpecSet, replicationSpec, table,
@@@ -455,10 -386,9 +431,10 @@@
      ), x.getConf());
    }
  
-  private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, CreateTableDesc tblDesc,
+  private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc,
 -      Table table, Warehouse wh,
 -      AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x)
 +      Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec,
 +      EximUtil.SemanticAnalyzerWrapperContext x, Long mmWriteId, boolean isSourceMm,
 +      Task<?> commitTask)
        throws MetaException, IOException, HiveException {
      AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
      if (tblDesc.isExternal() && tblDesc.getLocation() == null) {
@@@ -476,39 -405,18 +452,39 @@@
            + partSpecToString(partSpec.getPartSpec())
            + " with source location: " + srcLocation);
        Path tgtLocation = new Path(partSpec.getLocation());
 -      Path tmpPath = x.getCtx().getExternalTmpPath(tgtLocation);
 -      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
 -          replicationSpec, new Path(srcLocation), tmpPath, x.getConf());
 +      Path destPath = mmWriteId == null ? x.getCtx().getExternalTmpPath(tgtLocation)
 +          : new Path(tgtLocation, ValidWriteIds.getMmFilePrefix(mmWriteId));
 +      Path moveTaskSrc =  mmWriteId == null ? destPath : tgtLocation;
 +      Utilities.LOG14535.info("adding import work for partition with source location: "
 +          + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm "
 +          + mmWriteId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec()));
 +
 +      Task<?> copyTask = null;
 +      if (replicationSpec.isInReplicationScope()) {
 +        if (isSourceMm || mmWriteId != null) {
 +          // TODO: ReplCopyTask cannot handle MM tables yet; add support once it can.
 +          throw new RuntimeException(
 +              "Replicating MM (insert-only) tables is not supported yet");
 +        }
 +        copyTask = ReplCopyTask.getLoadCopyTask(
 +            replicationSpec, new Path(srcLocation), destPath, x.getConf());
 +      } else {
 +        CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false);
 +        cw.setSkipSourceMmDirs(isSourceMm);
 +        copyTask = TaskFactory.get(cw, x.getConf());
 +      }
 +
        Task<?> addPartTask = TaskFactory.get(new DDLWork(x.getInputs(),
            x.getOutputs(), addPartitionDesc), x.getConf());
 -      LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
 -          Utilities.getTableDesc(table),
 -          partSpec.getPartSpec(), replicationSpec.isReplace());
 +      LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table),
-           partSpec.getPartSpec(), true, mmWriteId);
++          partSpec.getPartSpec(), replicationSpec.isReplace(), mmWriteId);
        loadTableWork.setInheritTableSpecs(false);
 +      // Do not commit the write ID from each task; need to commit once.
 +      // TODO: we should just change the import to use a single MoveTask, like dynparts.
 +      loadTableWork.setIntermediateInMmWrite(mmWriteId != null);
        Task<?> loadPartTask = TaskFactory.get(new MoveWork(
 -          x.getInputs(), x.getOutputs(), loadTableWork, null, false),
 -          x.getConf());
 +          x.getInputs(), x.getOutputs(), loadTableWork, null, false), x.getConf());
        copyTask.addDependentTask(loadPartTask);
        addPartTask.addDependentTask(loadPartTask);
        x.getTasks().add(copyTask);
@@@ -800,16 -707,16 +776,16 @@@
     * @param wh
     */
    private static void createRegularImportTasks(
-       CreateTableDesc tblDesc, List<AddPartitionDesc> partitionDescs, boolean isPartSpecSet,
 -      ImportTableDesc tblDesc,
 -      List<AddPartitionDesc> partitionDescs,
 -      boolean isPartSpecSet,
 -      ReplicationSpec replicationSpec,
 -      Table table, URI fromURI, FileSystem fs, Warehouse wh, EximUtil.SemanticAnalyzerWrapperContext x)
++      ImportTableDesc tblDesc, List<AddPartitionDesc> partitionDescs, boolean isPartSpecSet,
 +      ReplicationSpec replicationSpec, Table table, URI fromURI, FileSystem fs, Warehouse wh,
 +      EximUtil.SemanticAnalyzerWrapperContext x, Long mmWriteId, boolean isSourceMm)
        throws HiveException, URISyntaxException, IOException, MetaException {
  
 -    if (table != null){
 +    if (table != null) {
        if (table.isPartitioned()) {
          x.getLOG().debug("table partitioned");
 +        Task<?> ict = createImportCommitTask(
 +            table.getDbName(), table.getTableName(), mmWriteId, x.getConf());
  
          for (AddPartitionDesc addPartitionDesc : partitionDescs) {
            Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
@@@ -1026,8 -920,7 +1001,8 @@@
          }
          if (!replicationSpec.isMetadataOnly()) {
            // repl-imports are replace-into unless the event is insert-into
-           loadTable(fromURI, table, !replicationSpec.isInsert(), new Path(fromURI),
 -          loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI), replicationSpec, x);
++          loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI),
 +            replicationSpec, x, mmWriteId, isSourceMm);
          } else {
            x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
          }
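
For reference, the partition-import path above copies data either to an external tmp directory (classic import, later renamed into place by the MoveTask) or straight into an MM write-id subdirectory under the target partition. Below is a minimal, self-contained sketch of that choice, not Hive code: mmFilePrefix is a hypothetical stand-in for ValidWriteIds.getMmFilePrefix, and the paths are made up.

    import org.apache.hadoop.fs.Path;

    public class MmImportPathSketch {

      // Hypothetical stand-in for ValidWriteIds.getMmFilePrefix(writeId).
      static String mmFilePrefix(long writeId) {
        return "mm_" + writeId;
      }

      // Classic import: copy to a scratch dir, then a MoveTask renames it into place.
      // MM import: copy directly into the write-id subdirectory of the target, so the
      // subsequent load step only commits the write id instead of moving files.
      static Path chooseCopyDestination(Path targetPartitionDir, Path externalTmpDir, Long mmWriteId) {
        if (mmWriteId == null) {
          return externalTmpDir;
        }
        return new Path(targetPartitionDir, mmFilePrefix(mmWriteId));
      }

      public static void main(String[] args) {
        Path target = new Path("/warehouse/db.db/t/p=1");  // made-up locations
        Path tmp = new Path("/tmp/hive/_ext/10000");
        System.out.println(chooseCopyDestination(target, tmp, null)); // /tmp/hive/_ext/10000
        System.out.println(chooseCopyDestination(target, tmp, 0L));   // /warehouse/db.db/t/p=1/mm_0
      }
    }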

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 34f2ac4,5115fc8..1bd4f26
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@@ -7158,14 -7259,22 +7249,14 @@@ public class SemanticAnalyzer extends B
      } else if (dpCtx != null) {
        fileSinkDesc.setStaticSpec(dpCtx.getSPPath());
      }
 +    return fileSinkDesc;
 +  }
  
 -    if (isHiveServerQuery &&
 -      null != table_desc &&
 -      table_desc.getSerdeClassName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()) &&
 -      HiveConf.getBoolVar(conf,HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) {
 -        fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(true);
 -    } else {
 -        fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(false);
 -    }
 -
 -    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
 -        fileSinkDesc, fsRS, input), inputRR);
 -
 +  private void handleLineage(LoadTableDesc ltd, Operator output)
 +      throws SemanticException {
      if (ltd != null && SessionState.get() != null) {
        SessionState.get().getLineageState()
-           .mapDirToOp(ltd.getSourcePath(), (FileSinkOperator) output);
+           .mapDirToOp(ltd.getSourcePath(), output);
      } else if ( queryState.getCommandType().equals(HiveOperation.CREATETABLE_AS_SELECT.getOperationName())) {
  
        Path tlocation = null;
@@@ -7178,51 -7287,30 +7269,51 @@@
        }
  
        SessionState.get().getLineageState()
-               .mapDirToOp(tlocation, (FileSinkOperator) output);
+               .mapDirToOp(tlocation, output);
      }
 +  }
  
 -    if (LOG.isDebugEnabled()) {
 -      LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
 -          + dest_path + " row schema: " + inputRR.toString());
 +  private WriteEntity generateTableWriteEntity(String dest, Table dest_tab,
 +      Map<String, String> partSpec, LoadTableDesc ltd,
 +      DynamicPartitionCtx dpCtx, boolean isNonNativeTable)
 +      throws SemanticException {
 +    WriteEntity output = null;
 +
 +    // Register only the whole table for the post-exec hook if no dynamic partitioning (DP)
 +    // is present; in the DP case, the WriteEntity is registered in MoveTask once the list
 +    // of dynamically created partitions is known.
 +    if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
 +      output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable, dest));
 +      if (!outputs.add(output)) {
 +        throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
 +            .getMsg(dest_tab.getTableName()));
 +      }
      }
  
 -    FileSinkOperator fso = (FileSinkOperator) output;
 -    fso.getConf().setTable(dest_tab);
 -    fsopToTable.put(fso, dest_tab);
 -    // the following code is used to collect column stats when
 -    // hive.stats.autogather=true
 -    // and it is an insert overwrite or insert into table
 -    if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)
 -        && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER)
 -        && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) {
 -      if (dest_type.intValue() == QBMetaData.DEST_TABLE) {
 -        genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo()
 -            .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
 -      } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) {
 -        genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb
 -            .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName()));
 -
 +    if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
 +      // No static partition specified
 +      if (dpCtx.getNumSPCols() == 0) {
 +        output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable, dest), false);
 +        outputs.add(output);
 +        output.setDynamicPartitionWrite(true);
 +      }
 +      // Only part of the partition is specified.
 +      // Create a DummyPartition in this case: since the metastore does not currently store
 +      // partial partitions, we need to store dummy partitions.
 +      else {
 +        try {
 +          String ppath = dpCtx.getSPPath();
 +          ppath = ppath.substring(0, ppath.length() - 1);
 +          DummyPartition p =
 +              new DummyPartition(dest_tab, dest_tab.getDbName()
 +                  + "@" + dest_tab.getTableName() + "@" + ppath,
 +                  partSpec);
 +          output = new WriteEntity(p, getWriteType(dest), false);
 +          output.setDynamicPartitionWrite(true);
 +          outputs.add(output);
 +        } catch (HiveException e) {
 +          throw new SemanticException(e.getMessage(), e);
 +        }
        }
      }
      return output;
@@@ -7329,11 -7357,11 +7420,11 @@@
    // This method assumes you have already decided that this is an Acid write.  Don't call it if
    // that isn't true.
    private void checkAcidConstraints(QB qb, TableDesc tableDesc,
 -                                    Table table) throws SemanticException {
 +                                    Table table, AcidUtils.Operation acidOp) throws SemanticException {
      String tableName = tableDesc.getTableName();
 -    if (!qb.getParseInfo().isInsertIntoTable(tableName)) {
 +    if (!qb.getParseInfo().isInsertIntoTable(tableName) && !Operation.INSERT_ONLY.equals(acidOp)) {
        LOG.debug("Couldn't find table " + tableName + " in insertIntoTable");
-       throw new SemanticException(ErrorMsg.NO_INSERT_OVERWRITE_WITH_ACID.getMsg());
+       throw new SemanticException(ErrorMsg.NO_INSERT_OVERWRITE_WITH_ACID, tableName);
      }
      /*
      LOG.info("Modifying config values for ACID write");

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 91c343c,08a8f00..6629a0c
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@@ -316,80 -364,6 +316,80 @@@ public abstract class TaskCompiler 
      }
    }
  
 +  private void setLoadFileLocation(
 +      final ParseContext pCtx, LoadFileDesc lfd) throws SemanticException {
 +    // CTAS: make the MoveTask's destination directory the table's destination.
 +    Long mmWriteIdForCtas = null;
 +    FileSinkDesc dataSinkForCtas = null;
 +    String loc = null;
 +    if (pCtx.getQueryProperties().isCTAS()) {
 +      CreateTableDesc ctd = pCtx.getCreateTable();
 +      dataSinkForCtas = ctd.getAndUnsetWriter();
 +      mmWriteIdForCtas = ctd.getInitialMmWriteId();
 +      loc = ctd.getLocation();
 +    } else {
 +      loc = pCtx.getCreateViewDesc().getLocation();
 +    }
 +    Path location = (loc == null) ? getDefaultCtasLocation(pCtx) : new Path(loc);
 +    if (mmWriteIdForCtas != null) {
 +      dataSinkForCtas.setDirName(location);
 +      location = new Path(location, ValidWriteIds.getMmFilePrefix(mmWriteIdForCtas));
 +      lfd.setSourcePath(location);
 +      Utilities.LOG14535.info("Setting MM CTAS to  " + location);
 +    }
 +    Utilities.LOG14535.info("Location for LFD is being set to " + location + "; moving from " + lfd.getSourcePath());
 +    lfd.setTargetDir(location);
 +  }
 +
 +  private void createColumnStatsTasks(final ParseContext pCtx,
 +      final List<Task<? extends Serializable>> rootTasks,
 +      List<LoadFileDesc> loadFileWork, boolean isCStats, int outerQueryLimit)
 +      throws SemanticException {
 +    Set<Task<? extends Serializable>> leafTasks = new LinkedHashSet<Task<? extends Serializable>>();
 +    getLeafTasks(rootTasks, leafTasks);
 +    if (isCStats) {
 +      genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadFileWork, leafTasks, outerQueryLimit, 0);
 +    } else {
 +      for (ColumnStatsAutoGatherContext columnStatsAutoGatherContext : pCtx
 +          .getColumnStatsAutoGatherContexts()) {
 +        if (!columnStatsAutoGatherContext.isInsertInto()) {
 +          genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(),
 +              columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, outerQueryLimit, 0);
 +        } else {
 +          int numBitVector;
 +          try {
 +            numBitVector = HiveStatsUtils.getNumBitVectorsForNDVEstimation(conf);
 +          } catch (Exception e) {
 +            throw new SemanticException(e.getMessage(), e);
 +          }
 +          genColumnStatsTask(columnStatsAutoGatherContext.getAnalyzeRewrite(),
 +              columnStatsAutoGatherContext.getLoadFileWork(), leafTasks, outerQueryLimit, numBitVector);
 +        }
 +      }
 +    }
 +  }
 +
 +  private Path getDefaultCtasLocation(final ParseContext pCtx) throws SemanticException {
 +    try {
 +      String protoName = null;
 +      if (pCtx.getQueryProperties().isCTAS()) {
 +        protoName = pCtx.getCreateTable().getTableName();
 +      } else if (pCtx.getQueryProperties().isMaterializedView()) {
 +        protoName = pCtx.getCreateViewDesc().getViewName();
 +      }
 +      String[] names = Utilities.getDbTableName(protoName);
 +      if (!db.databaseExists(names[0])) {
 +        throw new SemanticException("ERROR: The database " + names[0] + " does not exist.");
 +      }
 +      Warehouse wh = new Warehouse(conf);
-       return wh.getTablePath(db.getDatabase(names[0]), names[1]);
++      return wh.getDefaultTablePath(db.getDatabase(names[0]), names[1]);
 +    } catch (HiveException e) {
 +      throw new SemanticException(e);
 +    } catch (MetaException e) {
 +      throw new SemanticException(e);
 +    }
 +  }
 +
    private void patchUpAfterCTASorMaterializedView(final List<Task<? extends Serializable>>  rootTasks,
                                                    final HashSet<WriteEntity> outputs,
                                                    Task<? extends Serializable> createTask) {
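
For reference, setLoadFileLocation and getDefaultCtasLocation above resolve the CTAS move target as the explicit LOCATION if one was given, otherwise the warehouse's default table path, and for an MM CTAS the write-id subdirectory of that location. A minimal, self-contained sketch under those assumptions; defaultTablePath and mmFilePrefix are hypothetical stand-ins for Warehouse.getDefaultTablePath and ValidWriteIds.getMmFilePrefix.

    import org.apache.hadoop.fs.Path;

    public class CtasTargetSketch {

      // Stand-in for Warehouse.getDefaultTablePath(db, table).
      static Path defaultTablePath(String warehouseDir, String db, String table) {
        return new Path(warehouseDir, db + ".db/" + table);
      }

      // Stand-in for ValidWriteIds.getMmFilePrefix(writeId).
      static String mmFilePrefix(long writeId) {
        return "mm_" + writeId;
      }

      // Explicit LOCATION wins; otherwise fall back to the default warehouse path.
      // For an MM CTAS the data lands in the write-id subdirectory, so the final move
      // (the LoadFileDesc target) must point there rather than at the table root.
      static Path ctasTargetDir(String explicitLocation, String warehouseDir, String db,
                                String table, Long mmWriteId) {
        Path location = (explicitLocation == null)
            ? defaultTablePath(warehouseDir, db, table)
            : new Path(explicitLocation);
        return (mmWriteId == null) ? location : new Path(location, mmFilePrefix(mmWriteId));
      }

      public static void main(String[] args) {
        System.out.println(ctasTargetDir(null, "/user/hive/warehouse", "default", "t", null));
        System.out.println(ctasTargetDir(null, "/user/hive/warehouse", "default", "t", 5L));
        System.out.println(ctasTargetDir("/data/custom/t", "/user/hive/warehouse", "default", "t", null));
      }
    }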

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1ceaf357/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java
----------------------------------------------------------------------

