hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ekoif...@apache.org
Subject [3/3] hive git commit: HIVE-20823: Make Compactor run in a transaction (Eugene Koifman, reviewed by Vaibhav Gumashta)
Date Wed, 21 Nov 2018 23:06:32 GMT
HIVE-20823: Make Compactor run in a transaction (Eugene Koifman, reviewed by Vaibhav Gumashta)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ddf3b6cd
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ddf3b6cd
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ddf3b6cd

Branch: refs/heads/master
Commit: ddf3b6cd041a9eae67e70074cc1ab7c6bd413613
Parents: f5b14fc
Author: Eugene Koifman <ekoifman@apache.org>
Authored: Wed Nov 21 15:06:13 2018 -0800
Committer: Eugene Koifman <ekoifman@apache.org>
Committed: Wed Nov 21 15:06:13 2018 -0800

----------------------------------------------------------------------
 .../hive/hcatalog/streaming/TestStreaming.java  |  34 ++-
 .../streaming/mutate/StreamingAssert.java       |  14 +-
 .../apache/hadoop/hive/ql/TestAcidOnTez.java    |  15 +-
 .../hive/ql/txn/compactor/TestCompactor.java    |  51 ++--
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |   1 +
 .../hadoop/hive/ql/io/AcidInputFormat.java      |  29 +-
 .../hadoop/hive/ql/io/AcidOutputFormat.java     |  17 ++
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 188 +++++++++---
 .../hive/ql/io/orc/OrcRawRecordMerger.java      |   6 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |   4 +-
 .../hadoop/hive/ql/parse/ReplicationSpec.java   |  17 +-
 .../hadoop/hive/ql/parse/repl/dump/Utils.java   |   2 +
 .../hadoop/hive/ql/txn/compactor/Cleaner.java   | 283 +++++--------------
 .../hive/ql/txn/compactor/CompactorMR.java      |  36 ++-
 .../hive/ql/txn/compactor/CompactorThread.java  |   7 +-
 .../hadoop/hive/ql/txn/compactor/Initiator.java |  10 +-
 .../hadoop/hive/ql/txn/compactor/Worker.java    |  73 +++--
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |  64 ++---
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 118 ++++----
 .../apache/hadoop/hive/ql/TestTxnCommands3.java | 155 ++++++++--
 .../hive/ql/TestTxnCommandsForMmTable.java      |  34 +--
 .../hadoop/hive/ql/TestTxnConcatenate.java      |  12 +-
 .../org/apache/hadoop/hive/ql/TestTxnExIm.java  |   6 +-
 .../apache/hadoop/hive/ql/TestTxnLoadData.java  |  50 ++--
 .../apache/hadoop/hive/ql/TestTxnNoBuckets.java |  78 ++---
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java |  10 +-
 .../hadoop/hive/ql/io/TestAcidInputFormat.java  |   6 +-
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java |  41 ++-
 .../hive/ql/io/orc/TestInputOutputFormat.java   |  18 ++
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |  16 +-
 .../TestVectorizedOrcAcidRowBatchReader.java    |  11 +
 .../hive/ql/lockmgr/TestDbTxnManager2.java      |  19 +-
 .../hive/ql/txn/compactor/CompactorTest.java    |   2 +-
 .../hive/ql/txn/compactor/TestCleaner.java      | 239 ----------------
 .../hive/ql/txn/compactor/TestWorker.java       | 157 +++++-----
 .../hive/metastore/txn/TxnCommonUtils.java      |   2 +-
 .../metastore/txn/CompactionTxnHandler.java     |  87 ++++--
 .../hadoop/hive/metastore/txn/TxnStore.java     |  10 +
 .../hadoop/hive/metastore/txn/TxnUtils.java     |  45 +--
 .../hive/metastore/TestHiveMetaStoreTxns.java   |  57 ----
 .../hive/common/ValidCompactorWriteIdList.java  |   3 +
 .../hive/common/ValidReaderWriteIdList.java     |   6 +
 .../apache/hive/streaming/TestStreaming.java    |  48 ++--
 43 files changed, 1066 insertions(+), 1015 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 137323c..b290a40 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -30,6 +30,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +47,8 @@ import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.Validator;
@@ -61,11 +64,13 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
 import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -407,13 +412,13 @@ public class TestStreaming {
     rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID");
 
     Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar"));
-    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000"));
     Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4"));
-    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(1), rs.get(1).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000"));
     Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6"));
-    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000"));
     Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t0\t0"));
-    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005/bucket_00000"));
+    Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005_v0000025/bucket_00000"));
   }
 
   /**
@@ -553,7 +558,7 @@ public class TestStreaming {
   @Deprecated
   private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
                                 String... records) throws Exception {
-    ValidWriteIdList writeIds = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
+    ValidWriteIdList writeIds = getTransactionContext(conf);
     AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
@@ -587,6 +592,7 @@ public class TestStreaming {
     AcidUtils.setAcidOperationalProperties(job, true, null);
     job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());
+    job.set(ValidTxnList.VALID_TXNS_KEY, conf.get(ValidTxnList.VALID_TXNS_KEY));
     InputSplit[] splits = inf.getSplits(job, buckets);
     Assert.assertEquals(numExpectedFiles, splits.length);
     org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
@@ -606,8 +612,7 @@ public class TestStreaming {
    */
   private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
                                 String validationQuery, boolean vectorize, String... records) throws Exception {
-    ValidWriteIdList txns = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf));
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -649,9 +654,15 @@ public class TestStreaming {
     conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorizationEnabled);
   }
 
+  private ValidWriteIdList getTransactionContext(Configuration conf) throws Exception {
+    ValidTxnList validTxnList = msClient.getValidTxns();
+    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    List<TableValidWriteIds> v = msClient.getValidWriteIds(Collections
+        .singletonList(TableName.getDbTable(dbName, tblName)), validTxnList.writeToString());
+    return TxnCommonUtils.createValidReaderWriteIdList(v.get(0));
+  }
   private void checkNothingWritten(Path partitionPath) throws Exception {
-    ValidWriteIdList writeIds = msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName));
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf));
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -1234,8 +1245,7 @@ public class TestStreaming {
     /*now both batches have committed (but not closed) so we for each primary file we expect a side
     file to exist and indicate the true length of primary file*/
     FileSystem fs = partLoc.getFileSystem(conf);
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf,
-            msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf));
     for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
       for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
@@ -1260,7 +1270,7 @@ public class TestStreaming {
     //so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length.  Furthermore, each bucket0
     //has now received more data(logically - it's buffered) but it is not yet committed.
     //lets check that side files exist, etc
-    dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidWriteIds(AcidUtils.getFullTableName(dbName, tblName)));
+    dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf));
     for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
       for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
