hive-commits mailing list archives

From: w...@apache.org
Subject: [06/18] hive git commit: HIVE-14879 : integrate MM tables into ACID: replace MM metastore calls and structures with ACID ones (Wei Zheng)
Date: Tue, 16 May 2017 22:53:11 GMT
http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 777c119..03c50a7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -67,9 +67,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.zip.Deflater;
@@ -100,8 +97,8 @@ import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.common.StringInternUtils;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -211,61 +208,9 @@ import org.apache.hive.common.util.ReflectionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.beans.DefaultPersistenceDelegate;
-import java.beans.Encoder;
-import java.beans.Expression;
-import java.beans.Statement;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.net.URLDecoder;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.sql.SQLTransientException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.zip.Deflater;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.InflaterInputStream;
 
 
 /**
@@ -1592,7 +1537,7 @@ public final class Utilities {
     int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(),
         numBuckets = (conf != null && conf.getTable() != null)
           ? conf.getTable().getNumBuckets() : 0;
-    return removeTempOrDuplicateFiles(fs, fileStats, dpLevels, numBuckets, hconf, null);
+    return removeTempOrDuplicateFiles(fs, fileStats, dpLevels, numBuckets, hconf, null, 0);
   }
   
   private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException {
@@ -1608,7 +1553,7 @@ public final class Utilities {
   }
 
   public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
-      int dpLevels, int numBuckets, Configuration hconf, Long mmWriteId) throws IOException {
+      int dpLevels, int numBuckets, Configuration hconf, Long txnId, int stmtId) throws IOException {
     if (fileStats == null) {
       return null;
     }
@@ -1627,9 +1572,9 @@ public final class Utilities {
         }
         FileStatus[] items = fs.listStatus(path);
 
-        if (mmWriteId != null) {
+        if (txnId != null) {
           Path mmDir = parts[i].getPath();
-          if (!mmDir.getName().equals(ValidWriteIds.getMmFilePrefix(mmWriteId))) {
+          if (!mmDir.getName().equals(AcidUtils.deltaSubdir(txnId, txnId, stmtId))) {
             throw new IOException("Unexpected non-MM directory name " + mmDir);
           }
           Utilities.LOG14535.info("removeTempOrDuplicateFiles processing files in MM directory " + mmDir);
@@ -1644,14 +1589,14 @@ public final class Utilities {
       if (items.length == 0) {
         return result;
       }
-      if (mmWriteId == null) {
+      if (txnId == null) {
         taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs);
       } else {
         if (items.length > 1) {
           throw new IOException("Unexpected directories for non-DP MM: " + Arrays.toString(items));
         }
         Path mmDir = items[0].getPath();
-        if (!mmDir.getName().equals(ValidWriteIds.getMmFilePrefix(mmWriteId))) {
+        if (!mmDir.getName().equals(AcidUtils.deltaSubdir(txnId, txnId, stmtId))) {
           throw new IOException("Unexpected non-MM directory " + mmDir);
         }
         Utilities.LOG14535.info(
@@ -4003,10 +3948,10 @@ public final class Utilities {
   }
 
   public static Path[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels,
-      int lbLevels, PathFilter filter, long mmWriteId, Configuration conf) throws IOException {
+      int lbLevels, PathFilter filter, long txnId, int stmtId, Configuration conf) throws IOException {
     int skipLevels = dpLevels + lbLevels;
     if (filter == null) {
-      filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
+      filter = new JavaUtils.IdPathFilter(txnId, stmtId, true);
     }
     if (skipLevels == 0) {
       return statusToPath(fs.listStatus(path, filter));
@@ -4014,7 +3959,7 @@ public final class Utilities {
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3) && isS3(fs)) {
       return getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter);
     }
-    return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, mmWriteId);
+    return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, txnId, stmtId);
   }
 
   private static boolean isS3(FileSystem fs) {
@@ -4082,22 +4027,22 @@ public final class Utilities {
   }
 
   private static Path[] getMmDirectoryCandidatesGlobStatus(FileSystem fs,
-      Path path, int skipLevels, PathFilter filter, long mmWriteId) throws IOException {
+      Path path, int skipLevels, PathFilter filter, long txnId, int stmtId) throws IOException {
     StringBuilder sb = new StringBuilder(path.toUri().getPath());
     for (int i = 0; i < skipLevels; i++) {
       sb.append(Path.SEPARATOR).append("*");
     }
-    sb.append(Path.SEPARATOR).append(ValidWriteIds.getMmFilePrefix(mmWriteId));
+    sb.append(Path.SEPARATOR).append(AcidUtils.deltaSubdir(txnId, txnId, stmtId));
     Path pathPattern = new Path(path, sb.toString());
     Utilities.LOG14535.info("Looking for files via: " + pathPattern);
     return statusToPath(fs.globStatus(pathPattern, filter));
   }
 
   private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir,
-      int dpLevels, int lbLevels, String unionSuffix, ValidWriteIds.IdPathFilter filter,
-      long mmWriteId, Configuration conf) throws IOException {
+                                          int dpLevels, int lbLevels, JavaUtils.IdPathFilter filter,
+                                          long txnId, int stmtId, Configuration conf) throws IOException {
     Path[] files = getMmDirectoryCandidates(
-        fs, specPath, dpLevels, lbLevels, filter, mmWriteId, conf);
+        fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, conf);
     if (files != null) {
       for (Path path : files) {
         Utilities.LOG14535.info("Deleting " + path + " on failure");
@@ -4110,10 +4055,10 @@ public final class Utilities {
 
 
   public static void writeMmCommitManifest(List<Path> commitPaths, Path specPath, FileSystem fs,
-      String taskId, Long mmWriteId, String unionSuffix) throws HiveException {
+      String taskId, Long txnId, int stmtId, String unionSuffix) throws HiveException {
     if (commitPaths.isEmpty()) return;
     // We assume one FSOP per task (per specPath), so we create it in specPath.
-    Path manifestPath = getManifestDir(specPath, mmWriteId, unionSuffix);
+    Path manifestPath = getManifestDir(specPath, txnId, stmtId, unionSuffix);
     manifestPath = new Path(manifestPath, taskId + MANIFEST_EXTENSION);
     Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with " + commitPaths);
     try {
@@ -4132,8 +4077,8 @@ public final class Utilities {
     }
   }
 
-  private static Path getManifestDir(Path specPath, long mmWriteId, String unionSuffix) {
-    Path manifestPath = new Path(specPath, "_tmp." + ValidWriteIds.getMmFilePrefix(mmWriteId));
+  private static Path getManifestDir(Path specPath, long txnId, int stmtId, String unionSuffix) {
+    Path manifestPath = new Path(specPath, "_tmp." + AcidUtils.deltaSubdir(txnId, txnId, stmtId));
     return (unionSuffix == null) ? manifestPath : new Path(manifestPath, unionSuffix);
   }
 
@@ -4149,18 +4094,18 @@ public final class Utilities {
   }
 
   public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Configuration hconf,
-      boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long mmWriteId,
+      boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long txnId, int stmtId,
       Reporter reporter, boolean isMmCtas) throws IOException, HiveException {
     FileSystem fs = specPath.getFileSystem(hconf);
-    Path manifestDir = getManifestDir(specPath, mmWriteId, unionSuffix);
+    Path manifestDir = getManifestDir(specPath, txnId, stmtId, unionSuffix);
     if (!success) {
-      ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
+      JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(txnId, stmtId, true);
       tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels,
-          unionSuffix, filter, mmWriteId, hconf);
+          filter, txnId, stmtId, hconf);
       return;
     }
 
-    Utilities.LOG14535.info("Looking for manifests in: " + manifestDir + " (" + mmWriteId + ")");
+    Utilities.LOG14535.info("Looking for manifests in: " + manifestDir + " (" + txnId + ")");
     // TODO# may be wrong if there are no splits (empty insert/CTAS)
     List<Path> manifests = new ArrayList<>();
     if (fs.exists(manifestDir)) {
@@ -4180,14 +4125,14 @@ public final class Utilities {
     }
 
     Utilities.LOG14535.info("Looking for files in: " + specPath);
-    ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
+    JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(txnId, stmtId, true);
     if (isMmCtas && !fs.exists(specPath)) {
       // TODO: do we also need to do this when creating an empty partition from select?
       Utilities.LOG14535.info("Creating table directory for CTAS with no output at " + specPath);
       FileUtils.mkdir(fs, specPath, hconf);
     }
     Path[] files = getMmDirectoryCandidates(
-        fs, specPath, dpLevels, lbLevels, filter, mmWriteId, hconf);
+        fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, hconf);
     ArrayList<Path> mmDirectories = new ArrayList<>();
     if (files != null) {
       for (Path path : files) {
@@ -4243,7 +4188,7 @@ public final class Utilities {
       finalResults[i] = new PathOnlyFileStatus(mmDirectories.get(i));
     }
     List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(
-        fs, finalResults, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, mmWriteId);
+        fs, finalResults, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, txnId, stmtId);
     // create empty buckets if necessary
     if (emptyBuckets.size() > 0) {
       assert mbc != null;
@@ -4294,7 +4239,7 @@ public final class Utilities {
    * if the entire directory is valid (has no uncommitted/temporary files).
    */
   public static List<Path> getValidMmDirectoriesFromTableOrPart(Path path, Configuration conf,
-      ValidWriteIds ids, int lbLevels) throws IOException {
+      ValidTxnList validTxnList, int lbLevels) throws IOException {
     Utilities.LOG14535.info("Looking for valid MM paths under " + path);
     // NULL means this directory is entirely valid.
     List<Path> result = null;
@@ -4304,8 +4249,8 @@ public final class Utilities {
     for (int i = 0; i < children.length; ++i) {
       FileStatus file = children[i];
       Path childPath = file.getPath();
-      Long writeId = ValidWriteIds.extractWriteId(childPath);
-      if (!file.isDirectory() || writeId == null || !ids.isValid(writeId)) {
+      Long txnId = JavaUtils.extractTxnId(childPath);
+      if (!file.isDirectory() || txnId == null || !validTxnList.isTxnValid(txnId)) {
         Utilities.LOG14535.info("Skipping path " + childPath);
         if (result == null) {
           result = new ArrayList<>(children.length - 1);

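The recurring substitution in Utilities.java above replaces the old MM prefix, ValidWriteIds.getMmFilePrefix(mmWriteId), with the ACID delta directory name produced by AcidUtils.deltaSubdir(txnId, txnId, stmtId), keyed by a transaction id plus a statement id. A minimal sketch of the naming convention this implies is below; the zero-padding widths are an illustrative assumption, not the exact AcidUtils output.

    // Illustration only: approximates the delta_<minTxn>_<maxTxn>_<stmtId> layout the
    // patch switches MM directories to. The real name comes from AcidUtils.deltaSubdir;
    // the padding widths here are assumed for readability, not authoritative.
    public class DeltaNameSketch {
      static String deltaSubdirSketch(long minTxnId, long maxTxnId, int stmtId) {
        return String.format("delta_%07d_%07d_%04d", minTxnId, maxTxnId, stmtId);
      }

      public static void main(String[] args) {
        // For an MM insert, the min and max txn ids are the same single transaction.
        long txnId = 17L;
        int stmtId = 0;
        System.out.println(deltaSubdirSketch(txnId, txnId, stmtId)); // e.g. delta_0000017_0000017_0000
      }
    }
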
http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 740488c..902caa3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -276,9 +276,8 @@ public class AcidUtils {
     return result;
   }
 
-  // INSERT_ONLY is a special operation which we only support INSERT operations, no UPDATE/DELETE
   public enum Operation {
-    NOT_ACID, INSERT, UPDATE, DELETE, INSERT_ONLY
+    NOT_ACID, INSERT, UPDATE, DELETE
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 9b83cb4..8bcf8c7 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -20,11 +20,9 @@ package org.apache.hadoop.hive.ql.io;
 
 import java.io.DataInput;
 import java.io.DataOutput;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -34,8 +32,11 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.Map.Entry;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StringInternUtils;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,8 +45,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
@@ -423,12 +422,11 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
    */
   private void addSplitsForGroup(List<Path> dirs, TableScanOperator tableScan, JobConf conf,
       InputFormat inputFormat, Class<? extends InputFormat> inputFormatClass, int splits,
-      TableDesc table, Map<String, ValidWriteIds> writeIdMap, List<InputSplit> result)
+      TableDesc table, List<InputSplit> result)
           throws IOException {
-    ValidWriteIds writeIds = extractWriteIds(writeIdMap, conf, table.getTableName());
-    if (writeIds != null) {
-      Utilities.LOG14535.info("Observing " + table.getTableName() + ": " + writeIds);
-    }
+    String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+    ValidTxnList validTxnList = txnString == null ? new ValidReadTxnList() :
+        new ValidReadTxnList(txnString);
 
     Utilities.copyTablePropertiesToConf(table, conf);
 
@@ -436,7 +434,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       pushFilters(conf, tableScan);
     }
 
-    Path[] finalDirs = processPathsForMmRead(dirs, conf, writeIds);
+    Path[] finalDirs = processPathsForMmRead(dirs, conf, validTxnList);
     if (finalDirs == null) {
       return; // No valid inputs.
     }
@@ -461,13 +459,13 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
   }
 
   public static Path[] processPathsForMmRead(List<Path> dirs, JobConf conf,
-      ValidWriteIds writeIds) throws IOException {
-    if (writeIds == null) {
+      ValidTxnList validTxnList) throws IOException {
+    if (validTxnList == null) {
       return dirs.toArray(new Path[dirs.size()]);
     } else {
       List<Path> finalPaths = new ArrayList<>(dirs.size());
       for (Path dir : dirs) {
-        processForWriteIds(dir, conf, writeIds, finalPaths);
+        processForWriteIds(dir, conf, validTxnList, finalPaths);
       }
       if (finalPaths.isEmpty()) {
         LOG.warn("No valid inputs found in " + dirs);
@@ -478,7 +476,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
   }
 
   private static void processForWriteIds(Path dir, JobConf conf,
-      ValidWriteIds writeIds, List<Path> finalPaths) throws IOException {
+      ValidTxnList validTxnList, List<Path> finalPaths) throws IOException {
     FileSystem fs = dir.getFileSystem(conf);
     Utilities.LOG14535.warn("Checking " + dir + " (root) for inputs");
     // Ignore nullscan-optimized paths.
@@ -489,17 +487,17 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     FileStatus[] files = fs.listStatus(dir); // TODO: batch?
     LinkedList<Path> subdirs = new LinkedList<>();
     for (FileStatus file : files) {
-      handleNonMmDirChild(file, writeIds, subdirs, finalPaths);
+      handleNonMmDirChild(file, validTxnList, subdirs, finalPaths);
     }
     while (!subdirs.isEmpty()) {
       Path subdir = subdirs.poll();
       for (FileStatus file : fs.listStatus(subdir)) {
-        handleNonMmDirChild(file, writeIds, subdirs, finalPaths);
+        handleNonMmDirChild(file, validTxnList, subdirs, finalPaths);
       }
     }
   }
 
-  private static void handleNonMmDirChild(FileStatus file, ValidWriteIds writeIds,
+  private static void handleNonMmDirChild(FileStatus file, ValidTxnList validTxnList,
       LinkedList<Path> subdirs, List<Path> finalPaths) {
     Path path = file.getPath();
     Utilities.LOG14535.warn("Checking " + path + " for inputs");
@@ -507,12 +505,12 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       Utilities.LOG14535.warn("Ignoring a file not in MM directory " + path);
       return;
     }
-    Long writeId = ValidWriteIds.extractWriteId(path);
-    if (writeId == null) {
+    Long txnId = JavaUtils.extractTxnId(path);
+    if (txnId == null) {
       subdirs.add(path);
       return;
     }
-    if (!writeIds.isValid(writeId)) {
+    if (!validTxnList.isTxnValid(txnId)) {
       Utilities.LOG14535.warn("Ignoring an uncommitted directory " + path);
       return;
     }
@@ -564,7 +562,6 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     StringBuilder readColumnNamesBuffer = new StringBuilder(newjob.
       get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, ""));
     // for each dir, get the InputFormat, and do getSplits.
-    Map<String, ValidWriteIds> writeIdMap = new HashMap<>();
     for (Path dir : dirs) {
       PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
       Class<? extends InputFormat> inputFormatClass = part.getInputFileFormatClass();
@@ -615,7 +612,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
         addSplitsForGroup(currentDirs, currentTableScan, newjob,
             getInputFormatFromCache(currentInputFormatClass, job),
             currentInputFormatClass, currentDirs.size()*(numSplits / dirs.length),
-            currentTable, writeIdMap, result);
+            currentTable, result);
       }
 
       currentDirs.clear();
@@ -637,7 +634,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       addSplitsForGroup(currentDirs, currentTableScan, newjob,
           getInputFormatFromCache(currentInputFormatClass, job),
           currentInputFormatClass, currentDirs.size()*(numSplits / dirs.length),
-          currentTable, writeIdMap, result);
+          currentTable, result);
     }
 
     Utilities.clearWorkMapForConf(job);
@@ -648,19 +645,6 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     return result.toArray(new HiveInputSplit[result.size()]);
   }
 
-  public static ValidWriteIds extractWriteIds(Map<String, ValidWriteIds> writeIdMap,
-      JobConf newjob, String tableName) {
-    if (StringUtils.isBlank(tableName)) return null;
-    ValidWriteIds writeIds = writeIdMap.get(tableName);
-    if (writeIds == null) {
-      writeIds = ValidWriteIds.createFromConf(newjob, tableName);
-      writeIdMap.put(tableName, writeIds != null ? writeIds : ValidWriteIds.NO_WRITE_IDS);
-    } else if (writeIds == ValidWriteIds.NO_WRITE_IDS) {
-      writeIds = null;
-    }
-    return writeIds;
-  }
-
   private void pushProjection(final JobConf newjob, final StringBuilder readColumnsBuffer,
       final StringBuilder readColumnNamesBuffer) {
     String readColIds = readColumnsBuffer.toString();

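The HiveInputFormat changes drop the per-table ValidWriteIds map in favor of the ACID ValidTxnList serialized into the job conf. A minimal sketch of that read-side check, assuming the txn id has already been pulled out of a delta directory name (the patch uses JavaUtils.extractTxnId for that step):

    import org.apache.hadoop.hive.common.ValidReadTxnList;
    import org.apache.hadoop.hive.common.ValidTxnList;
    import org.apache.hadoop.mapred.JobConf;

    // Sketch: parse the valid-transaction list the driver placed in the conf and
    // only read directories whose txn id that list marks as committed.
    public class TxnFilterSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
        ValidTxnList validTxnList = txnString == null
            ? new ValidReadTxnList()       // nothing recorded: treat all txns as valid
            : new ValidReadTxnList(txnString);

        long txnIdFromDirName = 17L;       // illustrative value, e.g. parsed from a delta_... name
        System.out.println("readable: " + validTxnList.isTxnValid(txnIdFromDirName));
      }
    }
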
http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 6498199..d793ccf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -32,7 +32,6 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -52,7 +51,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
@@ -70,9 +68,9 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.BlobStorageUtils;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.hive.common.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -104,7 +102,6 @@ import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest;
 import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResult;
 import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
 import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
 import org.apache.hadoop.hive.metastore.api.HiveObjectType;
@@ -157,7 +154,6 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -1624,26 +1620,13 @@ public class Hive {
   public void loadPartition(Path loadPath, String tableName,
       Map<String, String> partSpec, boolean replace, boolean inheritTableSpecs,
       boolean isSkewedStoreAsSubdir,  boolean isSrcLocal, boolean isAcid,
-      boolean hasFollowingStatsTask, Long mmWriteId, boolean isCommitMmWrite)
+      boolean hasFollowingStatsTask, Long txnId, int stmtId)
           throws HiveException {
     Table tbl = getTable(tableName);
-    boolean isMmTableWrite = (mmWriteId != null);
+    boolean isMmTableWrite = (txnId != null);
     Preconditions.checkState(isMmTableWrite == MetaStoreUtils.isInsertOnlyTable(tbl.getParameters()));
     loadPartition(loadPath, tbl, partSpec, replace, inheritTableSpecs,
-        isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask, mmWriteId);
-    if (isMmTableWrite && isCommitMmWrite) {
-      // The assumption behind committing here is that this partition is the only one outputted.
-      commitMmTableWrite(tbl, mmWriteId);
-    }
-  }
-
-  public void commitMmTableWrite(Table tbl, Long mmWriteId)
-      throws HiveException {
-    try {
-      getMSC().finalizeTableWrite(tbl.getDbName(), tbl.getTableName(), mmWriteId, true);
-    } catch (TException e) {
-      throw new HiveException(e);
-    }
+        isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask, txnId, stmtId);
   }
 
   /**
@@ -1673,7 +1656,7 @@ public class Hive {
    */
   public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec,
       boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
-      boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, Long mmWriteId)
+      boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask, Long txnId, int stmtId)
           throws HiveException {
     Path tblDataLocationPath =  tbl.getDataLocation();
     try {
@@ -1722,34 +1705,34 @@ public class Hive {
         newFiles = Collections.synchronizedList(new ArrayList<Path>());
       }
       // TODO: this assumes both paths are qualified; which they are, currently.
-      if (mmWriteId != null && loadPath.equals(newPartPath)) {
+      if (txnId != null && loadPath.equals(newPartPath)) {
         // MM insert query, move itself is a no-op.
         Utilities.LOG14535.info("not moving " + loadPath + " to " + newPartPath + " (MM)");
         assert !isAcid;
         if (areEventsForDmlNeeded(tbl, oldPart)) {
-          newFiles = listFilesCreatedByQuery(loadPath, mmWriteId);
+          newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId);
         }
         Utilities.LOG14535.info("maybe deleting stuff from " + oldPartPath + " (new " + newPartPath + ") for replace");
         if (replace && oldPartPath != null) {
           boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
           deleteOldPathForReplace(newPartPath, oldPartPath, getConf(), isAutoPurge,
-              new ValidWriteIds.IdPathFilter(mmWriteId, false, true), mmWriteId != null,
+              new JavaUtils.IdPathFilter(txnId, stmtId, false, true), true,
               tbl.isStoredAsSubDirectories() ? tbl.getSkewedColNames().size() : 0);
         }
       } else {
         // Either a non-MM query, or a load into MM table from an external source.
         PathFilter filter = FileUtils.HIDDEN_FILES_PATH_FILTER;
         Path destPath = newPartPath;
-        if (mmWriteId != null) {
+        if (txnId != null) {
           // We will load into MM directory, and delete from the parent if needed.
-          destPath = new Path(destPath, ValidWriteIds.getMmFilePrefix(mmWriteId));
-          filter = replace ? new ValidWriteIds.IdPathFilter(mmWriteId, false, true) : filter;
+          destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
+          filter = replace ? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter;
         }
         Utilities.LOG14535.info("moving " + loadPath + " to " + destPath);
         if (replace || (oldPart == null && !isAcid)) {
           boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
           replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(),
-              isSrcLocal, isAutoPurge, newFiles, filter, mmWriteId != null);
+              isSrcLocal, isAutoPurge, newFiles, filter, txnId != null);
         } else {
           FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
           Hive.copyFiles(conf, loadPath, destPath, fs, isSrcLocal, isAcid, newFiles);
@@ -1834,9 +1817,9 @@ public class Hive {
     return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null;
   }
 
-  private List<Path> listFilesCreatedByQuery(Path loadPath, long mmWriteId) throws HiveException {
+  private List<Path> listFilesCreatedByQuery(Path loadPath, long txnId, int stmtId) throws HiveException {
     List<Path> newFiles = new ArrayList<Path>();
-    final String filePrefix = ValidWriteIds.getMmFilePrefix(mmWriteId);
+    final String filePrefix = AcidUtils.deltaSubdir(txnId, txnId, stmtId);
     FileStatus[] srcs;
     FileSystem srcFs;
     try {
@@ -1999,11 +1982,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @throws HiveException
    */
   private Set<Path> getValidPartitionsInPath(
-      int numDP, int numLB, Path loadPath, Long mmWriteId) throws HiveException {
+      int numDP, int numLB, Path loadPath, Long txnId, int stmtId) throws HiveException {
     Set<Path> validPartitions = new HashSet<Path>();
     try {
       FileSystem fs = loadPath.getFileSystem(conf);
-      if (mmWriteId == null) {
+      if (txnId == null) {
         FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
         // Check for empty partitions
         for (FileStatus s : leafStatus) {
@@ -2018,7 +2001,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
         // The non-MM path only finds new partitions, as it is looking at the temp path.
         // To produce the same effect, we will find all the partitions affected by this write ID.
         Path[] leafStatus = Utilities.getMmDirectoryCandidates(
-            fs, loadPath, numDP, numLB, null, mmWriteId, conf);
+            fs, loadPath, numDP, numLB, null, txnId, stmtId, conf);
         for (Path p : leafStatus) {
           Path dpPath = p.getParent(); // Skip the MM directory that we have found.
           for (int i = 0; i < numLB; ++i) {
@@ -2064,8 +2047,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
    */
   public Map<Map<String, String>, Partition> loadDynamicPartitions(final Path loadPath,
       final String tableName, final Map<String, String> partSpec, final boolean replace,
-      final int numDP, final int numLB, final boolean isAcid, final long txnId,
-      final boolean hasFollowingStatsTask, final AcidUtils.Operation operation, final Long mmWriteId)
+      final int numDP, final int numLB, final boolean isAcid, final long txnId, final int stmtId,
+      final boolean hasFollowingStatsTask, final AcidUtils.Operation operation)
       throws HiveException {
 
     final Map<Map<String, String>, Partition> partitionsMap =
@@ -2080,7 +2063,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
     // Get all valid partition paths and existing partitions for them (if any)
     final Table tbl = getTable(tableName);
-    final Set<Path> validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, mmWriteId);
+    final Set<Path> validPartitions = getValidPartitionsInPath(numDP, numLB, loadPath, txnId, stmtId);
 
     final int partsToLoad = validPartitions.size();
     final AtomicInteger partitionsLoaded = new AtomicInteger(0);
@@ -2114,7 +2097,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
               Utilities.LOG14535.info("loadPartition called for DPP from " + partPath + " to " + tbl.getTableName());
               Partition newPartition = loadPartition(partPath, tbl, fullPartSpec,
                   replace, true, numLB > 0,
-                  false, isAcid, hasFollowingStatsTask, mmWriteId);
+                  false, isAcid, hasFollowingStatsTask, txnId, stmtId);
               partitionsMap.put(fullPartSpec, newPartition);
 
               if (inPlaceEligible) {
@@ -2146,10 +2129,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
       for (Future future : futures) {
         future.get();
       }
-      if (mmWriteId != null) {
-        // Commit after we have processed all the partitions.
-        commitMmTableWrite(tbl, mmWriteId);
-      }
     } catch (InterruptedException | ExecutionException e) {
       LOG.debug("Cancelling " + futures.size() + " dynamic loading tasks");
       //cancel other futures
@@ -2200,8 +2179,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    */
   public void loadTable(Path loadPath, String tableName, boolean replace, boolean isSrcLocal,
       boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask,
-      Long mmWriteId) throws HiveException {
-
+      Long txnId, int stmtId) throws HiveException {
     List<Path> newFiles = null;
     Table tbl = getTable(tableName);
     HiveConf sessionConf = SessionState.getSessionConf();
@@ -2209,30 +2187,30 @@ private void constructOneLBLocationMap(FileStatus fSta,
       newFiles = Collections.synchronizedList(new ArrayList<Path>());
     }
     // TODO: this assumes both paths are qualified; which they are, currently.
-    if (mmWriteId != null && loadPath.equals(tbl.getPath())) {
+    if (txnId != null && loadPath.equals(tbl.getPath())) {
       Utilities.LOG14535.info("not moving " + loadPath + " to " + tbl.getPath());
       if (replace) {
         Path tableDest = tbl.getPath();
         boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
         deleteOldPathForReplace(tableDest, tableDest, sessionConf, isAutopurge,
-            new ValidWriteIds.IdPathFilter(mmWriteId, false, true), mmWriteId != null,
+            new JavaUtils.IdPathFilter(txnId, stmtId, false, true), true,
             tbl.isStoredAsSubDirectories() ? tbl.getSkewedColNames().size() : 0);
       }
-      newFiles = listFilesCreatedByQuery(loadPath, mmWriteId);
+      newFiles = listFilesCreatedByQuery(loadPath, txnId, stmtId);
     } else {
       // Either a non-MM query, or a load into MM table from an external source.
       Path tblPath = tbl.getPath(), destPath = tblPath;
       PathFilter filter = FileUtils.HIDDEN_FILES_PATH_FILTER;
-      if (mmWriteId != null) {
+      if (txnId != null) {
         // We will load into MM directory, and delete from the parent if needed.
-        destPath = new Path(destPath, ValidWriteIds.getMmFilePrefix(mmWriteId));
-        filter = replace ? new ValidWriteIds.IdPathFilter(mmWriteId, false, true) : filter;
+        destPath = new Path(destPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
+        filter = replace ? new JavaUtils.IdPathFilter(txnId, stmtId, false, true) : filter;
       }
       Utilities.LOG14535.info("moving " + loadPath + " to " + tblPath + " (replace = " + replace + ")");
       if (replace) {
         boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
         replaceFiles(tblPath, loadPath, destPath, tblPath,
-            sessionConf, isSrcLocal, isAutopurge, newFiles, filter, mmWriteId != null);
+            sessionConf, isSrcLocal, isAutopurge, newFiles, filter, txnId != null);
       } else {
         try {
           FileSystem fs = tbl.getDataLocation().getFileSystem(sessionConf);
@@ -2274,10 +2252,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
       throw new HiveException(e);
     }
 
-    if (mmWriteId != null) {
-      commitMmTableWrite(tbl, mmWriteId);
-    }
-
     fireInsertEvent(tbl, null, replace, newFiles);
   }
 
@@ -4337,25 +4311,4 @@ private void constructOneLBLocationMap(FileStatus fSta,
       throw new HiveException(e);
     }
   }
-
-  public long getNextTableWriteId(String dbName, String tableName) throws HiveException {
-    try {
-      return getMSC().getNextTableWriteId(dbName, tableName);
-    } catch (Exception e) {
-      throw new HiveException(e);
-    }
-  }
-
-  public ValidWriteIds getValidWriteIdsForTable(
-      String dbName, String tableName) throws HiveException {
-    try {
-      // TODO: decode ID ranges here if we use that optimization
-      GetValidWriteIdsResult result = getMSC().getValidWriteIds(dbName, tableName);
-      return new ValidWriteIds(result.getLowWatermarkId(), result.getHighWatermarkId(),
-          result.isSetAreIdsValid() && result.isAreIdsValid(),
-          result.isSetIds() ? new HashSet<Long>(result.getIds()) : null);
-    } catch (Exception e) {
-      throw new HiveException(e);
-    }
-  }
-};
+}

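With the Hive.java changes above, MM loads are identified by (txnId, stmtId) and the separate commitMmTableWrite()/getNextTableWriteId() round-trips to the metastore are gone; the commit rides on the enclosing ACID transaction. A sketch of a caller against the patched signatures follows (a hypothetical wrapper for illustration; it compiles against these signatures but needs a live session and metastore to run):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.session.SessionState;

    // Hypothetical wrapper: obtain the current ACID txn id from the session's txn
    // manager (as the import path in this patch does) and pass it, with a statement
    // id, to the new loadTable signature. No explicit MM commit call is needed.
    public class MmLoadSketch {
      static void loadIntoMmTable(Path loadPath, String tableName, int stmtId)
          throws HiveException {
        Long txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
        Hive.get().loadTable(loadPath, tableName,
            /* replace */ false, /* isSrcLocal */ false, /* isSkewedStoreAsSubdir */ false,
            /* isAcid */ false, /* hasFollowingStatsTask */ false,
            txnId, stmtId);
      }
    }
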
http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index 5efaf70..6282548 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -1026,8 +1026,4 @@ public class Table implements Serializable {
   public boolean hasDeserializer() {
     return deserializer != null;
   }
-
-  public void setMmNextWriteId(long writeId) {
-    this.tTable.setMmNextWriteId(writeId);
-  }
 };

http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 87fff3e..204e67d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1636,7 +1636,9 @@ public final class GenMapRedUtils {
     } else {
       fmd = new OrcFileMergeDesc();
     }
-    fmd.setMmWriteId(fsInputDesc.getMmWriteId());
+    fmd.setTxnId(fsInputDesc.getMmWriteId());
+    int stmtId = fsInputDesc.getStatementId();
+    fmd.setStmtId(stmtId == -1 ? 0 : stmtId);
     fmd.setDpCtx(fsInputDesc.getDynPartCtx());
     fmd.setOutputPath(finalName);
     fmd.setHasDynamicPartitions(work.hasDynamicPartitions());

http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
index 64db005..b50f664 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
@@ -86,7 +86,7 @@ public class SkewJoinResolver implements PhysicalPlanResolver {
       ParseContext pc = physicalContext.getParseContext();
       if (pc.getLoadTableWork() != null) {
         for (LoadTableDesc ltd : pc.getLoadTableWork()) {
-          if (ltd.getMmWriteId() == null) continue;
+          if (ltd.getTxnId() == null) continue;
           // See the path in FSOP that calls fs.exists on finalPath.
           LOG.debug("Not using skew join because the destination table "
               + ltd.getTable().getTableName() + " is an insert_only table");

http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index 41245c8..b9db582 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -116,7 +116,8 @@ public abstract class BaseSemanticAnalyzer {
    */
   protected Set<FileSinkDesc> acidFileSinks = new HashSet<FileSinkDesc>();
 
-  // whether any ACID table is involved in a query
+  // whether any ACID table or Insert-only (mm) table is involved in a query
+  // They both require DbTxnManager and both need to recordValidTxns when acquiring locks in Driver
   protected boolean acidInQuery;
 
   public static final int HIVE_COLUMN_ORDER_ASC = 1;

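The expanded comment above notes that insert-only (MM) tables now share the ACID requirement for DbTxnManager so that valid txns are recorded when the Driver acquires locks. A minimal configuration sketch, assuming the standard HiveConf keys for the transaction manager and concurrency support:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    // Sketch of the client-side prerequisite: point hive.txn.manager at DbTxnManager
    // and enable concurrency so locks (and the valid txn list) are managed per query.
    public class TxnManagerConfSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        conf.setVar(ConfVars.HIVE_TXN_MANAGER,
            "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
        conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
        System.out.println(conf.getVar(ConfVars.HIVE_TXN_MANAGER));
      }
    }
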
http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index deb51be..e534272 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -18,15 +18,8 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-
-import org.apache.hadoop.hive.common.ValidWriteIds;
-
-import java.util.List;
-
-import org.apache.hadoop.hive.ql.exec.Utilities;
 
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -41,13 +34,17 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -214,8 +211,6 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
       int lbLevels = isMmTable && ts.tableHandle.isStoredAsSubDirectories()
           ? ts.tableHandle.getSkewedColNames().size() : 0;
-      ValidWriteIds ids = isMmTable ? db.getValidWriteIdsForTable(
-          ts.tableHandle.getDbName(), ts.tableHandle.getTableName()) : null;
       if (ts.tableHandle.isPartitioned()) {
         for (Partition partition : partitions) {
           Path fromPath = partition.getDataLocation();
@@ -229,7 +224,7 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
             }
             copyTask = ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toPartPath, conf);
           } else {
-            CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toPartPath, conf);
+            CopyWork cw = createCopyWork(isMmTable, lbLevels, new ValidReadTxnList(), fromPath, toPartPath, conf);
             copyTask = TaskFactory.get(cw, conf);
           }
           rootTasks.add(copyTask);
@@ -248,7 +243,7 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
           copyTask = ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toDataPath, conf);
         } else {
           // TODO# master merge - did master remove this path or did it never exit? we need it for MM
-          CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toDataPath, conf);
+          CopyWork cw = createCopyWork(isMmTable, lbLevels, new ValidReadTxnList(), fromPath, toDataPath, conf);
           copyTask = TaskFactory.get(cw, conf);
         }
         rootTasks.add(copyTask);
@@ -260,14 +255,14 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
-  private static CopyWork createCopyWork(boolean isMmTable, int lbLevels, ValidWriteIds ids,
+  private static CopyWork createCopyWork(boolean isMmTable, int lbLevels, ValidTxnList validTxnList,
       Path fromPath, Path toDataPath, Configuration conf) throws IOException {
     List<Path> validPaths = null;
     if (isMmTable) {
       fromPath = fromPath.getFileSystem(conf).makeQualified(fromPath);
-      validPaths = Utilities.getValidMmDirectoriesFromTableOrPart(fromPath, conf, ids, lbLevels);
+      validPaths = Utilities.getValidMmDirectoriesFromTableOrPart(fromPath, conf, validTxnList, lbLevels);
     }
-    if (validPaths == null) {
+    if (validPaths == null || validPaths.isEmpty()) {
       return new CopyWork(fromPath, toDataPath, false); // Not MM, or no need to skip anything.
     } else {
       return createCopyWorkForValidPaths(fromPath, toDataPath, validPaths);

http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 99a7392..a220d1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -33,13 +33,11 @@ import java.util.TreeMap;
 
 import org.antlr.runtime.tree.Tree;
 import org.apache.commons.lang.ObjectUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -57,6 +55,7 @@ import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -303,31 +302,31 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       tableExists = true;
     }
 
-    Long mmWriteId = null;
+    Long txnId = null;
+    int stmtId = 0;
     if (table != null && MetaStoreUtils.isInsertOnlyTable(table.getParameters())) {
-      mmWriteId = x.getHive().getNextTableWriteId(table.getDbName(), table.getTableName());
+      txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
     } else if (table == null && isSourceMm) {
       // We could import everything as is - directories and IDs, but that won't work with ACID
       // txn ids in future. So, let's import everything into the new MM directory with ID == 0.
-      mmWriteId = 0l;
+      txnId = 0l;
     }
-    //todo due to master merge on May 4, tblDesc has been changed from CreateTableDesc to ImportTableDesc
-    // which may result in Import test failure
+    //todo due to the master merge, tblDesc is no longer CreateTableDesc, but ImportTableDesc
     /*
-    if (mmWriteId != null) {
-      tblDesc.setInitialMmWriteId(mmWriteId);
+    if (txnId != null) {
+      tblDesc.setInitialMmWriteId(txnId);
     }
     */
     if (!replicationSpec.isInReplicationScope()) {
       createRegularImportTasks(
           tblDesc, partitionDescs,
           isPartSpecSet, replicationSpec, table,
-          fromURI, fs, wh, x, mmWriteId, isSourceMm);
+          fromURI, fs, wh, x, txnId, stmtId, isSourceMm);
     } else {
       createReplImportTasks(
           tblDesc, partitionDescs,
           isPartSpecSet, replicationSpec, waitOnPrecursor, table,
-          fromURI, fs, wh, x, mmWriteId, isSourceMm);
+          fromURI, fs, wh, x, txnId, stmtId, isSourceMm);
     }
     return tableExists;
   }
@@ -362,17 +361,17 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
       ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x,
-      Long mmWriteId, boolean isSourceMm) {
+      Long txnId, int stmtId, boolean isSourceMm) {
     Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
-    Path destPath = mmWriteId == null ? x.getCtx().getExternalTmpPath(tgtPath)
-        : new Path(tgtPath, ValidWriteIds.getMmFilePrefix(mmWriteId));
+    Path destPath = txnId == null ? x.getCtx().getExternalTmpPath(tgtPath)
+        : new Path(tgtPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
     Utilities.LOG14535.info("adding import work for table with source location: "
         + dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm "
-        + mmWriteId + " (src " + isSourceMm + ") for " + (table == null ? "a new table" : table.getTableName()));
+        + txnId + " (src " + isSourceMm + ") for " + (table == null ? "a new table" : table.getTableName()));
 
     Task<?> copyTask = null;
     if (replicationSpec.isInReplicationScope()) {
-      if (isSourceMm || mmWriteId != null) {
+      if (isSourceMm || txnId != null) {
         // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed.
         throw new RuntimeException(
             "Not supported right now because Replication is completely screwed");
@@ -385,7 +384,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     LoadTableDesc loadTableWork = new LoadTableDesc(destPath,
-        Utilities.getTableDesc(table), new TreeMap<String, String>(), replace, mmWriteId);
+        Utilities.getTableDesc(table), new TreeMap<String, String>(), replace, txnId);
+    loadTableWork.setTxnId(txnId);
+    loadTableWork.setStmtId(stmtId);
     MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false);
     Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
     copyTask.addDependentTask(loadTableTask);
@@ -433,7 +434,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
  private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc,
       Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec,
-      EximUtil.SemanticAnalyzerWrapperContext x, Long mmWriteId, boolean isSourceMm,
+      EximUtil.SemanticAnalyzerWrapperContext x, Long txnId, int stmtId, boolean isSourceMm,
       Task<?> commitTask)
       throws MetaException, IOException, HiveException {
     AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
@@ -452,17 +453,17 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           + partSpecToString(partSpec.getPartSpec())
           + " with source location: " + srcLocation);
       Path tgtLocation = new Path(partSpec.getLocation());
-      Path destPath = mmWriteId == null ? x.getCtx().getExternalTmpPath(tgtLocation)
-          : new Path(tgtLocation, ValidWriteIds.getMmFilePrefix(mmWriteId));
-      Path moveTaskSrc =  mmWriteId == null ? destPath : tgtLocation;
+      Path destPath = txnId == null ? x.getCtx().getExternalTmpPath(tgtLocation)
+          : new Path(tgtLocation, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
+      Path moveTaskSrc =  txnId == null ? destPath : tgtLocation;
       Utilities.LOG14535.info("adding import work for partition with source location: "
           + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm "
-          + mmWriteId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec()));
+          + txnId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec()));
 
 
       Task<?> copyTask = null;
       if (replicationSpec.isInReplicationScope()) {
-        if (isSourceMm || mmWriteId != null) {
+        if (isSourceMm || txnId != null) {
           // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed.
           throw new RuntimeException(
               "Not supported right now because Replication is completely screwed");
@@ -478,11 +479,13 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       Task<?> addPartTask = TaskFactory.get(new DDLWork(x.getInputs(),
           x.getOutputs(), addPartitionDesc), x.getConf());
       LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table),
-          partSpec.getPartSpec(), replicationSpec.isReplace(), mmWriteId);
+          partSpec.getPartSpec(), replicationSpec.isReplace(), txnId);
+      loadTableWork.setTxnId(txnId);
+      loadTableWork.setStmtId(stmtId);
       loadTableWork.setInheritTableSpecs(false);
       // Do not commit the write ID from each task; need to commit once.
       // TODO: we should just change the import to use a single MoveTask, like dynparts.
-      loadTableWork.setIntermediateInMmWrite(mmWriteId != null);
+      loadTableWork.setIntermediateInMmWrite(txnId != null);
       Task<?> loadPartTask = TaskFactory.get(new MoveWork(
           x.getInputs(), x.getOutputs(), loadTableWork, null, false), x.getConf());
       copyTask.addDependentTask(loadPartTask);
@@ -778,21 +781,21 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
   private static void createRegularImportTasks(
       ImportTableDesc tblDesc, List<AddPartitionDesc> partitionDescs, boolean isPartSpecSet,
       ReplicationSpec replicationSpec, Table table, URI fromURI, FileSystem fs, Warehouse wh,
-      EximUtil.SemanticAnalyzerWrapperContext x, Long mmWriteId, boolean isSourceMm)
+      EximUtil.SemanticAnalyzerWrapperContext x, Long txnId, int stmtId, boolean isSourceMm)
       throws HiveException, URISyntaxException, IOException, MetaException {
 
     if (table != null) {
       if (table.isPartitioned()) {
         x.getLOG().debug("table partitioned");
         Task<?> ict = createImportCommitTask(
-            table.getDbName(), table.getTableName(), mmWriteId, x.getConf());
+            table.getDbName(), table.getTableName(), txnId, stmtId, x.getConf());
 
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
           Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
           org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
           if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
             x.getTasks().add(addSinglePartition(
-                fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, mmWriteId, isSourceMm, ict));
+                fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, txnId, stmtId, isSourceMm, ict));
           } else {
             throw new SemanticException(
                 ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec)));
@@ -804,7 +807,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         Path tgtPath = new Path(table.getDataLocation().toString());
         FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
         checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x);
-        loadTable(fromURI, table, false, tgtPath, replicationSpec, x, mmWriteId, isSourceMm);
+        loadTable(fromURI, table, false, tgtPath, replicationSpec, x, txnId, stmtId, isSourceMm);
       }
       // Set this to read because we can't overwrite any existing partitions
       x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK));
@@ -821,10 +824,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
       if (isPartitioned(tblDesc)) {
         Task<?> ict = createImportCommitTask(
-            tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId, x.getConf());
+            tblDesc.getDatabaseName(), tblDesc.getTableName(), txnId, stmtId, x.getConf());
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
           t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc,
-            replicationSpec, x, mmWriteId, isSourceMm, ict));
+            replicationSpec, x, txnId, stmtId, isSourceMm, ict));
         }
       } else {
         x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
@@ -841,7 +844,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           }
           FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf());
           checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x);
-          t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, mmWriteId, isSourceMm));
+          t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, txnId, stmtId, isSourceMm));
         }
       }
       x.getTasks().add(t);
@@ -849,10 +852,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   private static Task<?> createImportCommitTask(
-      String dbName, String tblName, Long mmWriteId, HiveConf conf) {
+      String dbName, String tblName, Long txnId, int stmtId, HiveConf conf) {
     @SuppressWarnings("unchecked")
-    Task<ImportCommitWork> ict = (mmWriteId == null) ? null : TaskFactory.get(
-        new ImportCommitWork(dbName, tblName, mmWriteId), conf);
+    Task<ImportCommitWork> ict = (txnId == null) ? null : TaskFactory.get(
+        new ImportCommitWork(dbName, tblName, txnId, stmtId), conf);
     return ict;
   }
 
@@ -864,7 +867,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       List<AddPartitionDesc> partitionDescs,
       boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnPrecursor,
       Table table, URI fromURI, FileSystem fs, Warehouse wh,
-      EximUtil.SemanticAnalyzerWrapperContext x, Long mmWriteId, boolean isSourceMm)
+      EximUtil.SemanticAnalyzerWrapperContext x, Long txnId, int stmtId, boolean isSourceMm)
       throws HiveException, URISyntaxException, IOException, MetaException {
 
     Task<?> dr = null;
@@ -933,15 +936,15 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       if (!replicationSpec.isMetadataOnly()) {
         if (isPartitioned(tblDesc)) {
           Task<?> ict = createImportCommitTask(
-              tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId, x.getConf());
+              tblDesc.getDatabaseName(), tblDesc.getTableName(), txnId, stmtId, x.getConf());
           for (AddPartitionDesc addPartitionDesc : partitionDescs) {
             addPartitionDesc.setReplicationSpec(replicationSpec);
             t.addDependentTask(
-                addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, mmWriteId, isSourceMm, ict));
+                addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, txnId, stmtId, isSourceMm, ict));
           }
         } else {
           x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
-          t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, mmWriteId, isSourceMm));
+          t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, txnId, stmtId, isSourceMm));
         }
       }
       if (dr == null){
@@ -961,11 +964,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
           org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
           Task<?> ict = replicationSpec.isMetadataOnly() ? null : createImportCommitTask(
-              tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId, x.getConf());
+              tblDesc.getDatabaseName(), tblDesc.getTableName(), txnId, stmtId, x.getConf());
           if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
             if (!replicationSpec.isMetadataOnly()){
               x.getTasks().add(addSinglePartition(
-                  fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, mmWriteId, isSourceMm, ict));
+                  fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, txnId, stmtId, isSourceMm, ict));
             }
           } else {
             // If replicating, then the partition already existing means we need to replace, maybe, if
@@ -973,7 +976,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
             if (replicationSpec.allowReplacementInto(ptn)){
               if (!replicationSpec.isMetadataOnly()){
                 x.getTasks().add(addSinglePartition(
-                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, mmWriteId, isSourceMm, ict));
+                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, txnId, stmtId, isSourceMm, ict));
               } else {
                 x.getTasks().add(alterSinglePartition(
                     fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
@@ -1002,7 +1005,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         if (!replicationSpec.isMetadataOnly()) {
           // repl-imports are replace-into unless the event is insert-into
           loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI),
-            replicationSpec, x, mmWriteId, isSourceMm);
+            replicationSpec, x, txnId, stmtId, isSourceMm);
         } else {
           x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
         }
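
A minimal sketch of the delta-directory naming the MM import destination now relies on
(in place of the old ValidWriteIds MM file prefix). The location, txnId and stmtId values
below are invented for illustration; only the AcidUtils.deltaSubdir call mirrors the patch:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class DeltaPathSketch {
      public static void main(String[] args) {
        Path tgtLocation = new Path("/warehouse/db.db/t/p=1"); // hypothetical partition location
        long txnId = 17L;                                      // invented transaction id
        int stmtId = 0;                                        // invented statement id
        // Same call the import path uses for the copy destination of an MM partition:
        Path destPath = new Path(tgtLocation, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
        // Prints something like .../p=1/delta_0000017_0000017_0000 (zero-padded by AcidUtils)
        System.out.println(destPath);
      }
    }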

http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
index d3b4da1..f31775e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IndexUpdater.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.parse;
 
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.ql.Driver;
@@ -44,7 +43,6 @@ import java.util.Set;
 public class IndexUpdater {
   private List<LoadTableDesc> loadTableWork;
   private HiveConf conf;
-  private Configuration parentConf;
   // Assumes one instance of this + single-threaded compilation for each query.
   private Hive hive;
   private List<Task<? extends Serializable>> tasks;
@@ -54,7 +52,6 @@ public class IndexUpdater {
   public IndexUpdater(List<LoadTableDesc> loadTableWork, Set<ReadEntity> inputs, Configuration conf) {
     this.loadTableWork = loadTableWork;
     this.inputs = inputs;
-    this.parentConf = conf;
     this.conf = new HiveConf(conf, IndexUpdater.class);
     this.tasks = new LinkedList<Task<? extends Serializable>>();
   }
@@ -63,7 +60,6 @@ public class IndexUpdater {
       Configuration conf) {
     this.loadTableWork = new LinkedList<LoadTableDesc>();
     this.loadTableWork.add(loadTableWork);
-    this.parentConf = conf;
     this.conf = new HiveConf(conf, IndexUpdater.class);
     this.tasks = new LinkedList<Task<? extends Serializable>>();
     this.inputs = inputs;
@@ -79,15 +75,15 @@ public class IndexUpdater {
       Map<String, String> partSpec = ltd.getPartitionSpec();
       if (partSpec == null || partSpec.size() == 0) {
         //unpartitioned table, update whole index
-        doIndexUpdate(tblIndexes, ltd.getMmWriteId());
+        doIndexUpdate(tblIndexes);
       } else {
-        doIndexUpdate(tblIndexes, partSpec, ltd.getMmWriteId());
+        doIndexUpdate(tblIndexes, partSpec);
       }
     }
     return tasks;
   }
 
-  private void doIndexUpdate(List<Index> tblIndexes, Long mmWriteId) throws HiveException {
+  private void doIndexUpdate(List<Index> tblIndexes) throws HiveException {
     for (Index idx : tblIndexes) {
       StringBuilder sb = new StringBuilder();
       sb.append("ALTER INDEX ");
@@ -96,21 +92,20 @@ public class IndexUpdater {
       sb.append(idx.getDbName()).append('.');
       sb.append(idx.getOrigTableName());
       sb.append(" REBUILD");
-      compileRebuild(sb.toString(), idx, mmWriteId);
+      compileRebuild(sb.toString());
     }
   }
 
   private void doIndexUpdate(List<Index> tblIndexes, Map<String, String>
-      partSpec, Long mmWriteId) throws HiveException {
+      partSpec) throws HiveException {
     for (Index index : tblIndexes) {
       if (containsPartition(index, partSpec)) {
-        doIndexUpdate(index, partSpec, mmWriteId);
+        doIndexUpdate(index, partSpec);
       }
     }
   }
 
-  private void doIndexUpdate(Index index, Map<String, String> partSpec, Long mmWriteId)
-      throws HiveException {
+  private void doIndexUpdate(Index index, Map<String, String> partSpec) {
     StringBuilder ps = new StringBuilder();
     boolean first = true;
     ps.append("(");
@@ -134,18 +129,12 @@ public class IndexUpdater {
     sb.append(" PARTITION ");
     sb.append(ps.toString());
     sb.append(" REBUILD");
-    compileRebuild(sb.toString(), index, mmWriteId);
+    compileRebuild(sb.toString());
   }
 
-  private void compileRebuild(String query, Index index, Long mmWriteId)
-      throws HiveException {
+  private void compileRebuild(String query) {
     Driver driver = new Driver(this.conf);
     driver.compile(query, false);
-    if (mmWriteId != null) {
-      // TODO: this is rather fragile
-      ValidWriteIds.addCurrentToConf(
-          parentConf, index.getDbName(), index.getOrigTableName(), mmWriteId);
-    }
     tasks.addAll(driver.getPlan().getRootTasks());
     inputs.addAll(driver.getPlan().getInputs());
   }
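
With the MM write-id plumbing removed, the rebuild path reduces to compiling a plain
ALTER INDEX ... REBUILD statement and collecting its root tasks and inputs. A short
fragment, assuming the IndexUpdater fields (conf, tasks, inputs) and invented
index/table/partition names:

    // Same shape as the simplified compileRebuild(); identifiers are placeholders.
    String query = "ALTER INDEX idx_name ON db.tbl PARTITION (ds='2017-05-16') REBUILD";
    Driver driver = new Driver(this.conf);
    driver.compile(query, false);
    tasks.addAll(driver.getPlan().getRootTasks());
    inputs.addAll(driver.getPlan().getInputs());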

http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index 04e8cac..5ef77f5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.hadoop.hive.conf.HiveConf.StrictChecks;
 
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
@@ -271,19 +269,18 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
       }
     }
 
-    Long mmWriteId = null;
+    Long txnId = null;
+    int stmtId = 0;
     Table tbl = ts.tableHandle;
     if (MetaStoreUtils.isInsertOnlyTable(tbl.getParameters())) {
-      try {
-        mmWriteId = db.getNextTableWriteId(tbl.getDbName(), tbl.getTableName());
-      } catch (HiveException e) {
-        throw new SemanticException(e);
-      }
+      txnId = 0l; //todo to be replaced with txnId in Driver
     }
 
     LoadTableDesc loadTableWork;
     loadTableWork = new LoadTableDesc(new Path(fromURI),
-      Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite, mmWriteId);
+      Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite, txnId);
+    loadTableWork.setTxnId(txnId);
+    loadTableWork.setStmtId(stmtId);
     if (preservePartitionSpecs){
       // Note : preservePartitionSpecs=true implies inheritTableSpecs=false but
       // but preservePartitionSpecs=false(default) here is not sufficient enough
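
The 0L above is an acknowledged placeholder; elsewhere in this patch the MM txn id is
taken from the session's transaction manager. A hedged fragment of what LOAD is expected
to do once the Driver opens the transaction, reusing the analyzer's locals (tbl, fromURI,
partSpec, isOverWrite):

    Long txnId = null;
    int stmtId = 0;
    if (MetaStoreUtils.isInsertOnlyTable(tbl.getParameters())) {
      // Mirrors the SemanticAnalyzer change in this same patch.
      txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
    }
    LoadTableDesc loadTableWork = new LoadTableDesc(new Path(fromURI),
        Utilities.getTableDesc(tbl), partSpec, isOverWrite, txnId);
    loadTableWork.setTxnId(txnId);
    loadTableWork.setStmtId(stmtId);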

http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 1bd4f26..29bc183 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -74,7 +74,6 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -6707,7 +6706,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       }
       input = genReduceSinkPlan(input, partnCols, sortCols, order.toString(), nullOrder.toString(),
               maxReducers, (AcidUtils.isFullAcidTable(dest_tab) ?
-              getAcidType(dest_tab, table_desc.getOutputFileFormatClass(), dest) : AcidUtils.Operation.NOT_ACID));
+              getAcidType(table_desc.getOutputFileFormatClass(), dest) : AcidUtils.Operation.NOT_ACID));
       reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator)input.getParentOperators().get(0));
       ctx.setMultiFileSpray(multiFileSpray);
       ctx.setNumFiles(numFiles);
@@ -6786,7 +6785,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     ListBucketingCtx lbCtx = null;
     Map<String, String> partSpec = null;
     boolean isMmTable = false, isMmCtas = false;
-    Long mmWriteId = null;
+    Long txnId = null;
 
     switch (dest_type.intValue()) {
     case QBMetaData.DEST_TABLE: {
@@ -6840,17 +6839,18 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       if (!isNonNativeTable) {
         AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
         if (destTableIsAcid) {
-          acidOp = getAcidType(dest_tab, table_desc.getOutputFileFormatClass(), dest);
+          acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
           checkAcidConstraints(qb, table_desc, dest_tab, acidOp);
         }
-        try {
-          mmWriteId = getMmWriteId(dest_tab, isMmTable);
-        } catch (HiveException e) {
-          throw new SemanticException(e);
+        if (MetaStoreUtils.isInsertOnlyTable(table_desc.getProperties())) {
+          acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
+        }
+        if (isMmTable) {
+          txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
         }
         boolean isReplace = !qb.getParseInfo().isInsertIntoTable(
             dest_tab.getDbName(), dest_tab.getTableName());
-        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, mmWriteId);
+        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, txnId);
         ltd.setLbCtx(lbCtx);
         loadTableWork.add(ltd);
       } else {
@@ -6903,16 +6903,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
           dest_part.isStoredAsSubDirectories(), conf);
       AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
       if (destTableIsAcid) {
-        acidOp = getAcidType(dest_tab, table_desc.getOutputFileFormatClass(), dest);
+        acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
         checkAcidConstraints(qb, table_desc, dest_tab, acidOp);
       }
-      try {
-        mmWriteId = getMmWriteId(dest_tab, isMmTable);
-      } catch (HiveException e) {
-        // How is this a semantic exception? Stupid Java and signatures.
-        throw new SemanticException(e);
+      if (MetaStoreUtils.isInsertOnlyTable(dest_part.getTable().getParameters())) {
+        acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
       }
-      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, mmWriteId);
+      if (isMmTable) {
+        txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
+      }
+      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, txnId);
       ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
           dest_tab.getTableName()));
       ltd.setLbCtx(lbCtx);
@@ -6946,10 +6946,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         destTableIsMaterialization = tblDesc.isMaterialization();
         if (!destTableIsTemporary && MetaStoreUtils.isInsertOnlyTable(tblDesc.getTblProps(), true)) {
           isMmTable = isMmCtas = true;
-          // TODO# this should really get current ACID txn; assuming ACID works correctly the txn
-          //       should have been opened to create the ACID table. For now use the first ID.
-          mmWriteId = 0l;
-          tblDesc.setInitialMmWriteId(mmWriteId);
+          txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
+          tblDesc.setInitialMmWriteId(txnId);
         }
       } else if (viewDesc != null) {
         field_schemas = new ArrayList<FieldSchema>();
@@ -7078,11 +7076,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       genPartnCols(dest, input, qb, table_desc, dest_tab, rsCtx);
     }
 
-    assert isMmTable == (mmWriteId != null);
+    assert isMmTable == (txnId != null);
     FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, table_desc, dest_part,
         dest_path, currentTableId, destTableIsAcid, destTableIsTemporary,
         destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
-        canBeMerged, mmWriteId, isMmCtas);
+        canBeMerged, txnId, isMmCtas);
     if (isMmCtas) {
       // Add FSD so that the LoadTask compilation could fix up its path to avoid the move.
       tableDesc.setWriter(fileSinkDesc);
@@ -7185,12 +7183,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     return result;
   }
 
-  private static Long getMmWriteId(Table tbl, boolean isMmTable) throws HiveException {
-    if (!isMmTable) return null;
-    // Get the next write ID for this table. We will prefix files with this write ID.
-    return Hive.get().getNextTableWriteId(tbl.getDbName(), tbl.getTableName());
-  }
-
   private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc,
       Partition dest_part, Path dest_path, int currentTableId,
       boolean destTableIsAcid, boolean destTableIsTemporary,
@@ -7210,7 +7202,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         MetaStoreUtils.isInsertOnlyTable(dest_part.getTable().getParameters()))
         || (table_desc != null && MetaStoreUtils.isInsertOnlyTable(table_desc.getProperties()));
 
-    if (destTableIsAcid && !isDestInsertOnly) {
+    if (isDestInsertOnly) {
+      fileSinkDesc.setWriteType(Operation.INSERT);
+      acidFileSinks.add(fileSinkDesc);
+    }
+
+    if (destTableIsAcid) {
       AcidUtils.Operation wt = updating(dest) ? AcidUtils.Operation.UPDATE :
           (deleting(dest) ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
       fileSinkDesc.setWriteType(wt);
@@ -7422,7 +7419,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
   private void checkAcidConstraints(QB qb, TableDesc tableDesc,
                                     Table table, AcidUtils.Operation acidOp) throws SemanticException {
     String tableName = tableDesc.getTableName();
-    if (!qb.getParseInfo().isInsertIntoTable(tableName) && !Operation.INSERT_ONLY.equals(acidOp)) {
+    if (!qb.getParseInfo().isInsertIntoTable(tableName)) {
       LOG.debug("Couldn't find table " + tableName + " in insertIntoTable");
       throw new SemanticException(ErrorMsg.NO_INSERT_OVERWRITE_WITH_ACID, tableName);
     }
@@ -7437,7 +7434,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     */
     conf.set(AcidUtils.CONF_ACID_KEY, "true");
 
-    if (!Operation.NOT_ACID.equals(acidOp) && !Operation.INSERT_ONLY.equals(acidOp)) {
+    if (!Operation.NOT_ACID.equals(acidOp)) {
       if (table.getNumBuckets() < 1) {
         throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, table.getTableName());
       }
@@ -11875,7 +11872,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       if (p != null) {
         tbl = p.getTable();
       }
-      if (tbl != null && AcidUtils.isFullAcidTable(tbl)) {
+      if (tbl != null && (AcidUtils.isFullAcidTable(tbl) || MetaStoreUtils.isInsertOnlyTable(tbl.getParameters()))) {
         acidInQuery = true;
         checkAcidTxnManager(tbl);
       }
@@ -11938,7 +11935,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         tbl = writeEntity.getTable();
       }
 
-      if (tbl != null && AcidUtils.isFullAcidTable(tbl)) {
+      if (tbl != null && (AcidUtils.isFullAcidTable(tbl) || MetaStoreUtils.isInsertOnlyTable(tbl.getParameters()))) {
         acidInQuery = true;
         checkAcidTxnManager(tbl);
       }
@@ -13603,12 +13600,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
             AcidUtils.Operation.INSERT);
   }
 
-  private AcidUtils.Operation getAcidType(
-      Table table, Class<? extends OutputFormat> of, String dest) {
+  private AcidUtils.Operation getAcidType(Class<? extends OutputFormat> of, String dest) {
     if (SessionState.get() == null || !SessionState.get().getTxnMgr().supportsAcid()) {
       return AcidUtils.Operation.NOT_ACID;
-    } else if (MetaStoreUtils.isInsertOnlyTable(table.getParameters())) {
-      return AcidUtils.Operation.INSERT_ONLY;
     } else if (isAcidOutputFormat(of)) {
       return getAcidType(dest);
     } else {
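
Condensed from the hunks above, the write-type decision in createFileSinkDesc now treats
insert-only (MM) destinations as ACID inserts so they get transaction bookkeeping, while
full ACID destinations keep the per-statement write type. A fragment assuming the method's
locals (dest_part, table_desc, fileSinkDesc, dest, destTableIsAcid, acidFileSinks):

    boolean isDestInsertOnly =
        (dest_part != null && MetaStoreUtils.isInsertOnlyTable(dest_part.getTable().getParameters()))
        || (table_desc != null && MetaStoreUtils.isInsertOnlyTable(table_desc.getProperties()));
    if (isDestInsertOnly) {
      fileSinkDesc.setWriteType(AcidUtils.Operation.INSERT); // MM write recorded as an insert
      acidFileSinks.add(fileSinkDesc);
    }
    if (destTableIsAcid) {
      AcidUtils.Operation wt = updating(dest) ? AcidUtils.Operation.UPDATE
          : (deleting(dest) ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
      fileSinkDesc.setWriteType(wt);
      // ... existing full-ACID sink handling continues unchanged ...
    }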

http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 6629a0c..356ab6f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -20,23 +20,18 @@ package org.apache.hadoop.hive.ql.parse;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Queue;
 import java.util.Set;
-import java.util.Stack;
 
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
-import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.Context;
@@ -44,7 +39,6 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.ColumnStatsTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -56,7 +50,6 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
-import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
@@ -319,21 +312,22 @@ public abstract class TaskCompiler {
   private void setLoadFileLocation(
       final ParseContext pCtx, LoadFileDesc lfd) throws SemanticException {
     // CTAS; make the movetask's destination directory the table's destination.
-    Long mmWriteIdForCtas = null;
+    Long txnIdForCtas = null;
+    int stmtId = 0; // CTAS cannot be part of multi-txn stmt
     FileSinkDesc dataSinkForCtas = null;
     String loc = null;
     if (pCtx.getQueryProperties().isCTAS()) {
       CreateTableDesc ctd = pCtx.getCreateTable();
       dataSinkForCtas = ctd.getAndUnsetWriter();
-      mmWriteIdForCtas = ctd.getInitialMmWriteId();
+      txnIdForCtas = ctd.getInitialMmWriteId();
       loc = ctd.getLocation();
     } else {
       loc = pCtx.getCreateViewDesc().getLocation();
     }
     Path location = (loc == null) ? getDefaultCtasLocation(pCtx) : new Path(loc);
-    if (mmWriteIdForCtas != null) {
+    if (txnIdForCtas != null) {
       dataSinkForCtas.setDirName(location);
-      location = new Path(location, ValidWriteIds.getMmFilePrefix(mmWriteIdForCtas));
+      location = new Path(location, AcidUtils.deltaSubdir(txnIdForCtas, txnIdForCtas, stmtId));
       lfd.setSourcePath(location);
       Utilities.LOG14535.info("Setting MM CTAS to  " + location);
     }
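
For an MM CTAS the file sink keeps writing under the table location, and the move task's
source becomes a delta directory beneath it, with the statement id pinned to 0. A sketch
with invented values, assuming the method's locals (dataSinkForCtas, lfd); the real
location and txn id come from the CreateTableDesc and the open transaction:

    Path location = new Path("/warehouse/db.db/ctas_target"); // hypothetical table location
    long txnIdForCtas = 5L;                                   // invented
    int stmtId = 0;                                           // CTAS cannot be part of a multi-txn stmt
    dataSinkForCtas.setDirName(location);
    Path source = new Path(location, AcidUtils.deltaSubdir(txnIdForCtas, txnIdForCtas, stmtId));
    lfd.setSourcePath(source); // e.g. .../ctas_target/delta_0000005_0000005_0000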

http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
index 615c63d..8f6166a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileMergeDesc.java
@@ -28,7 +28,8 @@ public class FileMergeDesc extends AbstractOperatorDesc {
   private int listBucketingDepth;
   private boolean hasDynamicPartitions;
   private boolean isListBucketingAlterTableConcatenate;
-  private Long mmWriteId;
+  private Long txnId;
+  private int stmtId;
 
   public FileMergeDesc(DynamicPartitionCtx dynPartCtx, Path outputDir) {
     this.dpCtx = dynPartCtx;
@@ -75,11 +76,19 @@ public class FileMergeDesc extends AbstractOperatorDesc {
     this.isListBucketingAlterTableConcatenate = isListBucketingAlterTableConcatenate;
   }
 
-  public Long getMmWriteId() {
-    return mmWriteId;
+  public Long getTxnId() {
+    return txnId;
   }
 
-  public void setMmWriteId(Long mmWriteId) {
-    this.mmWriteId = mmWriteId;
+  public void setTxnId(Long txnId) {
+    this.txnId = txnId;
+  }
+
+  public int getStmtId() {
+    return stmtId;
+  }
+
+  public void setStmtId(int stmtId) {
+    this.stmtId = stmtId;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index d0d5acb..7f4cabe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidWriteIds;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -203,7 +202,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   public Path getMergeInputDirName() {
     Path root = getFinalDirName();
     if (mmWriteId == null) return root;
-    return new Path(root, ValidWriteIds.getMmFilePrefix(mmWriteId));
+    return new Path(root, AcidUtils.deltaSubdir(txnId, txnId, 0));
   }
 
   @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -476,6 +475,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   }
   public void setTransactionId(long id) {
     txnId = id;
+    setMmWriteId(id);
   }
   public long getTransactionId() {
     return txnId;
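
Because getMergeInputDirName() still keys off mmWriteId being non-null, the setter change
keeps the two ids in step. A small usage sketch with an invented id, assuming an
already-built FileSinkDesc named fsd:

    fsd.setTransactionId(42L);                    // now also records 42L as the MM write id
    Path mergeInput = fsd.getMergeInputDirName();
    // For an MM sink this should resolve to <finalDirName>/delta_0000042_0000042_0000
    // (statement id fixed at 0 in this method); otherwise it is just the final dir.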

http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java
index 9d5c6b8..5e19729 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java
@@ -68,6 +68,14 @@ public class LoadMultiFilesDesc implements Serializable {
     return srcDirs;
   }
 
+  public void setSourceDirs(List<Path> srcs) {
+    this.srcDirs = srcs;
+  }
+
+  public void setTargetDirs(final List<Path> targetDir) {
+    this.targetDirs = targetDir;
+  }
+
   @Explain(displayName = "hdfs directory")
   public boolean getIsDfsDir() {
     return isDfsDir;

