hive-commits mailing list archives

From: ser...@apache.org
Subject: [49/50] [abbrv] hive git commit: HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)
Date: Wed, 27 Sep 2017 20:23:24 GMT
http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index b000745,c8ad795..ed527b8
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@@ -392,12 -435,110 +390,12 @@@ public class MoveTask extends Task<Move
            // deal with dynamic partitions
            DynamicPartitionCtx dpCtx = tbd.getDPCtx();
            if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
 -
 -            List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, dpCtx);
 -
 -            console.printInfo(System.getProperty("line.separator"));
 -            long startTime = System.currentTimeMillis();
 -            // load the list of DP partitions and return the list of partition specs
 -            // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
 -            // to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
 -            // After that check the number of DPs created to not exceed the limit and
 -            // iterate over it and call loadPartition() here.
 -            // The reason we don't do inside HIVE-1361 is the latter is large and we
 -            // want to isolate any potential issue it may introduce.
 -            Map<Map<String, String>, Partition> dp =
 -              db.loadDynamicPartitions(
 -                tbd.getSourcePath(),
 -                tbd.getTable().getTableName(),
 -                tbd.getPartitionSpec(),
 -                tbd.getReplace(),
 -                dpCtx.getNumDPCols(),
 -                isSkewedStoredAsDirs(tbd),
 -                work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
 -                work.getLoadTableWork().getCurrentTransactionId(), hasFollowingStatsTask(),
 -                work.getLoadTableWork().getWriteType());
 -
 -            // publish DP columns to its subscribers
 -            if (dps != null && dps.size() > 0) {
 -              pushFeed(FeedType.DYNAMIC_PARTITIONS, dp.values());
 -            }
 -
 -            String loadTime = "\t Time taken to load dynamic partitions: "  +
 -                (System.currentTimeMillis() - startTime)/1000.0 + " seconds";
 -            console.printInfo(loadTime);
 -            LOG.info(loadTime);
 -
 -            if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
 -              throw new HiveException("This query creates no partitions." +
 -                  " To turn off this error, set hive.error.on.empty.partition=false.");
 -            }
 -
 -            startTime = System.currentTimeMillis();
 -            // for each partition spec, get the partition
 -            // and put it to WriteEntity for post-exec hook
 -            for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
 -              Partition partn = entry.getValue();
 -
 -              if (bucketCols != null || sortCols != null) {
 -                updatePartitionBucketSortColumns(
 -                    db, table, partn, bucketCols, numBuckets, sortCols);
 -              }
 -
 -              WriteEntity enty = new WriteEntity(partn,
 -                getWriteType(tbd, work.getLoadTableWork().getWriteType()));
 -              if (work.getOutputs() != null) {
 -                DDLTask.addIfAbsentByName(enty, work.getOutputs());
 -              }
 -              // Need to update the queryPlan's output as well so that post-exec hook get executed.
 -              // This is only needed for dynamic partitioning since for SP the the WriteEntity is
 -              // constructed at compile time and the queryPlan already contains that.
 -              // For DP, WriteEntity creation is deferred at this stage so we need to update
 -              // queryPlan here.
 -              if (queryPlan.getOutputs() == null) {
 -                queryPlan.setOutputs(new LinkedHashSet<WriteEntity>());
 -              }
 -              queryPlan.getOutputs().add(enty);
 -
 -              // update columnar lineage for each partition
 -              dc = new DataContainer(table.getTTable(), partn.getTPartition());
 -
 -              // Don't set lineage on delete as we don't have all the columns
 -              if (work.getLineagState() != null &&
 -                  work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE &&
 -                  work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) {
 -                work.getLineagState().setLineage(tbd.getSourcePath(), dc,
 -                    table.getCols());
 -              }
 -              LOG.info("\tLoading partition " + entry.getKey());
 -            }
 -            console.printInfo("\t Time taken for adding to write entity : " +
 -                (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
 -            dc = null; // reset data container to prevent it being added again.
 +            dc = handleDynParts(db, table, tbd, ti, dpCtx);
            } else { // static partitions
 -            List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),
 -                tbd.getPartitionSpec());
 -            db.validatePartitionNameCharacters(partVals);
 -            db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
 -                tbd.getPartitionSpec(), tbd.getReplace(),
 -                tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
 -                work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask());
 -            Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
 -
 -            if (bucketCols != null || sortCols != null) {
 -              updatePartitionBucketSortColumns(db, table, partn, bucketCols,
 -                  numBuckets, sortCols);
 -            }
 -
 -            dc = new DataContainer(table.getTTable(), partn.getTPartition());
 -            // add this partition to post-execution hook
 -            if (work.getOutputs() != null) {
 -              DDLTask.addIfAbsentByName(new WriteEntity(partn,
 -                getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
 -            }
 -         }
 +            dc = handleStaticParts(db, table, tbd, ti);
 +          }
          }
-         if (SessionState.get() != null && dc != null) {
+         if (work.getLineagState() != null && dc != null) {
            // If we are doing an update or a delete the number of columns in the table will not
            // match the number of columns in the file sink.  For update there will be one too many
            // (because of the ROW__ID), and in the case of the delete there will be just the
@@@ -445,231 -586,6 +443,231 @@@
        return (1);
      }
    }
 +
 +  private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd,
 +      TaskInformation ti) throws HiveException, IOException, InvalidOperationException {
 +    List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),  tbd.getPartitionSpec());
 +    db.validatePartitionNameCharacters(partVals);
 +    Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath()
 +        + " into " + tbd.getTable().getTableName());
 +    db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
 +        tbd.getPartitionSpec(), tbd.getReplace(),
 +        tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
 +        work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&
 +            !tbd.isMmTable(),
 +        hasFollowingStatsTask(), tbd.getTxnId(), tbd.getStmtId());
 +    Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
 +
 +    // See the comment inside updatePartitionBucketSortColumns.
 +    if (!tbd.isMmTable() && (ti.bucketCols != null || ti.sortCols != null)) {
 +      updatePartitionBucketSortColumns(db, table, partn, ti.bucketCols,
 +          ti.numBuckets, ti.sortCols);
 +    }
 +
 +    DataContainer dc = new DataContainer(table.getTTable(), partn.getTPartition());
 +    // add this partition to post-execution hook
 +    if (work.getOutputs() != null) {
 +      DDLTask.addIfAbsentByName(new WriteEntity(partn,
 +        getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs());
 +    }
 +    return dc;
 +  }
 +
 +  private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd,
 +      TaskInformation ti, DynamicPartitionCtx dpCtx) throws HiveException,
 +      IOException, InvalidOperationException {
 +    DataContainer dc;
 +    List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, dpCtx);
 +
 +    console.printInfo(System.getProperty("line.separator"));
 +    long startTime = System.currentTimeMillis();
 +    // load the list of DP partitions and return the list of partition specs
 +    // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
 +    // to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
 +    // After that check the number of DPs created to not exceed the limit and
 +    // iterate over it and call loadPartition() here.
 +    // The reason we don't do inside HIVE-1361 is the latter is large and we
 +    // want to isolate any potential issue it may introduce.
 +    if (tbd.isMmTable() && !tbd.isCommitMmWrite()) {
 +      throw new HiveException("Only single-partition LoadTableDesc can skip commiting write ID");
 +    }
 +    Map<Map<String, String>, Partition> dp =
 +      db.loadDynamicPartitions(
 +        tbd.getSourcePath(),
 +        tbd.getTable().getTableName(),
 +        tbd.getPartitionSpec(),
 +        tbd.getReplace(),
 +        dpCtx.getNumDPCols(),
 +        (tbd.getLbCtx() == null) ? 0 : tbd.getLbCtx().calculateListBucketingLevel(),
 +        work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID &&
 +            !tbd.isMmTable(),
-         SessionState.get().getTxnMgr().getCurrentTxnId(), tbd.getStmtId(), hasFollowingStatsTask(),
++        work.getLoadTableWork().getTxnId(), tbd.getStmtId(), hasFollowingStatsTask(),
 +        work.getLoadTableWork().getWriteType());
 +
 +    // publish DP columns to its subscribers
 +    if (dps != null && dps.size() > 0) {
 +      pushFeed(FeedType.DYNAMIC_PARTITIONS, dp.values());
 +    }
 +
 +    String loadTime = "\t Time taken to load dynamic partitions: "  +
 +        (System.currentTimeMillis() - startTime)/1000.0 + " seconds";
 +    console.printInfo(loadTime);
 +    LOG.info(loadTime);
 +
 +    if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
 +      throw new HiveException("This query creates no partitions." +
 +          " To turn off this error, set hive.error.on.empty.partition=false.");
 +    }
 +
 +    startTime = System.currentTimeMillis();
 +    // for each partition spec, get the partition
 +    // and put it to WriteEntity for post-exec hook
 +    for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
 +      Partition partn = entry.getValue();
 +
 +      // See the comment inside updatePartitionBucketSortColumns.
 +      if (!tbd.isMmTable() && (ti.bucketCols != null || ti.sortCols != null)) {
 +        updatePartitionBucketSortColumns(
 +            db, table, partn, ti.bucketCols, ti.numBuckets, ti.sortCols);
 +      }
 +
 +      WriteEntity enty = new WriteEntity(partn,
 +        getWriteType(tbd, work.getLoadTableWork().getWriteType()));
 +      if (work.getOutputs() != null) {
 +        DDLTask.addIfAbsentByName(enty, work.getOutputs());
 +      }
 +      // Need to update the queryPlan's output as well so that post-exec hook get executed.
 +      // This is only needed for dynamic partitioning since for SP the the WriteEntity is
 +      // constructed at compile time and the queryPlan already contains that.
 +      // For DP, WriteEntity creation is deferred at this stage so we need to update
 +      // queryPlan here.
 +      if (queryPlan.getOutputs() == null) {
 +        queryPlan.setOutputs(new LinkedHashSet<WriteEntity>());
 +      }
 +      queryPlan.getOutputs().add(enty);
 +
 +      // update columnar lineage for each partition
 +      dc = new DataContainer(table.getTTable(), partn.getTPartition());
 +
 +      // Don't set lineage on delete as we don't have all the columns
-       if (SessionState.get() != null &&
++      if (work.getLineagState() != null &&
 +          work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE &&
 +          work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) {
-         SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
++        work.getLineagState().setLineage(tbd.getSourcePath(), dc,
 +            table.getCols());
 +      }
 +      LOG.info("\tLoading partition " + entry.getKey());
 +    }
 +    console.printInfo("\t Time taken for adding to write entity : " +
 +        (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
 +    dc = null; // reset data container to prevent it being added again.
 +    return dc;
 +  }
 +
 +  private void inferTaskInformation(TaskInformation ti) {
 +    // Find the first ancestor of this MoveTask which is some form of map reduce task
 +    // (Either standard, local, or a merge)
 +    while (ti.task.getParentTasks() != null && ti.task.getParentTasks().size() == 1) {
 +      ti.task = (Task)ti.task.getParentTasks().get(0);
 +      // If it was a merge task or a local map reduce task, nothing can be inferred
 +      if (ti.task instanceof MergeFileTask || ti.task instanceof MapredLocalTask) {
 +        break;
 +      }
 +
 +      // If it's a standard map reduce task, check what, if anything, it inferred about
 +      // the directory this move task is moving
 +      if (ti.task instanceof MapRedTask) {
 +        MapredWork work = (MapredWork)ti.task.getWork();
 +        MapWork mapWork = work.getMapWork();
 +        ti.bucketCols = mapWork.getBucketedColsByDirectory().get(ti.path);
 +        ti.sortCols = mapWork.getSortedColsByDirectory().get(ti.path);
 +        if (work.getReduceWork() != null) {
 +          ti.numBuckets = work.getReduceWork().getNumReduceTasks();
 +        }
 +
 +        if (ti.bucketCols != null || ti.sortCols != null) {
 +          // This must be a final map reduce task (the task containing the file sink
 +          // operator that writes the final output)
 +          assert work.isFinalMapRed();
 +        }
 +        break;
 +      }
 +
 +      // If it's a move task, get the path the files were moved from, this is what any
 +      // preceding map reduce task inferred information about, and moving does not invalidate
 +      // those assumptions
 +      // This can happen when a conditional merge is added before the final MoveTask, but the
 +      // condition for merging is not met, see GenMRFileSink1.
 +      if (ti.task instanceof MoveTask) {
 +        MoveTask mt = (MoveTask)ti.task;
 +        if (mt.getWork().getLoadFileWork() != null) {
 +          ti.path = mt.getWork().getLoadFileWork().getSourcePath().toUri().toString();
 +        }
 +      }
 +    }
 +  }
 +
 +  private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table)
 +      throws HiveException {
 +    if (work.getCheckFileFormat()) {
 +      // Get all files from the src directory
 +      FileStatus[] dirs;
 +      ArrayList<FileStatus> files;
 +      FileSystem srcFs; // source filesystem
 +      try {
 +        srcFs = tbd.getSourcePath().getFileSystem(conf);
 +        dirs = srcFs.globStatus(tbd.getSourcePath());
 +        files = new ArrayList<FileStatus>();
 +        for (int i = 0; (dirs != null && i < dirs.length); i++) {
 +          files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER)));
 +          // We only check one file, so exit the loop when we have at least
 +          // one.
 +          if (files.size() > 0) {
 +            break;
 +          }
 +        }
 +      } catch (IOException e) {
 +        throw new HiveException(
 +            "addFiles: filesystem error in check phase", e);
 +      }
 +
 +      // handle file format check for table level
 +      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
 +        boolean flag = true;
 +        // work.checkFileFormat is set to true only for Load Task, so assumption here is
 +        // dynamic partition context is null
 +        if (tbd.getDPCtx() == null) {
 +          if (tbd.getPartitionSpec() == null || tbd.getPartitionSpec().isEmpty()) {
 +            // Check if the file format of the file matches that of the table.
 +            flag = HiveFileFormatUtils.checkInputFormat(
 +                srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
 +          } else {
 +            // Check if the file format of the file matches that of the partition
 +            Partition oldPart = db.getPartition(table, tbd.getPartitionSpec(), false);
 +            if (oldPart == null) {
 +              // this means we have just created a table and are specifying partition in the
 +              // load statement (without pre-creating the partition), in which case lets use
 +              // table input format class. inheritTableSpecs defaults to true so when a new
 +              // partition is created later it will automatically inherit input format
 +              // from table object
 +              flag = HiveFileFormatUtils.checkInputFormat(
 +                  srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
 +            } else {
 +              flag = HiveFileFormatUtils.checkInputFormat(
 +                  srcFs, conf, oldPart.getInputFormatClass(), files);
 +            }
 +          }
 +          if (!flag) {
 +            throw new HiveException(ErrorMsg.WRONG_FILE_FORMAT);
 +          }
 +        } else {
 +          LOG.warn("Skipping file format check as dpCtx is not null");
 +        }
 +      }
 +    }
 +  }
 +
 +
    /**
     * so to make sure we crate WriteEntity with the right WriteType.  This is (at this point) only
     * for consistency since LockManager (which is the only thing that pays attention to WriteType)

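The MoveTask.java hunk above folds the previously inline dynamic- and static-partition handling into the new handleDynParts and handleStaticParts helpers, and reads the lineage state from the MoveWork (work.getLineagState()) instead of the SessionState singleton. Below is a minimal, self-contained sketch of that dependency-passing pattern; the class and method names are simplified stand-ins, not the actual Hive types.

// Simplified stand-ins; not the Hive classes.
final class LineageStateSketch {
  void setLineage(String sourcePath, String dataContainer) {
    System.out.println("lineage: " + sourcePath + " -> " + dataContainer);
  }
}

final class WorkSketch {
  private final LineageStateSketch lineageState; // may be null when lineage tracking is off

  WorkSketch(LineageStateSketch lineageState) {
    this.lineageState = lineageState;
  }

  LineageStateSketch getLineageState() {
    return lineageState;
  }
}

final class TaskSketch {
  void execute(WorkSketch work, String sourcePath, String dataContainer) {
    // Null-check the injected collaborator instead of consulting a
    // SessionState.get() style singleton, mirroring the guards the hunk adds.
    if (work.getLineageState() != null && dataContainer != null) {
      work.getLineageState().setLineage(sourcePath, dataContainer);
    }
  }

  public static void main(String[] args) {
    new TaskSketch().execute(new WorkSketch(new LineageStateSketch()), "/tmp/src", "dc-1");
  }
}
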
http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 05698d1,5c6ef9f..51c4090
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@@ -240,10 -238,10 +240,11 @@@ public class LoadPartitions 
        Path tmpPath) {
      LoadTableDesc loadTableWork = new LoadTableDesc(
          tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
 -        event.replicationSpec().isReplace());
 +        event.replicationSpec().isReplace(), SessionState.get().getTxnMgr().getCurrentTxnId()
 +    );
      loadTableWork.setInheritTableSpecs(false);
-     MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false);
+     MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false,
+         context.sessionStateLineageState);
      return TaskFactory.get(work, context.hiveConf);
    }
  

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index afc04a3,a9a9162..65a3a59
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@@ -225,11 -224,10 +225,12 @@@ public class LoadTable 
          ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf);
  
      LoadTableDesc loadTableWork = new LoadTableDesc(
 -        tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace());
 +        tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace(),
 +        SessionState.get().getTxnMgr().getCurrentTxnId()
 +    );
      MoveWork moveWork =
-         new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false);
+         new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false,
+             context.sessionStateLineageState);
      Task<?> loadTableTask = TaskFactory.get(moveWork, context.hiveConf);
      copyTask.addDependentTask(loadTableTask);
      return copyTask;

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 6dfa607,25cca7b..6a56407
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@@ -1364,29 -1353,9 +1364,31 @@@ public final class GenMapRedUtils 
      cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
      // NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't
      // know if merge MR2 will be triggered at execution time
 -    Task<MoveWork> mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOutput);
 +    MoveWork dummyMv = null;
 +    if (srcMmWriteId == null) {
 +      // Only create the movework for non-MM table. No action needed for a MM table.
 +      Utilities.LOG14535.info("creating dummy movetask for merge (with lfd)");
 +      dummyMv = new MoveWork(null, null, null,
-          new LoadFileDesc(inputDirName, finalName, true, null, null, false), false);
++          new LoadFileDesc(inputDirName, finalName, true, null, null, false), false,
++          SessionState.get().getLineageState());
 +    } else {
 +      // TODO# create the noop MoveWork to avoid q file changes for now. Should be removed w/the flag just before merge
 +      dummyMv = new MoveWork(null, null, null,
-           new LoadFileDesc(inputDirName, finalName, true, null, null, false), false);
++          new LoadFileDesc(inputDirName, finalName, true, null, null, false), false,
++          SessionState.get().getLineageState());
 +      dummyMv.setNoop(true);
 +    }
 +    // Use the original fsOp path here in case of MM - while the new FSOP merges files inside the
 +    // MM directory, the original MoveTask still commits based on the parent. Note that this path
 +    // can only be triggered for a merge that's part of insert for now; MM tables do not support
 +    // concatenate. Keeping the old logic for non-MM tables with temp directories and stuff.
 +    Path fsopPath = srcMmWriteId != null ? fsInputDesc.getFinalDirName() : finalName;
 +    Utilities.LOG14535.info("Looking for MoveTask to make it dependant on the conditional tasks");
 +
 +    Task<MoveWork> mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(
 +        mvTasks, fsopPath, fsInputDesc.isMmTable());
      ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work,
 -        fsInputDesc.getFinalDirName(), finalName, mvTask, dependencyTask);
 +        fsInputDesc.getMergeInputDirName(), finalName, mvTask, dependencyTask);
  
      // keep the dynamic partition context in conditional task resolver context
      ConditionalResolverMergeFilesCtx mrCtx =

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index fdeeacf,e70a72c..36bb89f
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@@ -1090,10 -1088,11 +1095,11 @@@ public class DDLSemanticAnalyzer extend
          Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc);
          truncateTblDesc.setOutputDir(queryTmpdir);
          LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
-             partSpec == null ? new HashMap<String, String>() : partSpec, null);
 -            partSpec == null ? new HashMap<>() : partSpec);
++            partSpec == null ? new HashMap<>() : partSpec, null);
          ltd.setLbCtx(lbCtx);
 -        Task<MoveWork> moveTsk = TaskFactory
 -            .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()),
 -            conf);
 +        @SuppressWarnings("unchecked")
-         Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
++        Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(
++          null, null, ltd, null, false, SessionState.get().getLineageState()), conf);
          truncateTask.addDependentTask(moveTsk);
  
          // Recalculate the HDFS stats if auto gather stats is set
@@@ -1736,11 -1722,12 +1742,12 @@@
        TableDesc tblDesc = Utilities.getTableDesc(tblObj);
        Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc);
        mergeDesc.setOutputDir(queryTmpdir);
 +      // No need to handle MM tables - unsupported path.
        LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
 -          partSpec == null ? new HashMap<>() : partSpec);
 +          partSpec == null ? new HashMap<>() : partSpec, null);
        ltd.setLbCtx(lbCtx);
-       Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
 -      Task<MoveWork> moveTsk = TaskFactory
 -          .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()),
 -          conf);
++      Task<MoveWork> moveTsk = TaskFactory.get(
++        new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), conf);
        mergeTask.addDependentTask(moveTsk);
  
        if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 36c0c98,751bda0..b9f28eb
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@@ -360,36 -343,17 +360,36 @@@ public class ImportSemanticAnalyzer ext
      return tblDesc;
    }
  
 +
    private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
 -                            ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x) {
 +      ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x,
 +      Long txnId, int stmtId, boolean isSourceMm) {
      Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
 -    Path tmpPath = x.getCtx().getExternalTmpPath(tgtPath);
 -    Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, x.getConf());
 -    LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
 -        Utilities.getTableDesc(table), new TreeMap<>(),
 -        replace);
 -    Task<?> loadTableTask = TaskFactory.get(new MoveWork(x.getInputs(),
 -            x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()),
 -        x.getConf());
 +    Path destPath = !MetaStoreUtils.isInsertOnlyTable(table.getParameters()) ? x.getCtx().getExternalTmpPath(tgtPath)
 +        : new Path(tgtPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
 +    Utilities.LOG14535.info("adding import work for table with source location: "
 +        + dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm "
 +        + txnId + " (src " + isSourceMm + ") for " + (table == null ? "a new table" : table.getTableName()));
 +
 +    Task<?> copyTask = null;
 +    if (replicationSpec.isInReplicationScope()) {
 +      if (isSourceMm || isAcid(txnId)) {
 +        // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed.
 +        throw new RuntimeException("Replicating MM and ACID tables is not supported");
 +      }
 +      copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf());
 +    } else {
 +      CopyWork cw = new CopyWork(dataPath, destPath, false);
 +      cw.setSkipSourceMmDirs(isSourceMm);
 +      copyTask = TaskFactory.get(cw, x.getConf());
 +    }
 +
 +    LoadTableDesc loadTableWork = new LoadTableDesc(destPath,
-         Utilities.getTableDesc(table), new TreeMap<String, String>(), replace, txnId);
++        Utilities.getTableDesc(table), new TreeMap<>(), replace, txnId);
 +    loadTableWork.setTxnId(txnId);
 +    loadTableWork.setStmtId(stmtId);
-     MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false);
++    MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState());
 +    Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
      copyTask.addDependentTask(loadTableTask);
      x.getTasks().add(copyTask);
      return loadTableTask;
@@@ -450,40 -413,19 +455,40 @@@
            + partSpecToString(partSpec.getPartSpec())
            + " with source location: " + srcLocation);
        Path tgtLocation = new Path(partSpec.getLocation());
 -      Path tmpPath = x.getCtx().getExternalTmpPath(tgtLocation);
 -      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
 -          replicationSpec, new Path(srcLocation), tmpPath, x.getConf());
 +      Path destPath = !MetaStoreUtils.isInsertOnlyTable(table.getParameters()) ? x.getCtx().getExternalTmpPath(tgtLocation)
 +          : new Path(tgtLocation, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
 +      Path moveTaskSrc =  !MetaStoreUtils.isInsertOnlyTable(table.getParameters()) ? destPath : tgtLocation;
 +      Utilities.LOG14535.info("adding import work for partition with source location: "
 +          + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm "
 +          + txnId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec()));
 +
 +
 +      Task<?> copyTask = null;
 +      if (replicationSpec.isInReplicationScope()) {
 +        if (isSourceMm || isAcid(txnId)) {
 +          // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed.
 +          throw new RuntimeException("Replicating MM and ACID tables is not supported");
 +        }
 +        copyTask = ReplCopyTask.getLoadCopyTask(
 +            replicationSpec, new Path(srcLocation), destPath, x.getConf());
 +      } else {
 +        CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false);
 +        cw.setSkipSourceMmDirs(isSourceMm);
 +        copyTask = TaskFactory.get(cw, x.getConf());
 +      }
 +
        Task<?> addPartTask = TaskFactory.get(new DDLWork(x.getInputs(),
            x.getOutputs(), addPartitionDesc), x.getConf());
 -      LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
 -          Utilities.getTableDesc(table),
 -          partSpec.getPartSpec(), replicationSpec.isReplace());
 +      LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table),
 +          partSpec.getPartSpec(), replicationSpec.isReplace(), txnId);
 +      loadTableWork.setTxnId(txnId);
 +      loadTableWork.setStmtId(stmtId);
        loadTableWork.setInheritTableSpecs(false);
 +      // Do not commit the write ID from each task; need to commit once.
 +      // TODO: we should just change the import to use a single MoveTask, like dynparts.
 +      loadTableWork.setIntermediateInMmWrite(isAcid(txnId));
        Task<?> loadPartTask = TaskFactory.get(new MoveWork(
-           x.getInputs(), x.getOutputs(), loadTableWork, null, false), x.getConf());
 -              x.getInputs(), x.getOutputs(), loadTableWork, null, false,
 -              SessionState.get().getLineageState()),
 -          x.getConf());
++          x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()), x.getConf());
        copyTask.addDependentTask(loadPartTask);
        addPartTask.addDependentTask(loadPartTask);
        x.getTasks().add(copyTask);

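In the ImportSemanticAnalyzer hunks above, the copy destination now depends on the table type: regular tables stage through an external scratch path, while insert-only (MM) tables copy straight into a delta subdirectory of the target derived from the transaction and statement ids via AcidUtils.deltaSubdir. A rough, self-contained sketch of that branch follows; the directory format string is illustrative only, since the real naming is delegated to AcidUtils.

import java.nio.file.Path;
import java.nio.file.Paths;

final class ImportDestinationSketch {
  // Illustrative only: the padding below mimics, but does not guarantee,
  // the exact format AcidUtils.deltaSubdir produces.
  static Path chooseCopyDestination(boolean isInsertOnlyTable, Path scratchPath,
      Path targetPath, long txnId, int stmtId) {
    if (!isInsertOnlyTable) {
      return scratchPath; // staged first, then moved by the MoveTask
    }
    // MM tables: write directly under the target in a per-write delta dir.
    return targetPath.resolve(String.format("delta_%07d_%07d_%04d", txnId, txnId, stmtId));
  }

  public static void main(String[] args) {
    System.out.println(chooseCopyDestination(true,
        Paths.get("/tmp/hive/scratch"), Paths.get("/warehouse/t"), 17L, 0));
  }
}
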
http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 3475c7c,4814fcd..df50aab
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@@ -6858,15 -6902,10 +6861,18 @@@ public class SemanticAnalyzer extends B
            acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
            checkAcidConstraints(qb, table_desc, dest_tab);
          }
 -        Long currentTransactionId = acidOp == Operation.NOT_ACID ? null :
 +        if (MetaStoreUtils.isInsertOnlyTable(table_desc.getProperties())) {
 +          acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
 +        }
 +        if (isMmTable) {
 +          txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
++        } else {
++          txnId = acidOp == Operation.NOT_ACID ? null :
+             SessionState.get().getTxnMgr().getCurrentTxnId();
 -        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp,
 -            currentTransactionId);
 +        }
 +        boolean isReplace = !qb.getParseInfo().isInsertIntoTable(
 +            dest_tab.getDbName(), dest_tab.getTableName());
 +        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, txnId);
          // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
          // deltas and base and leave them up to the cleaner to clean up
          ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
@@@ -6926,13 -7020,10 +6932,16 @@@
          acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
          checkAcidConstraints(qb, table_desc, dest_tab);
        }
 -      Long currentTransactionId = (acidOp == Operation.NOT_ACID) ? null :
 +      if (MetaStoreUtils.isInsertOnlyTable(dest_part.getTable().getParameters())) {
 +        acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
 +      }
 +      if (isMmTable) {
 +        txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
++      } else {
++        txnId = (acidOp == Operation.NOT_ACID) ? null :
+           SessionState.get().getTxnMgr().getCurrentTxnId();
 -      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp,
 -          currentTransactionId);
 +      }
 +      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, txnId);
        // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
        // deltas and base and leave them up to the cleaner to clean up
        ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
@@@ -7015,9 -7150,8 +7024,9 @@@
        }
  
        boolean isDfsDir = (dest_type.intValue() == QBMetaData.DEST_DFS_FILE);
 +      // Create LFD even for MM CTAS - it's a no-op move, but it still seems to be used for stats.
        loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc, queryTmpdir, dest_path, isDfsDir, cols,
-         colTypes, destTableIsAcid ? Operation.INSERT : Operation.NOT_ACID, isMmCtas));
 -          colTypes, destTableIsAcid ? Operation.INSERT : Operation.NOT_ACID));
++          colTypes, destTableIsAcid ? Operation.INSERT : Operation.NOT_ACID, isMmCtas));
        if (tblDesc == null) {
          if (viewDesc != null) {
            table_desc = PlanUtils.getTableDesc(viewDesc, cols, colTypes);

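Both SemanticAnalyzer hunks compute the transaction id the same way: MM (insert-only) destinations always take the current transaction id, other destinations take it only for ACID write types. A compact, self-contained sketch of that selection with placeholder types; the real code obtains the id from SessionState.get().getTxnMgr().getCurrentTxnId().

enum WriteOpSketch { NOT_ACID, INSERT, UPDATE, DELETE }

final class TxnIdSelectionSketch {
  static Long selectTxnId(boolean isMmTable, WriteOpSketch acidOp, long currentTxnId) {
    if (isMmTable) {
      return currentTxnId;                                   // MM writes always carry the id
    }
    return acidOp == WriteOpSketch.NOT_ACID ? null : currentTxnId; // null for plain tables
  }

  public static void main(String[] args) {
    System.out.println(selectTxnId(false, WriteOpSketch.NOT_ACID, 42L)); // null
    System.out.println(selectTxnId(true, WriteOpSketch.NOT_ACID, 42L));  // 42
  }
}
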
http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 356ab6f,f0089fc..752a934
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@@ -18,17 -18,8 +18,19 @@@
  
  package org.apache.hadoop.hive.ql.parse;
  
 +import java.io.Serializable;
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.LinkedHashSet;
 +import java.util.List;
 +import java.util.Set;
 +
 +import org.apache.hadoop.hive.ql.io.AcidUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
+ import com.google.common.collect.Interner;
+ import com.google.common.collect.Interners;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.common.HiveStatsUtils;
  import org.apache.hadoop.hive.conf.HiveConf;
@@@ -228,17 -227,47 +239,19 @@@ public abstract class TaskCompiler 
          }
        }
  
 -      boolean oneLoadFile = true;
 +      boolean oneLoadFileForCtas = true;
        for (LoadFileDesc lfd : loadFileWork) {
          if (pCtx.getQueryProperties().isCTAS() || pCtx.getQueryProperties().isMaterializedView()) {
 -          assert (oneLoadFile); // should not have more than 1 load file for
 -          // CTAS
 -          // make the movetask's destination directory the table's destination.
 -          Path location;
 -          String loc = pCtx.getQueryProperties().isCTAS() ?
 -                  pCtx.getCreateTable().getLocation() : pCtx.getCreateViewDesc().getLocation();
 -          if (loc == null) {
 -            // get the default location
 -            Path targetPath;
 -            try {
 -              String protoName = null;
 -              if (pCtx.getQueryProperties().isCTAS()) {
 -                protoName = pCtx.getCreateTable().getTableName();
 -              } else if (pCtx.getQueryProperties().isMaterializedView()) {
 -                protoName = pCtx.getCreateViewDesc().getViewName();
 -              }
 -              String[] names = Utilities.getDbTableName(protoName);
 -              if (!db.databaseExists(names[0])) {
 -                throw new SemanticException("ERROR: The database " + names[0]
 -                    + " does not exist.");
 -              }
 -              Warehouse wh = new Warehouse(conf);
 -              targetPath = wh.getDefaultTablePath(db.getDatabase(names[0]), names[1]);
 -            } catch (HiveException | MetaException e) {
 -              throw new SemanticException(e);
 -            }
 -
 -            location = targetPath;
 -          } else {
 -            location = new Path(loc);
 +          if (!oneLoadFileForCtas) { // should not have more than 1 load file for CTAS.
 +            throw new SemanticException(
 +                "One query is not expected to contain multiple CTAS loads statements");
            }
 -          lfd.setTargetDir(location);
 -
 -          oneLoadFile = false;
 +          setLoadFileLocation(pCtx, lfd);
 +          oneLoadFileForCtas = false;
          }
-         mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, false), conf));
+         mvTask.add(TaskFactory
+             .get(new MoveWork(null, null, null, lfd, false, SessionState.get().getLineageState()),
+                 conf));
        }
      }
  

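Besides extracting setLoadFileLocation, the TaskCompiler hunk replaces the old CTAS single-load-file assert with an explicit SemanticException, so the invariant holds even when JVM assertions are disabled. A generic, self-contained sketch of that change of idiom; the exception type and message here are illustrative, not the Hive ones.

import java.util.Arrays;
import java.util.List;

final class SingleLoadFileCheckSketch {
  // Before: assert loadFiles.size() <= 1;   // silently skipped without -ea
  // After: fail loudly regardless of JVM assertion settings.
  static void checkSingleLoadFile(List<String> loadFiles) {
    if (loadFiles.size() > 1) {
      throw new IllegalStateException(
          "One query is not expected to contain multiple CTAS load statements");
    }
  }

  public static void main(String[] args) {
    checkSingleLoadFile(Arrays.asList("only-one"));
    System.out.println("ok");
  }
}
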
http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
index 45d4fb0,023d247..9477df6
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
@@@ -35,8 -35,8 +35,9 @@@ public class LoadDesc implements Serial
     * Need to remember whether this is an acid compliant operation, and if so whether it is an
     * insert, update, or delete.
     */
-   private final AcidUtils.Operation writeType;
+   final AcidUtils.Operation writeType;
+ 
 +
    public LoadDesc(final Path sourcePath, AcidUtils.Operation writeType) {
      this.sourcePath = sourcePath;
      this.writeType = writeType;

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
index 0032648,0292af5..d90acce
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
@@@ -51,8 -48,8 +51,8 @@@ public class LoadFileDesc extends LoadD
  
    public LoadFileDesc(final CreateTableDesc createTableDesc, final CreateViewDesc  createViewDesc,
                        final Path sourcePath, final Path targetDir, final boolean isDfsDir,
 -                      final String columns, final String columnTypes, AcidUtils.Operation writeType) {
 -    this(sourcePath, targetDir, isDfsDir, columns, columnTypes, writeType);
 +                      final String columns, final String columnTypes, AcidUtils.Operation writeType, boolean isMmCtas) {
-    this(sourcePath, targetDir, isDfsDir, columns, columnTypes, writeType, isMmCtas);
++    this(sourcePath, targetDir, isDfsDir, columns, columnTypes, writeType, isMmCtas);
      if (createTableDesc != null && createTableDesc.getDatabaseName() != null
          && createTableDesc.getTableName() != null) {
        destinationCreateTable = (createTableDesc.getTableName().contains(".") ? "" : createTableDesc

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index e893ab5,90a970c..85d1324
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@@ -18,13 -18,7 +18,9 @@@
  
  package org.apache.hadoop.hive.ql.plan;
  
- import java.io.Serializable;
- import java.util.LinkedHashMap;
- import java.util.Map;
- 
  import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 +import org.apache.hadoop.hive.ql.exec.Utilities;
  import org.apache.hadoop.hive.ql.io.AcidUtils;
  import org.apache.hadoop.hive.ql.plan.Explain.Level;
  
@@@ -39,8 -37,10 +39,8 @@@ public class LoadTableDesc extends Load
    private ListBucketingCtx lbCtx;
    private boolean inheritTableSpecs = true; //For partitions, flag controlling whether the current
                                              //table specs are to be used
-   private Long txnId;
 -  /*
 -  if the writeType above is NOT_ACID then the currentTransactionId will be null
 -   */
 -  private final Long currentTransactionId;
 +  private int stmtId;
++  private Long currentTransactionId;
  
    // TODO: the below seems like they should just be combined into partitionDesc
    private org.apache.hadoop.hive.ql.plan.TableDesc table;
@@@ -59,15 -59,13 +60,14 @@@
    }
  
    public LoadTableDesc(final Path sourcePath,
-       final org.apache.hadoop.hive.ql.plan.TableDesc table,
+       final TableDesc table,
        final Map<String, String> partitionSpec,
        final boolean replace,
-       final AcidUtils.Operation writeType,
-       Long txnId) {
+       final AcidUtils.Operation writeType, Long currentTransactionId) {
      super(sourcePath, writeType);
 -    this.currentTransactionId = currentTransactionId;
 -    init(table, partitionSpec, replace);
 +    Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to "
 +        + ((table.getProperties() == null) ? "null" : table.getTableName()));
-     init(table, partitionSpec, replace, txnId);
++    init(table, partitionSpec, replace, currentTransactionId);
    }
  
    /**
@@@ -105,17 -103,16 +105,17 @@@
    }
  
    public LoadTableDesc(final Path sourcePath,
-       final org.apache.hadoop.hive.ql.plan.TableDesc table,
+       final TableDesc table,
        final DynamicPartitionCtx dpCtx,
 -      final AcidUtils.Operation writeType, Long currentTransactionId) {
 +      final AcidUtils.Operation writeType,
 +      boolean isReplace, Long txnId) {
      super(sourcePath, writeType);
 +    Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + table.getTableName()/*, new Exception()*/);
      this.dpCtx = dpCtx;
 -    this.currentTransactionId = currentTransactionId;
      if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) {
 -      init(table, dpCtx.getPartSpec(), true);
 +      init(table, dpCtx.getPartSpec(), isReplace, txnId);
      } else {
-       init(table, new LinkedHashMap<String, String>(), isReplace, txnId);
 -      init(table, new LinkedHashMap<>(), true);
++      init(table, new LinkedHashMap<>(), isReplace, txnId);
      }
    }
  
@@@ -127,7 -123,6 +127,7 @@@
      this.table = table;
      this.partitionSpec = partitionSpec;
      this.replace = replace;
-     this.txnId = txnId;
++    this.currentTransactionId = txnId;
    }
  
    @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@@ -196,27 -182,7 +196,27 @@@
      this.lbCtx = lbCtx;
    }
  
-   public Long getTxnId() {
-     return txnId;
 -  public long getCurrentTransactionId() {
 -    return writeType == AcidUtils.Operation.NOT_ACID ? 0L : currentTransactionId;
++  public long getTxnId() {
++    return currentTransactionId == null ? 0 : currentTransactionId;
 +  }
 +
 +  public void setTxnId(Long txnId) {
-     this.txnId = txnId;
++    this.currentTransactionId = txnId;
 +  }
 +
 +  public int getStmtId() {
 +    return stmtId;
 +  }
 +
 +  public void setStmtId(int stmtId) {
 +    this.stmtId = stmtId;
 +  }
 +
 +  public void setIntermediateInMmWrite(boolean b) {
 +    this.commitMmWriteId = !b;
 +  }
 +
 +  public boolean isCommitMmWrite() {
 +    return commitMmWriteId;
    }
  }

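The merged LoadTableDesc keeps a single nullable currentTransactionId plus a stmtId, exposed through getTxnId()/setTxnId() and getStmtId()/setStmtId(), with getTxnId() falling back to 0 when no id was recorded. A self-contained sketch of that accessor shape; the class name and comments are simplified.

final class LoadTableDescSketch {
  private Long currentTransactionId; // null for non-ACID, non-MM writes
  private int stmtId;

  void setTxnId(Long txnId) {
    this.currentTransactionId = txnId;
  }

  long getTxnId() {
    return currentTransactionId == null ? 0L : currentTransactionId;
  }

  void setStmtId(int stmtId) {
    this.stmtId = stmtId;
  }

  int getStmtId() {
    return stmtId;
  }

  public static void main(String[] args) {
    LoadTableDescSketch d = new LoadTableDescSketch();
    System.out.println(d.getTxnId()); // 0 before any id is set
    d.setTxnId(42L);
    d.setStmtId(1);
    System.out.println(d.getTxnId() + "/" + d.getStmtId()); // 42/1
  }
}
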
http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 8594edf,00c0ce3..f6303ba
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@@ -56,22 -61,22 +62,26 @@@ public class MoveWork implements Serial
     * List of inserted partitions
     */
    protected List<Partition> movedParts;
 +  private boolean isNoop;
  
    public MoveWork() {
+     sessionStateLineageState = null;
    }
  
-   private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) {
++
+   private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
+       LineageState lineageState) {
      this.inputs = inputs;
      this.outputs = outputs;
+     sessionStateLineageState = lineageState;
    }
  
    public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
        final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
-       boolean checkFileFormat, boolean srcLocal) {
-     this(inputs, outputs);
+       boolean checkFileFormat, boolean srcLocal, LineageState lineageState) {
+     this(inputs, outputs, lineageState);
 +    Utilities.LOG14535.info("Creating MoveWork " + System.identityHashCode(this)
 +        + " with " + loadTableWork + "; " + loadFileWork);
      this.loadTableWork = loadTableWork;
      this.loadFileWork = loadFileWork;
      this.checkFileFormat = checkFileFormat;
@@@ -80,8 -85,11 +90,8 @@@
  
    public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
        final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
-       boolean checkFileFormat) {
-     this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false);
+       boolean checkFileFormat, LineageState lineageState) {
 -    this(inputs, outputs, lineageState);
 -    this.loadTableWork = loadTableWork;
 -    this.loadFileWork = loadFileWork;
 -    this.checkFileFormat = checkFileFormat;
++    this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false, lineageState);
    }
  
    public MoveWork(final MoveWork o) {
@@@ -153,11 -162,7 +164,15 @@@
      this.srcLocal = srcLocal;
    }
  
 +  public void setNoop(boolean b) {
 +    this.isNoop = true;
 +  }
 +
 +  public boolean isNoop() {
 +    return this.isNoop;
 +  }
++  
+   public LineageState getLineagState() {
+     return sessionStateLineageState;
+   }
  }

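The MoveWork hunk threads a LineageState through every constructor (the shorter form delegates to the full one) and adds the noop flag used by the merge path in GenMapRedUtils. A self-contained sketch of that constructor shape with simplified stand-in types; the real constructors also take the ReadEntity/WriteEntity sets and the load descriptors. Note that the setNoop in the hunk above ignores its argument and always stores true; the sketch honours the flag it is given.

final class LineageStateStub {}

final class MoveWorkShape {
  private final LineageStateStub lineageState;
  private Object loadTableWork;
  private Object loadFileWork;
  private boolean checkFileFormat;
  private boolean srcLocal;
  private boolean noop;

  private MoveWorkShape(LineageStateStub lineageState) {
    this.lineageState = lineageState;
  }

  MoveWorkShape(Object loadTableWork, Object loadFileWork, boolean checkFileFormat,
      boolean srcLocal, LineageStateStub lineageState) {
    this(lineageState);
    this.loadTableWork = loadTableWork;
    this.loadFileWork = loadFileWork;
    this.checkFileFormat = checkFileFormat;
    this.srcLocal = srcLocal;
  }

  // Shorter form delegates to the full constructor, as in the merged hunk.
  MoveWorkShape(Object loadTableWork, Object loadFileWork, boolean checkFileFormat,
      LineageStateStub lineageState) {
    this(loadTableWork, loadFileWork, checkFileFormat, false, lineageState);
  }

  void setNoop(boolean noop) {
    this.noop = noop;
  }

  boolean isNoop() {
    return noop;
  }

  LineageStateStub getLineageState() {
    return lineageState;
  }

  public static void main(String[] args) {
    MoveWorkShape w = new MoveWorkShape(null, null, false, new LineageStateStub());
    w.setNoop(true);
    System.out.println(w.isNoop() + " lineage=" + (w.getLineageState() != null));
  }
}
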
http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/test/results/clientpositive/tez/explainuser_3.q.out
----------------------------------------------------------------------
diff --cc ql/src/test/results/clientpositive/tez/explainuser_3.q.out
index 9f764b8,a331bf7..7e89478
--- a/ql/src/test/results/clientpositive/tez/explainuser_3.q.out
+++ b/ql/src/test/results/clientpositive/tez/explainuser_3.q.out
@@@ -509,13 -509,13 +509,13 @@@ Stage-
                    Conditional Operator
                      Stage-1
                        Map 1 vectorized
 -                      File Output Operator [FS_10]
 +                      File Output Operator [FS_8]
                          table:{"name:":"default.orc_merge5"}
-                         Select Operator [SEL_9] (rows=306 width=335)
+                         Select Operator [SEL_9] (rows=1 width=352)
                            Output:["_col0","_col1","_col2","_col3","_col4"]
-                           Filter Operator [FIL_8] (rows=306 width=335)
+                           Filter Operator [FIL_8] (rows=1 width=352)
                              predicate:(userid <= 13)
-                             TableScan [TS_0] (rows=919 width=335)
+                             TableScan [TS_0] (rows=1 width=352)
                                default@orc_merge5,orc_merge5,Tbl:COMPLETE,Col:NONE,Output:["userid","string1","subtype","decimal1","ts"]
              Stage-4(CONDITIONAL)
                File Merge