index 0edf1cd..78cae72 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
@@ -25,6 +25,8 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -32,7 +34,9 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
@@ -72,6 +76,7 @@ public class StreamingAssert {
   private IMetaStoreClient metaStoreClient;
   private Directory dir;
   private ValidWriteIdList writeIds;
+  private ValidTxnList validTxnList;
   private List<AcidUtils.ParsedDelta> currentDeltas;
   private long min;
   private long max;
@@ -83,7 +88,13 @@ public class StreamingAssert {
     this.table = table;
     this.partition = partition;
 
-    writeIds = metaStoreClient.getValidWriteIds(AcidUtils.getFullTableName(table.getDbName(), table.getTableName()));
+
+    validTxnList = metaStoreClient.getValidTxns();
+    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    List<TableValidWriteIds> v = metaStoreClient.getValidWriteIds(Collections
+        .singletonList(TableName.getDbTable(table.getDbName(), table.getTableName())), validTxnList.writeToString());
+    writeIds = TxnCommonUtils.createValidReaderWriteIdList(v.get(0));
+
     partitionLocation = getPartitionLocation();
     dir = AcidUtils.getAcidState(partitionLocation, conf, writeIds);
     assertEquals(0, dir.getObsolete().size());
@@ -146,6 +157,7 @@ public class StreamingAssert {
     AcidUtils.setAcidOperationalProperties(job, true, null);
     job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
     job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIds.toString());
+    job.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
     InputSplit[] splits = inputFormat.getSplits(job, 1);
     assertEquals(numSplitsExpected, splits.length);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index 40dd992..d6a4191 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -608,11 +608,16 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree  ~/dev/hiverwgit/itests/h
       LOG.warn(s);
     }
     String[][] expected2 = {
-       {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "warehouse/t/base_-9223372036854775808/bucket_00000"},
-      {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "warehouse/t/base_-9223372036854775808/bucket_00000"},
-      {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "warehouse/t/base_-9223372036854775808/bucket_00000"},
-      {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t7\t8", "warehouse/t/base_-9223372036854775808/bucket_00000"},
-      {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t9\t10", "warehouse/t/base_-9223372036854775808/bucket_00000"}
+       {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
+           "warehouse/t/base_-9223372036854775808_v0000024/bucket_00000"},
+      {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4",
+          "warehouse/t/base_-9223372036854775808_v0000024/bucket_00000"},
+      {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6",
+          "warehouse/t/base_-9223372036854775808_v0000024/bucket_00000"},
+      {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t7\t8",
+          "warehouse/t/base_-9223372036854775808_v0000024/bucket_00000"},
+      {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t9\t10",
+          "warehouse/t/base_-9223372036854775808_v0000024/bucket_00000"}
     };
     Assert.assertEquals("Unexpected row count after major compact", expected2.length, rs.size());
     for(int i = 0; i < expected2.length; i++) {

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 9648645..beb36d7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -713,15 +713,15 @@ public class TestCompactor {
       Path resultFile = null;
       for (int i = 0; i < names.length; i++) {
         names[i] = stat[i].getPath().getName();
-        if (names[i].equals("delta_0000001_0000004")) {
+        if (names[i].equals("delta_0000001_0000004_v0000009")) {
           resultFile = stat[i].getPath();
         }
       }
       Arrays.sort(names);
       String[] expected = new String[]{"delta_0000001_0000002",
-        "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"};
+        "delta_0000001_0000004_v0000009", "delta_0000003_0000004", "delta_0000005_0000006"};
       if (!Arrays.deepEquals(expected, names)) {
-        Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
+        Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names) + ",stat=" + toString(stat));
       }
       checkExpectedTxnsPresent(null, new Path[]{resultFile}, columnNamesProperty, columnTypesProperty,
         0, 1L, 4L, 1);
@@ -767,7 +767,7 @@ public class TestCompactor {
         Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
       }
       String name = stat[0].getPath().getName();
-      Assert.assertEquals(name, "base_0000004");
+      Assert.assertEquals("base_0000004_v0000009", name);
       checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1);
     } finally {
       if (connection != null) {
@@ -858,13 +858,13 @@ public class TestCompactor {
     Path resultDelta = null;
     for (int i = 0; i < names.length; i++) {
       names[i] = stat[i].getPath().getName();
-      if (names[i].equals("delta_0000001_0000004")) {
+      if (names[i].equals("delta_0000001_0000004_v0000009")) {
         resultDelta = stat[i].getPath();
       }
     }
     Arrays.sort(names);
     String[] expected = new String[]{"delta_0000001_0000002",
-      "delta_0000001_0000004", "delta_0000003_0000004"};
+      "delta_0000001_0000004_v0000009", "delta_0000003_0000004"};
     if (!Arrays.deepEquals(expected, names)) {
       Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
     }
@@ -947,7 +947,7 @@ public class TestCompactor {
       Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
     }
     String name = stat[0].getPath().getName();
-    if (!name.equals("base_0000004")) {
+    if (!name.equals("base_0000004_v0000009")) {
       Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000004");
     }
     checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1);
@@ -1312,7 +1312,7 @@ public class TestCompactor {
       Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
     }
     String name = stat[0].getPath().getName();
-    Assert.assertEquals(name, "base_0000004");
+    Assert.assertEquals("base_0000004_v0000009", name);
     checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 1, 1L, 4L, 2);
     if (connection1 != null) {
       connection1.close();
@@ -1368,12 +1368,12 @@ public class TestCompactor {
     Path minorCompactedDelta = null;
     for (int i = 0; i < deltas.length; i++) {
       deltas[i] = stat[i].getPath().getName();
-      if (deltas[i].equals("delta_0000001_0000003")) {
+      if (deltas[i].equals("delta_0000001_0000003_v0000006")) {
         minorCompactedDelta = stat[i].getPath();
       }
     }
     Arrays.sort(deltas);
-    String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000003",
+    String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000003_v0000006",
       "delta_0000002_0000002_0000"};
     if (!Arrays.deepEquals(expectedDeltas, deltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
@@ -1388,12 +1388,12 @@ public class TestCompactor {
     Path minorCompactedDeleteDelta = null;
     for (int i = 0; i < deleteDeltas.length; i++) {
       deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
-      if (deleteDeltas[i].equals("delete_delta_0000001_0000003")) {
+      if (deleteDeltas[i].equals("delete_delta_0000001_0000003_v0000006")) {
         minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
       }
     }
     Arrays.sort(deleteDeltas);
-    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000003", "delete_delta_0000003_0000003_0000"};
+    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000003_v0000006", "delete_delta_0000003_0000003_0000"};
     if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
     }
@@ -1446,12 +1446,12 @@ public class TestCompactor {
     Path minorCompactedDelta = null;
     for (int i = 0; i < deltas.length; i++) {
       deltas[i] = stat[i].getPath().getName();
-      if (deltas[i].equals("delta_0000001_0000002")) {
+      if (deltas[i].equals("delta_0000001_0000002_v0000005")) {
         minorCompactedDelta = stat[i].getPath();
       }
     }
     Arrays.sort(deltas);
-    String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000002",
+    String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000002_v0000005",
       "delta_0000002_0000002_0000"};
     if (!Arrays.deepEquals(expectedDeltas, deltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
@@ -1466,12 +1466,12 @@ public class TestCompactor {
     Path minorCompactedDeleteDelta = null;
     for (int i = 0; i < deleteDeltas.length; i++) {
       deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
-      if (deleteDeltas[i].equals("delete_delta_0000001_0000002")) {
+      if (deleteDeltas[i].equals("delete_delta_0000001_0000002_v0000005")) {
         minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
       }
     }
     Arrays.sort(deleteDeltas);
-    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000002"};
+    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000002_v0000005"};
     if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
     }
@@ -1537,13 +1537,13 @@ public class TestCompactor {
     Path resultFile = null;
     for (int i = 0; i < names.length; i++) {
       names[i] = stat[i].getPath().getName();
-      if (names[i].equals("delta_0000001_0000004")) {
+      if (names[i].equals("delta_0000001_0000004_v0000009")) {
         resultFile = stat[i].getPath();
       }
     }
     Arrays.sort(names);
     String[] expected = new String[]{"delta_0000001_0000002",
-      "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"};
+      "delta_0000001_0000004_v0000009", "delta_0000003_0000004", "delta_0000005_0000006"};
     if (!Arrays.deepEquals(expected, names)) {
       Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
     }
@@ -1557,12 +1557,12 @@ public class TestCompactor {
     Path minorCompactedDeleteDelta = null;
     for (int i = 0; i < deleteDeltas.length; i++) {
       deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
-      if (deleteDeltas[i].equals("delete_delta_0000001_0000004")) {
+      if (deleteDeltas[i].equals("delete_delta_0000001_0000004_v0000009")) {
         minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
       }
     }
     Arrays.sort(deleteDeltas);
-    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000004"};
+    String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000004_v0000009"};
     if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
       Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
     }
@@ -1968,4 +1968,15 @@ public class TestCompactor {
     t.init(stop, looped);
     t.run();
   }
+  private static String toString(FileStatus[] stat) {//renders the paths as "stat{p1,p2}" for failure messages
+    StringBuilder sb = new StringBuilder("stat{");
+    if(stat == null || stat.length == 0) {
+      return sb.append('}').toString();//nothing listed: close the brace rather than overwrite the opening one
+    }
+    for(FileStatus f : stat) {
+      sb.append(f.getPath()).append(",");
+    }
+    sb.setCharAt(sb.length() - 1, '}');//the trailing comma becomes the closing brace
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 28d61f9..5d6ae7f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -302,6 +302,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
       tuple.replicationSpec.setIsReplace(true);  // by default for all other objects this is false
       if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
+        tuple.replicationSpec.setValidTxnList(validTxnList);
         tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList));
 
         // For transactional table, data would be valid snapshot for current txn and doesn't include data

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
index 41007e2..bba3960 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
@@ -112,21 +112,26 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
     private long minWriteId;
     private long maxWriteId;
     private List<Integer> stmtIds;
-    //would be useful to have enum for Type: insert/delete/load data
+    /**
+     * Visibility txn id taken from the dir name (0 if absent); see {@link AcidUtils#addVisibilitySuffix}
+     */
+    private long visibilityTxnId;
 
     public DeltaMetaData() {
-      this(0,0,new ArrayList<Integer>());
+      this(0,0,new ArrayList<Integer>(), 0);
     }
     /**
      * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition
+     * @param visibilityTxnId maybe 0, if the dir name didn't have it.  txnid:0 is always visible
      */
-    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds) {
+    DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds, long visibilityTxnId) {
       this.minWriteId = minWriteId;
       this.maxWriteId = maxWriteId;
       if (stmtIds == null) {
         throw new IllegalArgumentException("stmtIds == null");
       }
       this.stmtIds = stmtIds;
+      this.visibilityTxnId = visibilityTxnId;
     }
     long getMinWriteId() {
       return minWriteId;
@@ -137,6 +142,9 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
     List<Integer> getStmtIds() {
       return stmtIds;
     }
+    long getVisibilityTxnId() {//0 means the dir name carried no visibility txn id
+      return visibilityTxnId;
+    }
     @Override
     public void write(DataOutput out) throws IOException {
       out.writeLong(minWriteId);
@@ -145,6 +153,7 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
       for(Integer id : stmtIds) {
         out.writeInt(id);
       }
+      out.writeLong(visibilityTxnId);
     }
     @Override
     public void readFields(DataInput in) throws IOException {
@@ -155,11 +164,21 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE>
       for(int i = 0; i < numStatements; i++) {
         stmtIds.add(in.readInt());
       }
+      visibilityTxnId = in.readLong();
+    }
+    String getName() {//delete-delta dir name for an entry that has no statement ids
+      assert stmtIds.isEmpty() : "use getName(int)";
+      String baseName = AcidUtils.deleteDeltaSubdir(minWriteId, maxWriteId);
+      return AcidUtils.addVisibilitySuffix(baseName, visibilityTxnId);
+    }
+    String getName(int stmtId) {//delete-delta dir name for one statement id of this entry
+      assert !stmtIds.isEmpty() : "use getName()";
+      String baseName = AcidUtils.deleteDeltaSubdir(minWriteId, maxWriteId, stmtId);
+      return AcidUtils.addVisibilitySuffix(baseName, visibilityTxnId);
+    }
     @Override
     public String toString() {
-      //? is Type - when implemented
-      return "Delta(?," + minWriteId + "," + maxWriteId + "," + stmtIds + ")";
+      return "Delta(?," + minWriteId + "," + maxWriteId + "," + stmtIds + "," + visibilityTxnId + ")";
     }
   }
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
index 05beafe..b45cc8c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
@@ -51,6 +51,9 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     private Reporter reporter;
     private long minimumWriteId;
     private long maximumWriteId;
+    /**
+     * actual bucketId (as opposed to bucket property via BucketCodec)
+     */
     private int bucketId;
     /**
      * Based on {@link org.apache.hadoop.hive.ql.metadata.Hive#mvFile(HiveConf, FileSystem, Path, FileSystem, Path, boolean, boolean)}
@@ -61,9 +64,16 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     private boolean oldStyle = false;
     private int recIdCol = -1;  // Column the record identifier is in, -1 indicates no record id
     //unique within a transaction
+    /**
+     * todo: Link to AcidUtils?
+     */
     private int statementId = 0;
     private Path finalDestination;
     /**
+     * todo: link to AcidUtils?
+     */
+    private long visibilityTxnId = 0;
+    /**
      * Create the options object.
      * @param conf Use the given configuration
      */
@@ -252,6 +262,10 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
       this.finalDestination = p;
       return this;
     }
+    public Options visibilityTxnId(long visibilityTxnId) {
+      this.visibilityTxnId = visibilityTxnId;
+      return this;
+    }
 
     public Configuration getConfiguration() {
       return configuration;
@@ -317,6 +331,9 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO
     public Path getFinalDestination() {
       return finalDestination;
     }
+    public long getVisibilityTxnId() {
+      return visibilityTxnId;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 71e5131..d36b4d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -32,7 +32,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
 
-import org.apache.avro.generic.GenericData;
+import com.google.common.base.Strings;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -46,7 +46,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.LockComponentBuilder;
 import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
-import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockType;
@@ -57,7 +56,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
@@ -181,6 +179,8 @@ public class AcidUtils {
       return !name.startsWith("_") && !name.startsWith(".");
     }
   };
+  public static final String VISIBILITY_PREFIX = "_v";
+  public static final Pattern VISIBILITY_PATTERN = Pattern.compile(VISIBILITY_PREFIX + "\\d+");
 
   private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
 
@@ -294,8 +294,21 @@ public class AcidUtils {
                         options.getMaximumWriteId(),
                         options.getStatementId());
     }
+    subdir = addVisibilitySuffix(subdir, options.getVisibilityTxnId());
     return createBucketFile(new Path(directory, subdir), options.getBucketId());
   }
+
+  /**
+   * Since Hive 4.0, compactor produces directories with {@link #VISIBILITY_PATTERN} suffix.
+   * _v0 is equivalent to no suffix, for backwards compatibility.
+   */
+  static String addVisibilitySuffix(String baseOrDeltaDir, long visibilityTxnId) {
+    if(visibilityTxnId == 0) {
+      return baseOrDeltaDir;
+    }
+    return baseOrDeltaDir + VISIBILITY_PREFIX
+        + String.format(DELTA_DIGITS, visibilityTxnId);
+  }
   /**
    * Represents bucketId and copy_N suffix
    */
@@ -340,20 +353,6 @@ public class AcidUtils {
     }
   }
   /**
-   * Get the write id from a base directory name.
-   * @param path the base directory name
-   * @return the maximum write id that is included
-   */
-  public static long parseBase(Path path) {
-    String filename = path.getName();
-    if (filename.startsWith(BASE_PREFIX)) {
-      return Long.parseLong(filename.substring(BASE_PREFIX.length()));
-    }
-    throw new IllegalArgumentException(filename + " does not start with " +
-        BASE_PREFIX);
-  }
-
-  /**
    * Get the bucket id from the file path
    * @param bucketFile - bucket file path
    * @return - bucket id
@@ -405,7 +404,7 @@ public class AcidUtils {
         result
             .setOldStyle(false)
             .minimumWriteId(0)
-            .maximumWriteId(parseBase(bucketFile.getParent()))
+            .maximumWriteId(ParsedBase.parseBase(bucketFile.getParent()).getWriteId())
             .bucket(bucket)
             .writingBase(true);
       } else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) {
@@ -443,11 +442,12 @@ public class AcidUtils {
     public DirectoryImpl(List<FileStatus> abortedDirectories,
         boolean isBaseInRawFormat, List<HdfsFileStatusWithId> original,
         List<FileStatus> obsolete, List<ParsedDelta> deltas, Path base) {
-      this.abortedDirectories = abortedDirectories;
+      this.abortedDirectories = abortedDirectories == null ?
+          Collections.emptyList() : abortedDirectories;
       this.isBaseInRawFormat = isBaseInRawFormat;
-      this.original = original;
-      this.obsolete = obsolete;
-      this.deltas = deltas;
+      this.original = original == null ? Collections.emptyList() : original;
+      this.obsolete = obsolete == null ? Collections.emptyList() : obsolete;
+      this.deltas = deltas == null ? Collections.emptyList() : deltas;
       this.base = base;
     }
 
@@ -757,7 +757,45 @@ public class AcidUtils {
      */
     List<FileStatus> getAbortedDirectories();
   }
-
+  /**
+   * Since version 3 but prior to version 4, format of a base is "base_X" where X is a writeId.
+   * If this base was produced by a compactor, X is the highest writeId that the compactor included.
+   * If this base is produced by Insert Overwrite stmt, X is a writeId of the transaction that
+   * executed the insert.
+   * Since Hive Version 4.0, the format of a base produced by a compactor is
+   * base_X_vY.  X is like before, i.e. the highest writeId compactor included and Y is the
+   * visibilityTxnId of the transaction in which the compactor ran.
+   * (The "v", short for visibility, is a literal that helps parsing.)
+   */
+  public static final class ParsedBase {
+    private final long writeId;
+    private final long visibilityTxnId;
+    ParsedBase(long writeId) {
+      this(writeId, 0);
+    }
+    ParsedBase(long writeId, long visibilityTxnId) {
+      this.writeId = writeId;
+      this.visibilityTxnId = visibilityTxnId;
+    }
+    public long getWriteId() {
+      return writeId;
+    }
+    public long getVisibilityTxnId() {
+      return visibilityTxnId;
+    }
+    public static ParsedBase parseBase(Path path) {
+      String filename = path.getName();
+      if(!filename.startsWith(BASE_PREFIX)) {
+        throw new IllegalArgumentException(filename + " does not start with " + BASE_PREFIX);
+      }
+      int idxOfv = filename.indexOf(VISIBILITY_PREFIX);
+      if(idxOfv < 0) {
+        return new ParsedBase(Long.parseLong(filename.substring(BASE_PREFIX.length())));
+      }
+      return new ParsedBase(Long.parseLong(filename.substring(BASE_PREFIX.length(), idxOfv)),
+          Long.parseLong(filename.substring(idxOfv + VISIBILITY_PREFIX.length())));
+    }
+  }
   /**
    * Immutable
    */
@@ -770,16 +808,23 @@ public class AcidUtils {
     private final int statementId;
     private final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...'
     private final boolean isRawFormat;
-
+    /**
+     * transaction Id of txn which created this delta.  This dir should be considered
+     * invisible unless this txn is committed
+     *
+     * TODO: define a TransactionallyVisible interface - add getVisibilityTxnId() etc. and all the comments -
+     * and use it in {@link ParsedBase}, {@link ParsedDelta}, {@link AcidInputFormat.Options}, AcidInputFormat.DeltaMetaData, etc.
+     */
+    private final long visibilityTxnId;
     /**
      * for pre 1.3.x delta files
      */
     private ParsedDelta(long min, long max, FileStatus path, boolean isDeleteDelta,
-        boolean isRawFormat) {
-      this(min, max, path, -1, isDeleteDelta, isRawFormat);
+        boolean isRawFormat, long visibilityTxnId) {
+      this(min, max, path, -1, isDeleteDelta, isRawFormat, visibilityTxnId);
     }
     private ParsedDelta(long min, long max, FileStatus path, int statementId,
-        boolean isDeleteDelta, boolean isRawFormat) {
+        boolean isDeleteDelta, boolean isRawFormat, long visibilityTxnId) {
       this.minWriteId = min;
       this.maxWriteId = max;
       this.path = path;
@@ -787,6 +832,7 @@ public class AcidUtils {
       this.isDeleteDelta = isDeleteDelta;
       this.isRawFormat = isRawFormat;
       assert !isDeleteDelta || !isRawFormat : " deleteDelta should not be raw format";
+      this.visibilityTxnId = visibilityTxnId;
     }
 
     public long getMinWriteId() {
@@ -814,6 +860,9 @@ public class AcidUtils {
     public boolean isRawFormat() {
       return isRawFormat;
     }
+    public long getVisibilityTxnId() {
+      return visibilityTxnId;
+    }
     /**
      * Compactions (Major/Minor) merge deltas/bases but delete of old files
      * happens in a different process; thus it's possible to have bases/deltas with
@@ -870,6 +919,7 @@ public class AcidUtils {
   }
 
   /**
+   * todo: rename serializeDeleteDelta()?
    * Convert the list of deltas into an equivalent list of begin/end
    * write id pairs.  Assumes {@code deltas} is sorted.
    * @param deltas
@@ -879,6 +929,7 @@ public class AcidUtils {
     List<AcidInputFormat.DeltaMetaData> result = new ArrayList<>(deltas.size());
     AcidInputFormat.DeltaMetaData last = null;
     for (ParsedDelta parsedDelta : deltas) {
+      assert parsedDelta.isDeleteDelta() : "expected delete_delta, got " + parsedDelta.getPath();
       if ((last != null)
               && (last.getMinWriteId() == parsedDelta.getMinWriteId())
               && (last.getMaxWriteId() == parsedDelta.getMaxWriteId())) {
@@ -886,7 +937,7 @@ public class AcidUtils {
         continue;
       }
       last = new AcidInputFormat.DeltaMetaData(parsedDelta.getMinWriteId(),
-              parsedDelta.getMaxWriteId(), new ArrayList<Integer>());
+              parsedDelta.getMaxWriteId(), new ArrayList<>(), parsedDelta.getVisibilityTxnId());
       result.add(last);
       if (parsedDelta.statementId >= 0) {
         last.getStmtIds().add(parsedDelta.getStatementId());
@@ -905,14 +956,14 @@ public class AcidUtils {
    * @return the list of delta paths
    */
   public static Path[] deserializeDeleteDeltas(Path root, final List<AcidInputFormat.DeltaMetaData> deleteDeltas) throws IOException {
-    List<Path> results = new ArrayList<Path>(deleteDeltas.size());
+    List<Path> results = new ArrayList<>(deleteDeltas.size());
     for(AcidInputFormat.DeltaMetaData dmd : deleteDeltas) {
       if(dmd.getStmtIds().isEmpty()) {
-        results.add(new Path(root, deleteDeltaSubdir(dmd.getMinWriteId(), dmd.getMaxWriteId())));
+        results.add(new Path(root, dmd.getName()));
         continue;
       }
       for(Integer stmtId : dmd.getStmtIds()) {
-        results.add(new Path(root, deleteDeltaSubdir(dmd.getMinWriteId(), dmd.getMaxWriteId(), stmtId)));
+        results.add(new Path(root, dmd.getName(stmtId)));
       }
     }
     return results.toArray(new Path[results.size()]);
@@ -936,7 +987,7 @@ public class AcidUtils {
     ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix, fs);
     boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
     return new ParsedDelta(p.getMinWriteId(),
-        p.getMaxWriteId(), path, p.statementId, isDeleteDelta, p.isRawFormat());
+        p.getMaxWriteId(), path, p.statementId, isDeleteDelta, p.isRawFormat(), p.visibilityTxnId);
   }
 
   public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs)
@@ -959,6 +1010,12 @@ public class AcidUtils {
    */
   public static ParsedDelta parsedDelta(Path deltaDir, boolean isRawFormat) {
     String filename = deltaDir.getName();
+    int idxOfVis = filename.indexOf(VISIBILITY_PREFIX);
+    long visibilityTxnId = 0;//visibilityTxnId:0 is always visible
+    if(idxOfVis >= 0) {
+      visibilityTxnId = Long.parseLong(filename.substring(idxOfVis + VISIBILITY_PREFIX.length()));
+      filename = filename.substring(0, idxOfVis);
+    }
     boolean isDeleteDelta = filename.startsWith(DELETE_DELTA_PREFIX);
     //make sure it's null for delete delta no matter what was passed in - this
     //doesn't apply to delete deltas
@@ -966,18 +1023,18 @@ public class AcidUtils {
     String rest = filename.substring((isDeleteDelta ?
         DELETE_DELTA_PREFIX : DELTA_PREFIX).length());
     int split = rest.indexOf('_');
-    //may be -1 if no statementId
+    //split2 may be -1 if no statementId
     int split2 = rest.indexOf('_', split + 1);
     long min = Long.parseLong(rest.substring(0, split));
     long max = split2 == -1 ?
         Long.parseLong(rest.substring(split + 1)) :
         Long.parseLong(rest.substring(split + 1, split2));
     if(split2 == -1) {
-      return new ParsedDelta(min, max, null, isDeleteDelta, isRawFormat);
+      return new ParsedDelta(min, max, null, isDeleteDelta, isRawFormat, visibilityTxnId);
     }
     int statementId = Integer.parseInt(rest.substring(split2 + 1));
     return new ParsedDelta(min, max, null, statementId, isDeleteDelta,
-        isRawFormat);
+        isRawFormat, visibilityTxnId);
 
   }
 
@@ -1046,6 +1103,23 @@ public class AcidUtils {
                                        Ref<Boolean> useFileIds,
                                        boolean ignoreEmptyFiles,
                                        Map<String, String> tblproperties) throws IOException {
+    ValidTxnList validTxnList = null;
+    String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
+    if(!Strings.isNullOrEmpty(s)) {
+      /**
+       * getAcidState() is sometimes called on non-transactional tables, e.g. from
+       * OrcInputFormat.FileGenerator.callInternal() (see orc_merge3.q).  In that case
+       * writeIdList is bogus - doesn't even have a table name.
+       * see https://issues.apache.org/jira/browse/HIVE-20856.
+       *
+       * For now, assert that ValidTxnList.VALID_TXNS_KEY is set only if this is really a read
+       * of a transactional table.
+       * see {@link #getChildState(FileStatus, HdfsFileStatusWithId, ValidWriteIdList, List, List, List, List, TxnBase, boolean, List, Map, FileSystem, ValidTxnList)}
+       */
+      validTxnList = new ValidReadTxnList();
+      validTxnList.readFromString(s);
+    }
+
     FileSystem fs = directory.getFileSystem(conf);
     // The following 'deltas' includes all kinds of delta files including insert & delete deltas.
     final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
@@ -1073,13 +1147,13 @@ public class AcidUtils {
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
         getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original,
-            obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs);
+            obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
       }
     } else {
       List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter);
       for (FileStatus child : children) {
         getChildState(child, null, writeIdList, working, originalDirectories, original, obsolete,
-            bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs);
+            bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
       }
     }
 
@@ -1219,7 +1293,7 @@ public class AcidUtils {
       ValidWriteIdList writeIdList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
       List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
       boolean ignoreEmptyFiles, List<FileStatus> aborted, Map<String, String> tblproperties,
-      FileSystem fs) throws IOException {
+      FileSystem fs, ValidTxnList validTxnList) throws IOException {
     Path p = child.getPath();
     String fn = p.getName();
     if (!child.isDirectory()) {
@@ -1229,7 +1303,11 @@ public class AcidUtils {
       return;
     }
     if (fn.startsWith(BASE_PREFIX)) {
-      long writeId = parseBase(p);
+      ParsedBase  parsedBase = ParsedBase.parseBase(p);
+      if(!isDirUsable(child, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) {
+        return;
+      }
+      long writeId = parsedBase.getWriteId();
       if(bestBase.oldestBaseWriteId > writeId) {
         //keep track for error reporting
         bestBase.oldestBase = p;
@@ -1252,13 +1330,14 @@ public class AcidUtils {
     } else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) {
       String deltaPrefix = fn.startsWith(DELTA_PREFIX)  ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
       ParsedDelta delta = parseDelta(child, deltaPrefix, fs);
-      // Handle aborted deltas. Currently this can only happen for MM tables.
-      if (tblproperties != null && isTransactionalTable(tblproperties) &&
-        ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(
-            delta.minWriteId, delta.maxWriteId)) {
+      if(!isDirUsable(child, delta.getVisibilityTxnId(), aborted, validTxnList)) {
+        return;
+      }
+      if(ValidWriteIdList.RangeResponse.ALL ==
+          writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) {
         aborted.add(child);
       }
-      if (writeIdList.isWriteIdRangeValid(
+      else if (writeIdList.isWriteIdRangeValid(
           delta.minWriteId, delta.maxWriteId) != ValidWriteIdList.RangeResponse.NONE) {
         working.add(delta);
       }
@@ -1270,7 +1349,24 @@ public class AcidUtils {
       originalDirectories.add(child);
     }
   }
-
+  /**
+   * Checks {@code visibilityTxnId} to see if {@code child} is committed in the current snapshot.
+   */
+  private static boolean isDirUsable(FileStatus child, long visibilityTxnId,
+      List<FileStatus> aborted, ValidTxnList validTxnList) {
+    if(validTxnList == null) {
+      throw new IllegalArgumentException("No ValidTxnList for " + child.getPath());
+    }
+    if(!validTxnList.isTxnValid(visibilityTxnId)) {
+      boolean isAborted = validTxnList.isTxnAborted(visibilityTxnId);
+      if(isAborted) {
+        aborted.add(child);//so we can clean it up
+      }
+      LOG.debug("getChildState() ignoring(" + aborted + ") " + child);
+      return false;
+    }
+    return true;
+  }
   public static HdfsFileStatusWithId createOriginalObj(
       HdfsFileStatusWithId childWithId, FileStatus child) {
     return childWithId != null ? childWithId : new HdfsFileStatusWithoutId(child);
@@ -1818,7 +1914,7 @@ public class AcidUtils {
   }
 
   public static String getFullTableName(String dbName, String tableName) {
-    return dbName.toLowerCase() + "." + tableName.toLowerCase();
+    return TableName.getDbTable(dbName.toLowerCase(), tableName.toLowerCase());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 8cabf96..fbb931c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -1035,7 +1035,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
           Options readerPairOptions = mergerOptions;
           if(mergerOptions.getBaseDir().getName().startsWith(AcidUtils.BASE_PREFIX)) {
             readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions,
-              AcidUtils.parseBase(mergerOptions.getBaseDir()), mergerOptions.getBaseDir());
+                AcidUtils.ParsedBase.parseBase(mergerOptions.getBaseDir()).getWriteId(),
+                mergerOptions.getBaseDir());
           }
           pair = new OriginalReaderPairToCompact(baseKey, bucket, options, readerPairOptions,
             conf, validWriteIdList,
@@ -1223,7 +1224,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
         boolean isDelta = parent.getName().startsWith(AcidUtils.DELTA_PREFIX);
         if(isBase || isDelta) {
           if(isBase) {
-            return new TransactionMetaData(AcidUtils.parseBase(parent), parent);
+            return new TransactionMetaData(AcidUtils.ParsedBase.parseBase(parent).getWriteId(),
+                parent);
           }
           else {
             AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX,

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 374e973..46c51eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -403,13 +403,13 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     // Make sure we need locks.  It's possible there's nothing to lock in
     // this operation.
     if(plan.getInputs().isEmpty() && plan.getOutputs().isEmpty()) {
-      LOG.debug("No locks needed for queryId" + queryId);
+      LOG.debug("No locks needed for queryId=" + queryId);
       return null;
     }
     List<LockComponent> lockComponents = AcidUtils.makeLockComponents(plan.getOutputs(), plan.getInputs(), conf);
     //It's possible there's nothing to lock even if we have w/r entities.
     if(lockComponents.isEmpty()) {
-      LOG.debug("No locks needed for queryId" + queryId);
+      LOG.debug("No locks needed for queryId=" + queryId);
       return null;
     }
     rqstBuilder.addLockComponents(lockComponents);

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 7d901f9..3115e83 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -44,6 +44,8 @@ public class ReplicationSpec {
   private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in.
   private boolean isReplace = true; // default is that the import mode is insert overwrite
   private String validWriteIdList = null; // WriteIds snapshot for replicating ACID/MM tables.
+  //TxnIds snapshot
+  private String validTxnList = null;
   private Type specType = Type.DEFAULT; // DEFAULT means REPL_LOAD or BOOTSTRAP_DUMP or EXPORT
 
   // Key definitions related to replication
@@ -54,7 +56,8 @@ public class ReplicationSpec {
     NOOP("repl.noop"),
     LAZY("repl.lazy"),
     IS_REPLACE("repl.is.replace"),
-    VALID_WRITEID_LIST("repl.valid.writeid.list")
+    VALID_WRITEID_LIST("repl.valid.writeid.list"),
+    VALID_TXN_LIST("repl.valid.txnid.list")
     ;
     private final String keyName;
 
@@ -143,6 +146,7 @@ public class ReplicationSpec {
     this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString()));
     this.isReplace = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_REPLACE.toString()));
     this.validWriteIdList = keyFetcher.apply(ReplicationSpec.KEY.VALID_WRITEID_LIST.toString());
+    this.validTxnList = keyFetcher.apply(KEY.VALID_TXN_LIST.toString());
   }
 
   /**
@@ -342,6 +346,15 @@ public class ReplicationSpec {
     this.validWriteIdList = validWriteIdList;
   }
 
+  public String getValidTxnList() {
+    return validTxnList;
+  }
+
+  public void setValidTxnList(String validTxnList) {
+    this.validTxnList = validTxnList;
+  }
+
+
   /**
    * @return whether the current replication dumped object related to ACID/Mm table
    */
@@ -372,6 +385,8 @@ public class ReplicationSpec {
         return String.valueOf(isReplace());
       case VALID_WRITEID_LIST:
         return getValidWriteIdList();
+      case VALID_TXN_LIST:
+        return getValidTxnList();
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 59ffb90..83a9642 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -209,6 +210,7 @@ public class Utils {
           throws IOException {
     if (replicationSpec.isTransactionalTableDump()) {
       try {
+        conf.set(ValidTxnList.VALID_TXNS_KEY, replicationSpec.getValidTxnList());
         return AcidUtils.getValidDataPaths(fromPath, conf, replicationSpec.getValidWriteIdList());
       } catch (FileNotFoundException e) {
         throw new IOException(ErrorMsg.FILE_NOT_FOUND.format(e.getMessage()), e);

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 3bc1f8a..18253c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,10 +17,17 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -31,9 +38,6 @@ import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -45,15 +49,8 @@ import org.apache.hadoop.util.StringUtils;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -68,9 +65,6 @@ public class Cleaner extends CompactorThread {
   private long cleanerCheckInterval = 0;
 
   private ReplChangeManager replChangeManager;
-  // List of compactions to clean.
-  private Map<Long, Set<Long>> compactId2LockMap = new HashMap<>();
-  private Map<Long, CompactionInfo> compactId2CompactInfoMap = new HashMap<>();
 
   @Override
   public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException {
@@ -97,95 +91,9 @@ public class Cleaner extends CompactorThread {
       try {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
-        // First look for all the compactions that are waiting to be cleaned.  If we have not
-        // seen an entry before, look for all the locks held on that table or partition and
-        // record them.  We will then only clean the partition once all of those locks have been
-        // released.  This way we avoid removing the files while they are in use,
-        // while at the same time avoiding starving the cleaner as new readers come along.
-        // This works because we know that any reader who comes along after the worker thread has
-        // done the compaction will read the more up to date version of the data (either in a
-        // newer delta or in a newer base).
-        List<CompactionInfo> toClean = txnHandler.findReadyToClean();
-        {
-          /**
-           * Since there may be more than 1 instance of Cleaner running we may have state info
-           * for items which were cleaned by instances.  Here we remove them.
-           *
-           * In the long run if we add end_time to compaction_queue, then we can check that
-           * hive_locks.acquired_at > compaction_queue.end_time + safety_buffer in which case
-           * we know the lock owner is reading files created by this compaction or later.
-           * The advantage is that we don't have to store the locks.
-           */
-          Set<Long> currentToCleanSet = new HashSet<>();
-          for (CompactionInfo ci : toClean) {
-            currentToCleanSet.add(ci.id);
-          }
-          Set<Long> cleanPerformedByOthers = new HashSet<>();
-          for (long id : compactId2CompactInfoMap.keySet()) {
-            if (!currentToCleanSet.contains(id)) {
-              cleanPerformedByOthers.add(id);
-            }
-          }
-          for (long id : cleanPerformedByOthers) {
-            compactId2CompactInfoMap.remove(id);
-            compactId2LockMap.remove(id);
-          }
-        }
-        if (toClean.size() > 0 || compactId2LockMap.size() > 0) {
-          ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest());
-          if(LOG.isDebugEnabled()) {
-            dumpLockState(locksResponse);
-          }
-          for (CompactionInfo ci : toClean) {
-            // Check to see if we have seen this request before.  If so, ignore it.  If not,
-            // add it to our queue.
-            if (!compactId2LockMap.containsKey(ci.id)) {
-              compactId2LockMap.put(ci.id, findRelatedLocks(ci, locksResponse));
-              compactId2CompactInfoMap.put(ci.id, ci);
-            }
-          }
-
-          // Now, for each entry in the queue, see if all of the associated locks are clear so we
-          // can clean
-          Set<Long> currentLocks = buildCurrentLockSet(locksResponse);
-          List<Long> expiredLocks = new ArrayList<Long>();
-          List<Long> compactionsCleaned = new ArrayList<Long>();
-          try {
-            for (Map.Entry<Long, Set<Long>> queueEntry : compactId2LockMap.entrySet()) {
-              boolean sawLock = false;
-              for (Long lockId : queueEntry.getValue()) {
-                if (currentLocks.contains(lockId)) {
-                  sawLock = true;
-                  break;
-                } else {
-                  expiredLocks.add(lockId);
-                }
-              }
-
-              if (!sawLock) {
-                // Remember to remove this when we're out of the loop,
-                // we can't do it in the loop or we'll get a concurrent modification exception.
-                compactionsCleaned.add(queueEntry.getKey());
-                //Future thought: this may be expensive so consider having a thread pool run in parallel
-                clean(compactId2CompactInfoMap.get(queueEntry.getKey()));
-              } else {
-                // Remove the locks we didn't see so we don't look for them again next time
-                for (Long lockId : expiredLocks) {
-                  queueEntry.getValue().remove(lockId);
-                }
-                LOG.info("Skipping cleaning of " +
-                    idWatermark(compactId2CompactInfoMap.get(queueEntry.getKey())) +
-                    " due to reader present: " + queueEntry.getValue());
-              }
-            }
-          } finally {
-            if (compactionsCleaned.size() > 0) {
-              for (Long compactId : compactionsCleaned) {
-                compactId2LockMap.remove(compactId);
-                compactId2CompactInfoMap.remove(compactId);
-              }
-            }
-          }
+        long minOpenTxnId = txnHandler.findMinOpenTxnId();
+        for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
+          clean(compactionInfo, minOpenTxnId);
         }
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
@@ -213,41 +121,7 @@ public class Cleaner extends CompactorThread {
     } while (!stop.get());
   }
 
-  private Set<Long> findRelatedLocks(CompactionInfo ci, ShowLocksResponse locksResponse) {
-    Set<Long> relatedLocks = new HashSet<Long>();
-    for (ShowLocksResponseElement lock : locksResponse.getLocks()) {
-      /**
-       * Hive QL is not case sensitive wrt db/table/column names
-       * Partition names get
-       * normalized (as far as I can tell) by lower casing column name but not partition value.
-       * {@link org.apache.hadoop.hive.metastore.Warehouse#makePartName(List, List, String)}
-       * {@link org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer#getPartSpec(ASTNode)}
-       * Since user input may start out in any case, compare here case-insensitive for db/table
-       * but leave partition name as is.
-       */
-      if (ci.dbname.equalsIgnoreCase(lock.getDbname())) {
-        if ((ci.tableName == null && lock.getTablename() == null) ||
-            (ci.tableName != null && ci.tableName.equalsIgnoreCase(lock.getTablename()))) {
-          if ((ci.partName == null && lock.getPartname() == null) ||
-              (ci.partName != null && ci.partName.equals(lock.getPartname()))) {
-            relatedLocks.add(lock.getLockid());
-          }
-        }
-      }
-    }
-
-    return relatedLocks;
-  }
-
-  private Set<Long> buildCurrentLockSet(ShowLocksResponse locksResponse) {
-    Set<Long> currentLocks = new HashSet<Long>(locksResponse.getLocks().size());
-    for (ShowLocksResponseElement lock : locksResponse.getLocks()) {
-      currentLocks.add(lock.getLockid());
-    }
-    return currentLocks;
-  }
-
-  private void clean(CompactionInfo ci) throws MetaException {
+  private void clean(CompactionInfo ci, long minOpenTxnGLB) throws MetaException {
     LOG.info("Starting cleaning for " + ci);
     try {
       Table t = resolveTable(ci);
@@ -271,27 +145,52 @@ public class Cleaner extends CompactorThread {
       }
       StorageDescriptor sd = resolveStorageDescriptor(t, p);
       final String location = sd.getLocation();
-
+      ValidTxnList validTxnList =
+          TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB);
+      //save it so that getAcidState() sees it
+      conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
       /**
-       * Each Compaction only compacts as far as the highest txn id such that all txns below it
-       * are resolved (i.e. not opened).  This is what "highestWriteId" tracks.  This is only tracked
-       * since Hive 1.3.0/2.0 - thus may be 0.  See ValidCompactorWriteIdList and uses for more info.
+       * {@code validTxnList} is capped by minOpenTxnGLB so if
+       * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
+       * produced by a compactor, that means every reader that could be active right now sees it
+       * as well.  That means if this base/delta shadows some earlier base/delta, then it will be
+       * used in favor of any files that it shadows.  Thus the shadowed files are safe to delete.
        *
-       * We only want to clean up to the highestWriteId - otherwise we risk deleting deltas from
-       * under an active reader.
        *
-       * Suppose we have deltas D2 D3 for table T, i.e. the last compaction created D3 so now there is a 
-       * clean request for D2.  
-       * Cleaner checks existing locks and finds none.
-       * Between that check and removeFiles() a query starts (it will be reading D3) and another compaction
-       * completes which creates D4.
-       * Now removeFiles() (more specifically AcidUtils.getAcidState()) will declare D3 to be obsolete
-       * unless ValidWriteIdList is "capped" at highestWriteId.
+       * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
+       * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
+       * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
+       * For example given partition P1, txnid:150 starts and sees txnid:149 as open.
+       * Say compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved
+       * writeId:17.  Compactor will produce base_17_c160.
+       * Suppose txnid:149 writes delta_18_18
+       * to P1 and aborts.  Compactor can only remove TXN_COMPONENTS entries
+       * up to (inclusive) writeId:17 since delta_18_18 may be on disk (and perhaps corrupted) but
+       * not visible based on 'validTxnList' capped at minOpenTxn so it will not be cleaned by
+       * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and so we must keep the
+       * metadata that says that 18 is aborted.
+       * In a slightly different case, whatever txn created delta_18 (and all other txn) may have
+       * committed by the time cleaner runs and so cleaner will indeed see delta_18_18 and remove
+       * it (since it has nothing but aborted data).  But we can't tell which actually happened
+       * in markCleaned() so make sure it doesn't delete meta above CQ_HIGHEST_WRITE_ID.
+       *
+       * We could perhaps make cleaning of aborted and obsolete and remove all aborted files up
+       * to the current Min Open Write Id, this way aborted TXN_COMPONENTS meta can be removed
+       * as well up to that point which may be higher than CQ_HIGHEST_WRITE_ID.  This could be
+       * useful if there is all of a sudden a flood of aborted txns.  (For another day).
        */
-      final ValidWriteIdList validWriteIdList = (ci.highestWriteId > 0)
-          ? new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], new BitSet(),
-          ci.highestWriteId)
-          : new ValidReaderWriteIdList();
+      List<String> tblNames = Collections.singletonList(
+          TableName.getDbTable(t.getDbName(), t.getTableName()));
+      GetValidWriteIdsRequest rqst = new GetValidWriteIdsRequest(tblNames);
+      rqst.setValidTxnList(validTxnList.writeToString());
+      GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(rqst);
+      //we could have no write IDs for a table if it was never written to but
+      // since we are in the Cleaner phase of compactions, there must have
+      // been some delta/base dirs
+      assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
+      //Creating 'reader' list since we are interested in the set of 'obsolete' files
+      ValidReaderWriteIdList validWriteIdList =
+          TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0));
 
       if (runJobAsSelf(ci.runAs)) {
         removeFiles(location, validWriteIdList, ci);
@@ -328,7 +227,17 @@ public class Cleaner extends CompactorThread {
     Path locPath = new Path(location);
     AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList);
     List<FileStatus> obsoleteDirs = dir.getObsolete();
-    List<Path> filesToDelete = new ArrayList<Path>(obsoleteDirs.size());
+    /**
+     * add anything in 'dir'  that only has data from aborted transactions - no one should be
+     * trying to read anything in that dir (except getAcidState() that only reads the name of
+     * this dir itself)
+     * So this may run ahead of {@link CompactionInfo#highestWriteId} but it's ok (suppose there
+     * are no active txns when cleaner runs).  The key is to not delete metadata about aborted
+     * txns with write IDs > {@link CompactionInfo#highestWriteId}.
+     * See {@link TxnStore#markCleaned(CompactionInfo)}
+     */
+    obsoleteDirs.addAll(dir.getAbortedDirectories());
+    List<Path> filesToDelete = new ArrayList<>(obsoleteDirs.size());
     StringBuilder extraDebugInfo = new StringBuilder("[");
     for (FileStatus stat : obsoleteDirs) {
       filesToDelete.add(stat.getPath());
@@ -338,9 +247,6 @@ public class Cleaner extends CompactorThread {
       }
     }
     extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']');
-    List<Long> compactIds = new ArrayList<>(compactId2CompactInfoMap.keySet());
-    Collections.sort(compactIds);
-    extraDebugInfo.append("compactId2CompactInfoMap.keySet(").append(compactIds).append(")");
     LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() +
          " obsolete directories from " + location + ". " + extraDebugInfo.toString());
     if (filesToDelete.size() < 1) {
@@ -361,63 +267,4 @@ public class Cleaner extends CompactorThread {
       fs.delete(dead, true);
     }
   }
-  private static class LockComparator implements Comparator<ShowLocksResponseElement> {
-    //sort ascending by resource, nulls first
-    @Override
-    public int compare(ShowLocksResponseElement o1, ShowLocksResponseElement o2) {
-      if(o1 == o2) {
-        return 0;
-      }
-      if(o1 == null) {
-        return -1;
-      }
-      if(o2 == null) {
-        return 1;
-      }
-      int v = o1.getDbname().compareToIgnoreCase(o2.getDbname());
-      if(v != 0) {
-        return v;
-      }
-      if(o1.getTablename() == null) {
-        return -1;
-      }
-      if(o2.getTablename() == null) {
-        return 1;
-      }
-      v = o1.getTablename().compareToIgnoreCase(o2.getTablename());
-      if(v != 0) {
-        return v;
-      }
-      if(o1.getPartname() == null) {
-        return -1;
-      }
-      if(o2.getPartname() == null) {
-        return 1;
-      }
-      v = o1.getPartname().compareToIgnoreCase(o2.getPartname());
-      if(v != 0) {
-        return v;
-      }
-      //if still equal, compare by lock ids
-      v = Long.compare(o1.getLockid(), o2.getLockid());
-      if(v != 0) {
-        return v;
-      }
-      return Long.compare(o1.getLockIdInternal(), o2.getLockIdInternal());
-
-    }
-  }
-  private void dumpLockState(ShowLocksResponse slr) {
-    Iterator<ShowLocksResponseElement> l = slr.getLocksIterator();
-    List<ShowLocksResponseElement> sortedList = new ArrayList<>();
-    while(l.hasNext()) {
-      sortedList.add(l.next());
-    }
-    //sort for readability
-    sortedList.sort(new LockComparator());
-    LOG.info("dumping locks");
-    for(ShowLocksResponseElement lock : sortedList) {
-      LOG.info(lock.toString());
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 92c74e1..c6cb7c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -42,14 +42,14 @@ import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
-import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
@@ -59,10 +59,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.DriverUtils;
-import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
@@ -74,8 +71,6 @@ import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
@@ -100,6 +95,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.Ref;
+import org.apache.parquet.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -351,6 +347,7 @@ public class CompactorMR {
       // Set up the session for driver.
       conf = new HiveConf(conf);
       conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+      conf.unset(ValidTxnList.VALID_TXNS_KEY);//so Driver doesn't get confused
 
       String user = UserGroupInformation.getCurrentUser().getShortUserName();
       SessionState sessionState = DriverUtils.setUpSessionState(conf, user, false);
@@ -599,7 +596,6 @@ public class CompactorMR {
    * to use.
    * @param job the job to update
    * @param cols the columns of the table
-   * @param map
    */
   private void setColumnTypes(JobConf job, List<FieldSchema> cols) {
     StringBuilder colNames = new StringBuilder();
@@ -1006,7 +1002,17 @@ public class CompactorMR {
         deleteEventWriter.close(false);
       }
     }
-
+    private long getCompactorTxnId() {
+      String snapshot = jobConf.get(ValidTxnList.VALID_TXNS_KEY);
+      if(Strings.isNullOrEmpty(snapshot)) {
+        throw new IllegalStateException(ValidTxnList.VALID_TXNS_KEY + " not found for writing to "
+            + jobConf.get(FINAL_LOCATION));
+      }
+      ValidTxnList validTxnList = new ValidReadTxnList();
+      validTxnList.readFromString(snapshot);
+      //this is id of the current txn
+      return validTxnList.getHighWatermark();
+    }
     private void getWriter(Reporter reporter, ObjectInspector inspector,
                            int bucket) throws IOException {
       if (writer == null) {
@@ -1019,7 +1025,8 @@ public class CompactorMR {
             .minimumWriteId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
             .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
             .bucket(bucket)
-            .statementId(-1);//setting statementId == -1 makes compacted delta files use
+            .statementId(-1)//setting statementId == -1 makes compacted delta files use
+            .visibilityTxnId(getCompactorTxnId());
         //delta_xxxx_yyyy format
 
         // Instantiate the underlying output format
@@ -1044,8 +1051,9 @@ public class CompactorMR {
           .minimumWriteId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
           .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
           .bucket(bucket)
-          .statementId(-1);//setting statementId == -1 makes compacted delta files use
-        //delta_xxxx_yyyy format
+          .statementId(-1)//setting statementId == -1 makes compacted delta files use
+            // delta_xxxx_yyyy format
+          .visibilityTxnId(getCompactorTxnId());
 
         // Instantiate the underlying output format
         @SuppressWarnings("unchecked")//since there is no way to parametrize instance of Class
@@ -1178,7 +1186,7 @@ public class CompactorMR {
       FileStatus[] contents = fs.listStatus(tmpLocation);//expect 1 base or delta dir in this list
       //we have MIN_TXN, MAX_TXN and IS_MAJOR in JobConf so we could figure out exactly what the dir
       //name is that we want to rename; leave it for another day
-      // TODO: if we expect one dir why don't we enforce it?
+      //todo: may actually have delta_x_y and delete_delta_x_y
       for (FileStatus fileStatus : contents) {
         //newPath is the base/delta dir
         Path newPath = new Path(finalLocation, fileStatus.getPath().getName());
@@ -1218,6 +1226,8 @@ public class CompactorMR {
       ValidWriteIdList actualWriteIds) throws IOException {
     Path fromPath = new Path(from), toPath = new Path(to);
     FileSystem fs = fromPath.getFileSystem(conf);
+    //todo: is that true?  can it be aborted? does it matter for compaction? probably OK since
+    //getAcidState() doesn't check if X is valid in base_X_cY for compacted base dirs.
     // Assume the high watermark can be used as maximum transaction ID.
     long maxTxn = actualWriteIds.getHighWatermark();
     AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index dd0929f..f5b901d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -51,7 +51,7 @@ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCa
 /**
  * Superclass for all threads in the compactor.
  */
-abstract class CompactorThread extends Thread implements MetaStoreThread {
+public abstract class CompactorThread extends Thread implements MetaStoreThread {
   static final private String CLASS_NAME = CompactorThread.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
@@ -67,8 +67,9 @@ abstract class CompactorThread extends Thread implements MetaStoreThread {
     // TODO MS-SPLIT for now, keep a copy of HiveConf around as we need to call other methods with
     // it. This should be changed to Configuration once everything that this calls that requires
     // HiveConf is moved to the standalone metastore.
-    conf = (configuration instanceof HiveConf) ? (HiveConf)configuration :
-        new HiveConf(configuration, HiveConf.class);
+    //clone the conf - compactor needs to set properties in it which we don't
+    // want to bleed into the caller
+    conf = new HiveConf(configuration, HiveConf.class);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/ddf3b6cd/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index beb6902..cdcc0e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -143,13 +145,15 @@ public class Initiator extends CompactorThread {
                     ", assuming it has been dropped and moving on.");
                 continue;
               }
-
-              // Compaction doesn't work under a transaction and hence pass null for validTxnList
+              ValidTxnList validTxnList = TxnCommonUtils
+                  .createValidReadTxnList(txnHandler.getOpenTxns(), 0);
+              conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
               // The response will have one entry per table and hence we get only one ValidWriteIdList
               String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
               GetValidWriteIdsRequest rqst
                       = new GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
-              ValidWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(
+              rqst.setValidTxnList(validTxnList.writeToString());
+              final ValidWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(
                       txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0));
 
               StorageDescriptor sd = resolveStorageDescriptor(t, p);


Mime
View raw message