hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ekoif...@apache.org
Subject [2/2] hive git commit: HIVE-14350 Aborted txns cause false positive "Not enough history available..." msgs (Eugene Koifman, reviewed by Alan Gates)
Date Fri, 29 Jul 2016 22:36:24 GMT
HIVE-14350 Aborted txns cause false positive "Not enough history available..." msgs (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8a183766
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8a183766
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8a183766

Branch: refs/heads/master
Commit: 8a1837665ce1a1f129e1a923dd02866f8b7ffba6
Parents: 541fcb8
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Fri Jul 29 14:18:23 2016 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Fri Jul 29 14:19:02 2016 -0700

----------------------------------------------------------------------
 .../hive/common/ValidCompactorTxnList.java      | 109 +++++++------------
 .../hadoop/hive/common/ValidReadTxnList.java    |  40 ++++++-
 .../apache/hadoop/hive/common/ValidTxnList.java |   7 ++
 .../hive/common/TestValidReadTxnList.java       |  12 +-
 .../hive/ql/txn/compactor/TestCompactor.java    |   4 +
 metastore/if/hive_metastore.thrift              |   1 +
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp |  25 +++++
 .../gen/thrift/gen-cpp/hive_metastore_types.h   |  15 ++-
 .../hive/metastore/api/GetOpenTxnsResponse.java | 105 +++++++++++++++++-
 .../src/gen/thrift/gen-php/metastore/Types.php  |  23 ++++
 .../gen/thrift/gen-py/hive_metastore/ttypes.py  |  15 ++-
 .../gen/thrift/gen-rb/hive_metastore_types.rb   |   4 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   |  16 ++-
 .../hadoop/hive/metastore/txn/TxnUtils.java     |  28 +++--
 .../txn/TestValidCompactorTxnList.java          |  40 ++++---
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |   2 +-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  11 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |  16 +++
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java |  35 +++---
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java  |   8 +-
 20 files changed, 375 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
index ad79e2c..334b93e 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
@@ -18,94 +18,61 @@
 
 package org.apache.hadoop.hive.common;
 
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
-
 import java.util.Arrays;
 
 /**
- * And implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
- * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it
- * is committed or aborted.  Additionally it will return none if there are any open transactions
- * below the max transaction given, since we don't want to compact above open transactions.  For
- * {@link #isTxnValid} it will still view a transaction as valid only if it is committed.  These
- * produce the logic we need to assure that the compactor only sees records less than the lowest
+ * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
+ * 
+ * Compaction should only include txns up to the smallest open txn (exclusive).
+ * There may be aborted txns in the snapshot represented by this ValidCompactorTxnList.
+ * Thus {@link #isTxnRangeValid(long, long)} returns NONE for any range that includes any unresolved
+ * transactions.  Any txn above {@code highWatermark} is unresolved.
+ * These produce the logic we need to assure that the compactor only sees records less than the lowest
  * open transaction when choosing which files to compact, but that it still ignores aborted
  * records when compacting.
+ * 
+ * See {@link org.apache.hadoop.hive.metastore.txn.TxnUtils#createValidCompactTxnList()} for proper
+ * way to construct this.
  */
 public class ValidCompactorTxnList extends ValidReadTxnList {
-  //TODO: refactor this - minOpenTxn is not needed if we set
-  // highWatermark = Math.min(highWaterMark, minOpenTxn) (assuming there are open txns)
-
-  // The minimum open transaction id
-  private long minOpenTxn;
-
   public ValidCompactorTxnList() {
     super();
-    minOpenTxn = -1;
   }
-
   /**
-   *
-   * @param exceptions list of all open and aborted transactions
-   * @param minOpen lowest open transaction
-   * @param highWatermark highest committed transaction
+   * @param abortedTxnList list of all aborted transactions
+   * @param highWatermark highest committed transaction to be considered for compaction,
+   *                      equivalently (lowest_open_txn - 1).
    */
-  public ValidCompactorTxnList(long[] exceptions, long minOpen, long highWatermark) {
-    super(exceptions, highWatermark);
-    minOpenTxn = minOpen;
+  public ValidCompactorTxnList(long[] abortedTxnList, long highWatermark) {
+    super(abortedTxnList, highWatermark);
+    if(this.exceptions.length <= 0) {
+      return;
+    }
+    //now that exceptions (aka abortedTxnList) is sorted
+    int idx = Arrays.binarySearch(this.exceptions, highWatermark);
+    int lastElementPos;
+    if(idx < 0) {
+      int insertionPoint = -idx - 1 ;//see Arrays.binarySearch() JavaDoc
+      lastElementPos = insertionPoint - 1;
+    }
+    else {
+      lastElementPos = idx;
+    }
+    /**
+     * ensure that we throw out any exceptions above highWatermark to make
+     * {@link #isTxnValid(long)} faster 
+     */
+    this.exceptions = Arrays.copyOf(this.exceptions, lastElementPos + 1);
   }
-
   public ValidCompactorTxnList(String value) {
     super(value);
   }
-
+  /**
+   * Returns {@link org.apache.hadoop.hive.common.ValidTxnList.RangeResponse#ALL} if all txns in
+   * the range are resolved and RangeResponse.NONE otherwise
+   */
   @Override
   public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
-    if (highWatermark < minTxnId) {
-      return RangeResponse.NONE;
-    } else if (minOpenTxn < 0) {
-      return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
-    } else {
-      return minOpenTxn > maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
-    }
-  }
-
-  @Override
-  public String writeToString() {
-    StringBuilder buf = new StringBuilder();
-    buf.append(highWatermark);
-    buf.append(':');
-    buf.append(minOpenTxn);
-    if (exceptions.length == 0) {
-      buf.append(':');
-    } else {
-      for(long except: exceptions) {
-        buf.append(':');
-        buf.append(except);
-      }
-    }
-    return buf.toString();
-  }
-
-  @Override
-  public void readFromString(String src) {
-    if (src == null || src.length() == 0) {
-      highWatermark = Long.MAX_VALUE;
-      exceptions = new long[0];
-    } else {
-      String[] values = src.split(":");
-      highWatermark = Long.parseLong(values[0]);
-      minOpenTxn = Long.parseLong(values[1]);
-      exceptions = new long[values.length - 2];
-      for(int i = 2; i < values.length; ++i) {
-        exceptions[i-2] = Long.parseLong(values[i]);
-      }
-    }
-  }
-
-  @VisibleForTesting
-  public long getMinOpenTxn() {
-    return minOpenTxn;
+    return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
index fda242d..2f35917 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
@@ -18,28 +18,43 @@
 
 package org.apache.hadoop.hive.common;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.util.Arrays;
 
 /**
- * An implmentation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by readers.
+ * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by readers.
  * This class will view a transaction as valid only if it is committed.  Both open and aborted
  * transactions will be seen as invalid.
  */
 public class ValidReadTxnList implements ValidTxnList {
 
   protected long[] exceptions;
+  //default value means there are no open txn in the snapshot
+  private long minOpenTxn = Long.MAX_VALUE;
   protected long highWatermark;
 
   public ValidReadTxnList() {
-    this(new long[0], Long.MAX_VALUE);
+    this(new long[0], Long.MAX_VALUE, Long.MAX_VALUE);
   }
 
+  /**
+   * Used if there are no open transactions in the snapshot
+   */
   public ValidReadTxnList(long[] exceptions, long highWatermark) {
+    this(exceptions, highWatermark, Long.MAX_VALUE);
+  }
+  public ValidReadTxnList(long[] exceptions, long highWatermark, long minOpenTxn) {
     if (exceptions.length == 0) {
       this.exceptions = exceptions;
     } else {
       this.exceptions = exceptions.clone();
       Arrays.sort(this.exceptions);
+      this.minOpenTxn = minOpenTxn;
+      if(this.exceptions[0] <= 0) {
+        //should never happen of course
+        throw new IllegalArgumentException("Invalid txnid: " + this.exceptions[0] + " found");
+      }
     }
     this.highWatermark = highWatermark;
   }
@@ -56,6 +71,14 @@ public class ValidReadTxnList implements ValidTxnList {
     return Arrays.binarySearch(exceptions, txnid) < 0;
   }
 
+  /**
+   * We cannot use a base file if its range contains an open txn.
+   * @param txnid from base_xxxx
+   */
+  @Override
+  public boolean isValidBase(long txnid) {
+    return minOpenTxn > txnid && txnid <= highWatermark;
+  }
   @Override
   public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
     // check the easy cases first
@@ -92,6 +115,8 @@ public class ValidReadTxnList implements ValidTxnList {
   public String writeToString() {
     StringBuilder buf = new StringBuilder();
     buf.append(highWatermark);
+    buf.append(':');
+    buf.append(minOpenTxn);
     if (exceptions.length == 0) {
       buf.append(':');
     } else {
@@ -111,9 +136,10 @@ public class ValidReadTxnList implements ValidTxnList {
     } else {
       String[] values = src.split(":");
       highWatermark = Long.parseLong(values[0]);
-      exceptions = new long[values.length - 1];
-      for(int i = 1; i < values.length; ++i) {
-        exceptions[i-1] = Long.parseLong(values[i]);
+      minOpenTxn = Long.parseLong(values[1]);
+      exceptions = new long[values.length - 2];
+      for(int i = 2; i < values.length; ++i) {
+        exceptions[i-2] = Long.parseLong(values[i]);
       }
     }
   }
@@ -127,5 +153,9 @@ public class ValidReadTxnList implements ValidTxnList {
   public long[] getInvalidTransactions() {
     return exceptions;
   }
+  @VisibleForTesting
+  public long getMinOpenTxn() {
+    return minOpenTxn;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java b/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
index 87e7e30..5e1e4ee 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
@@ -47,6 +47,13 @@ public interface ValidTxnList {
   public boolean isTxnValid(long txnid);
 
   /**
+   * Returns {@code true} if the given base file can be used to materialize the snapshot represented by
+   * this {@code ValidTxnList}.
+   * @param txnid highest txn in a given base_xxxx file
+   */
+  public boolean isValidBase(long txnid);
+
+  /**
    * Find out if a range of transaction ids are valid.  Note that valid may have different meanings
    * for different implementations, as some will only want to see committed transactions and some
    * both committed and aborted.

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java b/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
index d3c6803..6661158 100644
--- a/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
+++ b/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
@@ -34,9 +34,9 @@ public class TestValidReadTxnList {
 
   @Test
   public void noExceptions() throws Exception {
-    ValidTxnList txnList = new ValidReadTxnList(new long[0], 1);
+    ValidTxnList txnList = new ValidReadTxnList(new long[0], 1, Long.MAX_VALUE);
     String str = txnList.writeToString();
-    Assert.assertEquals("1:", str);
+    Assert.assertEquals("1:" + Long.MAX_VALUE + ":", str);
     ValidTxnList newList = new ValidReadTxnList();
     newList.readFromString(str);
     Assert.assertTrue(newList.isTxnValid(1));
@@ -45,9 +45,9 @@ public class TestValidReadTxnList {
 
   @Test
   public void exceptions() throws Exception {
-    ValidTxnList txnList = new ValidReadTxnList(new long[]{2L,4L}, 5);
+    ValidTxnList txnList = new ValidReadTxnList(new long[]{2L,4L}, 5, 4L);
     String str = txnList.writeToString();
-    Assert.assertEquals("5:2:4", str);
+    Assert.assertEquals("5:4:2:4", str);
     ValidTxnList newList = new ValidReadTxnList();
     newList.readFromString(str);
     Assert.assertTrue(newList.isTxnValid(1));
@@ -62,7 +62,7 @@ public class TestValidReadTxnList {
   public void longEnoughToCompress() throws Exception {
     long[] exceptions = new long[1000];
     for (int i = 0; i < 1000; i++) exceptions[i] = i + 100;
-    ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000);
+    ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000, 900);
     String str = txnList.writeToString();
     ValidTxnList newList = new ValidReadTxnList();
     newList.readFromString(str);
@@ -76,7 +76,7 @@ public class TestValidReadTxnList {
   public void readWriteConfig() throws Exception {
     long[] exceptions = new long[1000];
     for (int i = 0; i < 1000; i++) exceptions[i] = i + 100;
-    ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000);
+    ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000, 900);
     String str = txnList.writeToString();
     Configuration conf = new Configuration();
     conf.set(ValidTxnList.VALID_TXNS_KEY, str);

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index ca2a912..731caa8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -1038,6 +1038,10 @@ public class TestCompactor {
       public long[] getInvalidTransactions() {
         return new long[0];
       }
+      @Override
+      public boolean isValidBase(long txnid) {
+        return true;
+      }
     };
 
     OrcInputFormat aif = new OrcInputFormat();

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/if/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift
index 4d92b73..a2e35b8 100755
--- a/metastore/if/hive_metastore.thrift
+++ b/metastore/if/hive_metastore.thrift
@@ -635,6 +635,7 @@ struct GetOpenTxnsInfoResponse {
 struct GetOpenTxnsResponse {
     1: required i64 txn_high_water_mark,
     2: required set<i64> open_txns,
+    3: optional i64 min_open_txn, //since 1.3,2.2
 }
 
 struct OpenTxnRequest {

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index 79460a8..174b539 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -11911,6 +11911,11 @@ void GetOpenTxnsResponse::__set_open_txns(const std::set<int64_t> & val) {
   this->open_txns = val;
 }
 
+void GetOpenTxnsResponse::__set_min_open_txn(const int64_t val) {
+  this->min_open_txn = val;
+__isset.min_open_txn = true;
+}
+
 uint32_t GetOpenTxnsResponse::read(::apache::thrift::protocol::TProtocol* iprot) {
 
   apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
@@ -11963,6 +11968,14 @@ uint32_t GetOpenTxnsResponse::read(::apache::thrift::protocol::TProtocol* iprot)
           xfer += iprot->skip(ftype);
         }
         break;
+      case 3:
+        if (ftype == ::apache::thrift::protocol::T_I64) {
+          xfer += iprot->readI64(this->min_open_txn);
+          this->__isset.min_open_txn = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
       default:
         xfer += iprot->skip(ftype);
         break;
@@ -12000,6 +12013,11 @@ uint32_t GetOpenTxnsResponse::write(::apache::thrift::protocol::TProtocol* oprot
   }
   xfer += oprot->writeFieldEnd();
 
+  if (this->__isset.min_open_txn) {
+    xfer += oprot->writeFieldBegin("min_open_txn", ::apache::thrift::protocol::T_I64, 3);
+    xfer += oprot->writeI64(this->min_open_txn);
+    xfer += oprot->writeFieldEnd();
+  }
   xfer += oprot->writeFieldStop();
   xfer += oprot->writeStructEnd();
   return xfer;
@@ -12009,15 +12027,21 @@ void swap(GetOpenTxnsResponse &a, GetOpenTxnsResponse &b) {
   using ::std::swap;
   swap(a.txn_high_water_mark, b.txn_high_water_mark);
   swap(a.open_txns, b.open_txns);
+  swap(a.min_open_txn, b.min_open_txn);
+  swap(a.__isset, b.__isset);
 }
 
 GetOpenTxnsResponse::GetOpenTxnsResponse(const GetOpenTxnsResponse& other524) {
   txn_high_water_mark = other524.txn_high_water_mark;
   open_txns = other524.open_txns;
+  min_open_txn = other524.min_open_txn;
+  __isset = other524.__isset;
 }
 GetOpenTxnsResponse& GetOpenTxnsResponse::operator=(const GetOpenTxnsResponse& other525) {
   txn_high_water_mark = other525.txn_high_water_mark;
   open_txns = other525.open_txns;
+  min_open_txn = other525.min_open_txn;
+  __isset = other525.__isset;
   return *this;
 }
 void GetOpenTxnsResponse::printTo(std::ostream& out) const {
@@ -12025,6 +12049,7 @@ void GetOpenTxnsResponse::printTo(std::ostream& out) const {
   out << "GetOpenTxnsResponse(";
   out << "txn_high_water_mark=" << to_string(txn_high_water_mark);
   out << ", " << "open_txns=" << to_string(open_txns);
+  out << ", " << "min_open_txn="; (__isset.min_open_txn ? (out << to_string(min_open_txn)) : (out << "<null>"));
   out << ")";
 }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
index ec81798..bfec694 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -4881,29 +4881,42 @@ inline std::ostream& operator<<(std::ostream& out, const GetOpenTxnsInfoResponse
   return out;
 }
 
+typedef struct _GetOpenTxnsResponse__isset {
+  _GetOpenTxnsResponse__isset() : min_open_txn(false) {}
+  bool min_open_txn :1;
+} _GetOpenTxnsResponse__isset;
 
 class GetOpenTxnsResponse {
  public:
 
   GetOpenTxnsResponse(const GetOpenTxnsResponse&);
   GetOpenTxnsResponse& operator=(const GetOpenTxnsResponse&);
-  GetOpenTxnsResponse() : txn_high_water_mark(0) {
+  GetOpenTxnsResponse() : txn_high_water_mark(0), min_open_txn(0) {
   }
 
   virtual ~GetOpenTxnsResponse() throw();
   int64_t txn_high_water_mark;
   std::set<int64_t>  open_txns;
+  int64_t min_open_txn;
+
+  _GetOpenTxnsResponse__isset __isset;
 
   void __set_txn_high_water_mark(const int64_t val);
 
   void __set_open_txns(const std::set<int64_t> & val);
 
+  void __set_min_open_txn(const int64_t val);
+
   bool operator == (const GetOpenTxnsResponse & rhs) const
   {
     if (!(txn_high_water_mark == rhs.txn_high_water_mark))
       return false;
     if (!(open_txns == rhs.open_txns))
       return false;
+    if (__isset.min_open_txn != rhs.__isset.min_open_txn)
+      return false;
+    else if (__isset.min_open_txn && !(min_open_txn == rhs.min_open_txn))
+      return false;
     return true;
   }
   bool operator != (const GetOpenTxnsResponse &rhs) const {

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java
index 6986fc2..8230d38 100644
--- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java
+++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java
@@ -40,6 +40,7 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
 
   private static final org.apache.thrift.protocol.TField TXN_HIGH_WATER_MARK_FIELD_DESC = new org.apache.thrift.protocol.TField("txn_high_water_mark", org.apache.thrift.protocol.TType.I64, (short)1);
   private static final org.apache.thrift.protocol.TField OPEN_TXNS_FIELD_DESC = new org.apache.thrift.protocol.TField("open_txns", org.apache.thrift.protocol.TType.SET, (short)2);
+  private static final org.apache.thrift.protocol.TField MIN_OPEN_TXN_FIELD_DESC = new org.apache.thrift.protocol.TField("min_open_txn", org.apache.thrift.protocol.TType.I64, (short)3);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -49,11 +50,13 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
 
   private long txn_high_water_mark; // required
   private Set<Long> open_txns; // required
+  private long min_open_txn; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
     TXN_HIGH_WATER_MARK((short)1, "txn_high_water_mark"),
-    OPEN_TXNS((short)2, "open_txns");
+    OPEN_TXNS((short)2, "open_txns"),
+    MIN_OPEN_TXN((short)3, "min_open_txn");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -72,6 +75,8 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
           return TXN_HIGH_WATER_MARK;
         case 2: // OPEN_TXNS
           return OPEN_TXNS;
+        case 3: // MIN_OPEN_TXN
+          return MIN_OPEN_TXN;
         default:
           return null;
       }
@@ -113,7 +118,9 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
 
   // isset id assignments
   private static final int __TXN_HIGH_WATER_MARK_ISSET_ID = 0;
+  private static final int __MIN_OPEN_TXN_ISSET_ID = 1;
   private byte __isset_bitfield = 0;
+  private static final _Fields optionals[] = {_Fields.MIN_OPEN_TXN};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -122,6 +129,8 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
     tmpMap.put(_Fields.OPEN_TXNS, new org.apache.thrift.meta_data.FieldMetaData("open_txns", org.apache.thrift.TFieldRequirementType.REQUIRED, 
         new org.apache.thrift.meta_data.SetMetaData(org.apache.thrift.protocol.TType.SET, 
             new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))));
+    tmpMap.put(_Fields.MIN_OPEN_TXN, new org.apache.thrift.meta_data.FieldMetaData("min_open_txn", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(GetOpenTxnsResponse.class, metaDataMap);
   }
@@ -149,6 +158,7 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
       Set<Long> __this__open_txns = new HashSet<Long>(other.open_txns);
       this.open_txns = __this__open_txns;
     }
+    this.min_open_txn = other.min_open_txn;
   }
 
   public GetOpenTxnsResponse deepCopy() {
@@ -160,6 +170,8 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
     setTxn_high_water_markIsSet(false);
     this.txn_high_water_mark = 0;
     this.open_txns = null;
+    setMin_open_txnIsSet(false);
+    this.min_open_txn = 0;
   }
 
   public long getTxn_high_water_mark() {
@@ -222,6 +234,28 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
     }
   }
 
+  public long getMin_open_txn() {
+    return this.min_open_txn;
+  }
+
+  public void setMin_open_txn(long min_open_txn) {
+    this.min_open_txn = min_open_txn;
+    setMin_open_txnIsSet(true);
+  }
+
+  public void unsetMin_open_txn() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __MIN_OPEN_TXN_ISSET_ID);
+  }
+
+  /** Returns true if field min_open_txn is set (has been assigned a value) and false otherwise */
+  public boolean isSetMin_open_txn() {
+    return EncodingUtils.testBit(__isset_bitfield, __MIN_OPEN_TXN_ISSET_ID);
+  }
+
+  public void setMin_open_txnIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __MIN_OPEN_TXN_ISSET_ID, value);
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case TXN_HIGH_WATER_MARK:
@@ -240,6 +274,14 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
       }
       break;
 
+    case MIN_OPEN_TXN:
+      if (value == null) {
+        unsetMin_open_txn();
+      } else {
+        setMin_open_txn((Long)value);
+      }
+      break;
+
     }
   }
 
@@ -251,6 +293,9 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
     case OPEN_TXNS:
       return getOpen_txns();
 
+    case MIN_OPEN_TXN:
+      return getMin_open_txn();
+
     }
     throw new IllegalStateException();
   }
@@ -266,6 +311,8 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
       return isSetTxn_high_water_mark();
     case OPEN_TXNS:
       return isSetOpen_txns();
+    case MIN_OPEN_TXN:
+      return isSetMin_open_txn();
     }
     throw new IllegalStateException();
   }
@@ -301,6 +348,15 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
         return false;
     }
 
+    boolean this_present_min_open_txn = true && this.isSetMin_open_txn();
+    boolean that_present_min_open_txn = true && that.isSetMin_open_txn();
+    if (this_present_min_open_txn || that_present_min_open_txn) {
+      if (!(this_present_min_open_txn && that_present_min_open_txn))
+        return false;
+      if (this.min_open_txn != that.min_open_txn)
+        return false;
+    }
+
     return true;
   }
 
@@ -318,6 +374,11 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
     if (present_open_txns)
       list.add(open_txns);
 
+    boolean present_min_open_txn = true && (isSetMin_open_txn());
+    list.add(present_min_open_txn);
+    if (present_min_open_txn)
+      list.add(min_open_txn);
+
     return list.hashCode();
   }
 
@@ -349,6 +410,16 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(isSetMin_open_txn()).compareTo(other.isSetMin_open_txn());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetMin_open_txn()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.min_open_txn, other.min_open_txn);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -380,6 +451,12 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
       sb.append(this.open_txns);
     }
     first = false;
+    if (isSetMin_open_txn()) {
+      if (!first) sb.append(", ");
+      sb.append("min_open_txn:");
+      sb.append(this.min_open_txn);
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -459,6 +536,14 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 3: // MIN_OPEN_TXN
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.min_open_txn = iprot.readI64();
+              struct.setMin_open_txnIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -487,6 +572,11 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
         }
         oprot.writeFieldEnd();
       }
+      if (struct.isSetMin_open_txn()) {
+        oprot.writeFieldBegin(MIN_OPEN_TXN_FIELD_DESC);
+        oprot.writeI64(struct.min_open_txn);
+        oprot.writeFieldEnd();
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -512,6 +602,14 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
           oprot.writeI64(_iter472);
         }
       }
+      BitSet optionals = new BitSet();
+      if (struct.isSetMin_open_txn()) {
+        optionals.set(0);
+      }
+      oprot.writeBitSet(optionals, 1);
+      if (struct.isSetMin_open_txn()) {
+        oprot.writeI64(struct.min_open_txn);
+      }
     }
 
     @Override
@@ -530,6 +628,11 @@ public class GetOpenTxnsResponse implements org.apache.thrift.TBase<GetOpenTxnsR
         }
       }
       struct.setOpen_txnsIsSet(true);
+      BitSet incoming = iprot.readBitSet(1);
+      if (incoming.get(0)) {
+        struct.min_open_txn = iprot.readI64();
+        struct.setMin_open_txnIsSet(true);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/gen/thrift/gen-php/metastore/Types.php
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-php/metastore/Types.php b/metastore/src/gen/thrift/gen-php/metastore/Types.php
index f505208..d6f7f49 100644
--- a/metastore/src/gen/thrift/gen-php/metastore/Types.php
+++ b/metastore/src/gen/thrift/gen-php/metastore/Types.php
@@ -12002,6 +12002,10 @@ class GetOpenTxnsResponse {
    * @var int[]
    */
   public $open_txns = null;
+  /**
+   * @var int
+   */
+  public $min_open_txn = null;
 
   public function __construct($vals=null) {
     if (!isset(self::$_TSPEC)) {
@@ -12018,6 +12022,10 @@ class GetOpenTxnsResponse {
             'type' => TType::I64,
             ),
           ),
+        3 => array(
+          'var' => 'min_open_txn',
+          'type' => TType::I64,
+          ),
         );
     }
     if (is_array($vals)) {
@@ -12027,6 +12035,9 @@ class GetOpenTxnsResponse {
       if (isset($vals['open_txns'])) {
         $this->open_txns = $vals['open_txns'];
       }
+      if (isset($vals['min_open_txn'])) {
+        $this->min_open_txn = $vals['min_open_txn'];
+      }
     }
   }
 
@@ -12077,6 +12088,13 @@ class GetOpenTxnsResponse {
             $xfer += $input->skip($ftype);
           }
           break;
+        case 3:
+          if ($ftype == TType::I64) {
+            $xfer += $input->readI64($this->min_open_txn);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
         default:
           $xfer += $input->skip($ftype);
           break;
@@ -12116,6 +12134,11 @@ class GetOpenTxnsResponse {
       }
       $xfer += $output->writeFieldEnd();
     }
+    if ($this->min_open_txn !== null) {
+      $xfer += $output->writeFieldBegin('min_open_txn', TType::I64, 3);
+      $xfer += $output->writeI64($this->min_open_txn);
+      $xfer += $output->writeFieldEnd();
+    }
     $xfer += $output->writeFieldStop();
     $xfer += $output->writeStructEnd();
     return $xfer;

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 8d88cd7..2d308c9 100644
--- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -8264,17 +8264,20 @@ class GetOpenTxnsResponse:
   Attributes:
    - txn_high_water_mark
    - open_txns
+   - min_open_txn
   """
 
   thrift_spec = (
     None, # 0
     (1, TType.I64, 'txn_high_water_mark', None, None, ), # 1
     (2, TType.SET, 'open_txns', (TType.I64,None), None, ), # 2
+    (3, TType.I64, 'min_open_txn', None, None, ), # 3
   )
 
-  def __init__(self, txn_high_water_mark=None, open_txns=None,):
+  def __init__(self, txn_high_water_mark=None, open_txns=None, min_open_txn=None,):
     self.txn_high_water_mark = txn_high_water_mark
     self.open_txns = open_txns
+    self.min_open_txn = min_open_txn
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -8300,6 +8303,11 @@ class GetOpenTxnsResponse:
           iprot.readSetEnd()
         else:
           iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.I64:
+          self.min_open_txn = iprot.readI64()
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -8321,6 +8329,10 @@ class GetOpenTxnsResponse:
         oprot.writeI64(iter419)
       oprot.writeSetEnd()
       oprot.writeFieldEnd()
+    if self.min_open_txn is not None:
+      oprot.writeFieldBegin('min_open_txn', TType.I64, 3)
+      oprot.writeI64(self.min_open_txn)
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -8336,6 +8348,7 @@ class GetOpenTxnsResponse:
     value = 17
     value = (value * 31) ^ hash(self.txn_high_water_mark)
     value = (value * 31) ^ hash(self.open_txns)
+    value = (value * 31) ^ hash(self.min_open_txn)
     return value
 
   def __repr__(self):

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 0964cd8..bd94e98 100644
--- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -1843,10 +1843,12 @@ class GetOpenTxnsResponse
   include ::Thrift::Struct, ::Thrift::Struct_Union
   TXN_HIGH_WATER_MARK = 1
   OPEN_TXNS = 2
+  MIN_OPEN_TXN = 3
 
   FIELDS = {
     TXN_HIGH_WATER_MARK => {:type => ::Thrift::Types::I64, :name => 'txn_high_water_mark'},
-    OPEN_TXNS => {:type => ::Thrift::Types::SET, :name => 'open_txns', :element => {:type => ::Thrift::Types::I64}}
+    OPEN_TXNS => {:type => ::Thrift::Types::SET, :name => 'open_txns', :element => {:type => ::Thrift::Types::I64}},
+    MIN_OPEN_TXN => {:type => ::Thrift::Types::I64, :name => 'min_open_txn', :optional => true}
   }
 
   def struct_fields; FIELDS; end

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index b121644..e8c5fac 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -361,15 +361,25 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         close(rs);
         Set<Long> openList = new HashSet<Long>();
         //need the WHERE clause below to ensure consistent results with READ_COMMITTED
-        s = "select txn_id from TXNS where txn_id <= " + hwm;
+        s = "select txn_id, txn_state from TXNS where txn_id <= " + hwm;
         LOG.debug("Going to execute query<" + s + ">");
         rs = stmt.executeQuery(s);
+        long minOpenTxn = Long.MAX_VALUE;
         while (rs.next()) {
-          openList.add(rs.getLong(1));
+          long txnId = rs.getLong(1);
+          openList.add(txnId);
+          char c = rs.getString(2).charAt(0);
+          if(c == TXN_OPEN) {
+            minOpenTxn = Math.min(minOpenTxn, txnId);
+          }
         }
         LOG.debug("Going to rollback");
         dbConn.rollback();
-        return new GetOpenTxnsResponse(hwm, openList);
+        GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList);
+        if(minOpenTxn < Long.MAX_VALUE) {
+          otr.setMin_open_txn(minOpenTxn);
+        }
+        return otr;
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 39b18ac..2ffa1da 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -57,31 +58,42 @@ public class TxnUtils {
       if (currentTxn > 0 && currentTxn == txn) continue;
       exceptions[i++] = txn;
     }
-    return new ValidReadTxnList(exceptions, highWater);
+    if(txns.isSetMin_open_txn()) {
+      return new ValidReadTxnList(exceptions, highWater, txns.getMin_open_txn());
+    }
+    else {
+      return new ValidReadTxnList(exceptions, highWater);
+    }
   }
 
   /**
    * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
    * {@link org.apache.hadoop.hive.common.ValidTxnList}.  This assumes that the caller intends to
    * compact the files, and thus treats only open transactions as invalid.  Additionally any
-   * txnId > highestOpenTxnId is also invalid.  This is avoid creating something like
+   * txnId > highestOpenTxnId is also invalid.  This is to avoid creating something like
    * delta_17_120 where txnId 80, for example, is still open.
    * @param txns txn list from the metastore
    * @return a valid txn list.
    */
   public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
-    //todo: this could be more efficient: using select min(txn_id) from TXNS where txn_state=" +
-    // quoteChar(TXN_OPEN)  to compute compute HWM...
     long highWater = txns.getTxn_high_water_mark();
     long minOpenTxn = Long.MAX_VALUE;
     long[] exceptions = new long[txns.getOpen_txnsSize()];
     int i = 0;
     for (TxnInfo txn : txns.getOpen_txns()) {
-      if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
-      exceptions[i++] = txn.getId();//todo: only add Aborted
-    }//remove all exceptions < minOpenTxn
+      if (txn.getState() == TxnState.OPEN) {
+        minOpenTxn = Math.min(minOpenTxn, txn.getId());
+      }
+      else {
+        //only need aborted since we don't consider anything above minOpenTxn
+        exceptions[i++] = txn.getId();
+      }
+    }
+    if(i < exceptions.length) {
+      exceptions = Arrays.copyOf(exceptions, i);
+    }
     highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
-    return new ValidCompactorTxnList(exceptions, -1, highWater);
+    return new ValidCompactorTxnList(exceptions, highWater);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
index c249854..79ccc6b 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
@@ -26,66 +26,74 @@ public class TestValidCompactorTxnList {
 
   @Test
   public void minTxnHigh() {
-    ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 4}, 3, 5);
+    ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 4}, 2);
     ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
     Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
   }
 
   @Test
   public void maxTxnLow() {
-    ValidTxnList txns = new ValidCompactorTxnList(new long[]{13, 14}, 13, 15);
+    ValidTxnList txns = new ValidCompactorTxnList(new long[]{13, 14}, 12);
     ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
     Assert.assertEquals(ValidTxnList.RangeResponse.ALL, rsp);
   }
 
   @Test
   public void minTxnHighNoExceptions() {
-    ValidTxnList txns = new ValidCompactorTxnList(new long[0], -1, 5);
+    ValidTxnList txns = new ValidCompactorTxnList(new long[0], 5);
     ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
     Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
   }
 
   @Test
   public void maxTxnLowNoExceptions() {
-    ValidTxnList txns = new ValidCompactorTxnList(new long[0], -1, 15);
+    ValidTxnList txns = new ValidCompactorTxnList(new long[0], 15);
     ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
     Assert.assertEquals(ValidTxnList.RangeResponse.ALL, rsp);
   }
 
   @Test
   public void exceptionsAllBelow() {
-    ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 6}, 3, 15);
+    ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 6}, 3);
     ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
     Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
   }
 
   @Test
   public void exceptionsInMidst() {
-    ValidTxnList txns = new ValidCompactorTxnList(new long[]{8}, 8, 15);
+    ValidTxnList txns = new ValidCompactorTxnList(new long[]{8}, 7);
     ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
     Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
   }
+  @Test
+  public void exceptionsAbveHighWaterMark() {
+    ValidTxnList txns = new ValidCompactorTxnList(new long[]{8, 11, 17, 29}, 15);
+    Assert.assertArrayEquals("", new long[]{8, 11}, txns.getInvalidTransactions());
+    ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
+    Assert.assertEquals(ValidTxnList.RangeResponse.ALL, rsp);
+    rsp = txns.isTxnRangeValid(12, 16);
+    Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
+  }
 
   @Test
   public void writeToString() {
-    ValidTxnList txns = new ValidCompactorTxnList(new long[]{9, 7, 10}, 9, 37);
-    Assert.assertEquals("37:9:7:9:10", txns.writeToString());
+    ValidTxnList txns = new ValidCompactorTxnList(new long[]{9, 7, 10, Long.MAX_VALUE}, 8);
+    Assert.assertEquals("8:" + Long.MAX_VALUE + ":7", txns.writeToString());
     txns = new ValidCompactorTxnList();
-    Assert.assertEquals(Long.toString(Long.MAX_VALUE) + ":-1:", txns.writeToString());
-    txns = new ValidCompactorTxnList(new long[0], -1, 23);
-    Assert.assertEquals("23:-1:", txns.writeToString());
+    Assert.assertEquals(Long.toString(Long.MAX_VALUE) + ":" + Long.MAX_VALUE + ":", txns.writeToString());
+    txns = new ValidCompactorTxnList(new long[0], 23);
+    Assert.assertEquals("23:" + Long.MAX_VALUE + ":", txns.writeToString());
   }
 
   @Test
   public void readFromString() {
-    ValidCompactorTxnList txns = new ValidCompactorTxnList("37:9:7:9:10");
+    ValidCompactorTxnList txns = new ValidCompactorTxnList("37:" + Long.MAX_VALUE + ":7:9:10");
     Assert.assertEquals(37L, txns.getHighWatermark());
-    Assert.assertEquals(9L, txns.getMinOpenTxn());
+    Assert.assertEquals(Long.MAX_VALUE, txns.getMinOpenTxn());
     Assert.assertArrayEquals(new long[]{7L, 9L, 10L}, txns.getInvalidTransactions());
-    txns = new ValidCompactorTxnList("21:-1:");
+    txns = new ValidCompactorTxnList("21:" + Long.MAX_VALUE + ":");
     Assert.assertEquals(21L, txns.getHighWatermark());
-    Assert.assertEquals(-1L, txns.getMinOpenTxn());
+    Assert.assertEquals(Long.MAX_VALUE, txns.getMinOpenTxn());
     Assert.assertEquals(0, txns.getInvalidTransactions().length);
-
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index c150ec5..449d889 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -634,7 +634,7 @@ public class AcidUtils {
       //By definition there are no open txns with id < 1.
       return true;
     }
-    return ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeValid(1, baseTxnId);
+    return txnList.isValidBase(baseTxnId);
   }
   private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
       ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 0a2c3fa..20e5465 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -604,9 +604,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           footerCache = useExternalCache ? metaCache : localCache;
         }
       }
-      String value = conf.get(ValidTxnList.VALID_TXNS_KEY,
-                              Long.MAX_VALUE + ":");
-      transactionList = new ValidReadTxnList(value);
+      String value = conf.get(ValidTxnList.VALID_TXNS_KEY);
+      transactionList = value == null ? new ValidReadTxnList() : new ValidReadTxnList(value);
     }
 
     @VisibleForTesting
@@ -1806,9 +1805,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       bucket = (int) split.getStart();
       reader = null;
     }
-    String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY,
-                                Long.MAX_VALUE + ":");
-    ValidTxnList validTxnList = new ValidReadTxnList(txnString);
+    String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+    ValidTxnList validTxnList = txnString == null ? new ValidReadTxnList() :
+      new ValidReadTxnList(txnString);
     final OrcRawRecordMerger records =
         new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket,
             validTxnList, readOptions, deltas);

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index e796250..af192fb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -1206,6 +1206,22 @@ public class TestTxnCommands2 {
   }
 
   /**
+   * Makes sure aborted txns don't cause a valid base_xxxx to be rejected with a false
+   * "Not enough history available" error (HIVE-14350)
+   */
+  @Test
+  public void testNoHistory() throws Exception {
+    int[][] tableData = {{1,2},{3,4}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+    
+    runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+    runStatementOnDriver("select count(*) from " + Table.ACIDTBL);
+  }
+  /**
    * takes raw data and turns it into a string as if from Driver.getResults()
    * sorts rows in dictionary order
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index b83cea4..556df18 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -121,7 +121,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/subdir/000000_0", 0, new byte[0]));
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(new MockPath(fs, "/tbl/part1"), conf,
-            new ValidReadTxnList("100:"));
+            new ValidReadTxnList("100:" + Long.MAX_VALUE + ":"));
     assertEquals(null, dir.getBaseDirectory());
     assertEquals(0, dir.getCurrentDirectories().size());
     assertEquals(0, dir.getObsolete().size());
@@ -152,7 +152,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/delta_101_101/bucket_0", 0, new byte[0]));
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(new TestInputOutputFormat.MockPath(fs,
-            "mock:/tbl/part1"), conf, new ValidReadTxnList("100:"));
+            "mock:/tbl/part1"), conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":"));
     assertEquals(null, dir.getBaseDirectory());
     List<FileStatus> obsolete = dir.getObsolete();
     assertEquals(2, obsolete.size());
@@ -194,7 +194,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/delta_90_120/bucket_0", 0, new byte[0]));
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(new TestInputOutputFormat.MockPath(fs,
-            "mock:/tbl/part1"), conf, new ValidReadTxnList("100:"));
+            "mock:/tbl/part1"), conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":"));
     assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
     List<FileStatus> obsolete = dir.getObsolete();
     assertEquals(5, obsolete.size());
@@ -225,7 +225,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/base_200/bucket_0", 500, new byte[0]));
     Path part = new MockPath(fs, "/tbl/part1");
     AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:"));
+        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:" + Long.MAX_VALUE + ":"));
     assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString());
     assertEquals(1, dir.getCurrentDirectories().size());
     assertEquals("mock:/tbl/part1/delta_120_130",
@@ -238,7 +238,7 @@ public class TestAcidUtils {
     assertEquals("mock:/tbl/part1/delta_98_100", obsoletes.get(3).getPath().toString());
     assertEquals(0, dir.getOriginalFiles().size());
 
-    dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("10:"));
+    dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("10:" + Long.MAX_VALUE + ":"));
     assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString());
     assertEquals(0, dir.getCurrentDirectories().size());
     obsoletes = dir.getObsolete();
@@ -250,8 +250,9 @@ public class TestAcidUtils {
     the existence of delta_120_130 implies that 121 in the exception list is aborted unless
     delta_120_130 is from streaming ingest in which case 121 can be open
     (and thus 122-130 are open too)
+    99 here would be Aborted since it is below minOpenTxn (121), so base_100 is still good
     For multi-statment txns, see HIVE-13369*/
-    dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:121"));
+    dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:121:99:121"));
     assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString());
     assertEquals(1, dir.getCurrentDirectories().size());
     assertEquals("mock:/tbl/part1/delta_120_130",
@@ -265,7 +266,7 @@ public class TestAcidUtils {
 
     boolean gotException = false;
     try {
-      dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("125:5"));
+      dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("125:5:5"));
     }
     catch(IOException e) {
       gotException = true;
@@ -282,7 +283,7 @@ public class TestAcidUtils {
     part = new MockPath(fs, "/tbl/part1");
     try {
       gotException = false;
-      dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7"));
+      dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7:7"));
     }
     catch(IOException e) {
       gotException = true;
@@ -298,7 +299,7 @@ public class TestAcidUtils {
     part = new MockPath(fs, "/tbl/part1");
     try {
       gotException = false;
-      dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7"));
+      dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7:7"));
     }
     catch(IOException e) {
       gotException = true;
@@ -314,7 +315,7 @@ public class TestAcidUtils {
       new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0]));
     part = new MockPath(fs, "/tbl/part1");
     //note that we don't include current txn of the client in exception list to read-you-writes
-    dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("1:"));
+    dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("1:" + Long.MAX_VALUE + ":"));
     assertEquals("mock:/tbl/part1/base_" + Long.MIN_VALUE, dir.getBaseDirectory().toString());
     assertEquals(1, dir.getCurrentDirectories().size());
     assertEquals("mock:/tbl/part1/delta_1_1", dir.getCurrentDirectories().get(0).getPath().toString());
@@ -330,7 +331,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/000001_1", 500, new byte[0]));
     Path part = new MockPath(fs, "/tbl/part1");
     AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:"));
+        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:" + Long.MAX_VALUE + ":"));
     // Obsolete list should include the two original bucket files, and the old base dir
     List<FileStatus> obsolete = dir.getObsolete();
     assertEquals(3, obsolete.size());
@@ -351,7 +352,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
     Path part = new MockPath(fs, "mock:/tbl/part1");
     AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:"));
+        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":"));
     assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
     List<FileStatus> obsolete = dir.getObsolete();
     assertEquals(2, obsolete.size());
@@ -386,7 +387,7 @@ public class TestAcidUtils {
       new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
     Path part = new MockPath(fs, "mock:/tbl/part1");
     AcidUtils.Directory dir =
-      AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:"));
+      AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":"));
     assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
     List<FileStatus> obsolete = dir.getObsolete();
     assertEquals(5, obsolete.size());
@@ -411,7 +412,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
         new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]));
     Path part = new MockPath(fs, "mock:/tbl/part1");
-    AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4:4"));
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(2, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -432,7 +433,7 @@ public class TestAcidUtils {
       new MockFile("mock:/tbl/part1/delta_4_4_3/bucket_0", 500, new byte[0]),
       new MockFile("mock:/tbl/part1/delta_101_101_1/bucket_0", 500, new byte[0]));
     Path part = new MockPath(fs, "mock:/tbl/part1");
-    AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4:4"));
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(2, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -447,7 +448,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]));
     Path part = new MockPath(fs, "mock:/tbl/part1");
     AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("100:4"));
+        AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("4:" + Long.MAX_VALUE));
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(1, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -464,7 +465,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/delta_6_10/bucket_0", 500, new byte[0]));
     Path part = new MockPath(fs, "mock:/tbl/part1");
     AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("100:4"));
+        AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("3:" + Long.MAX_VALUE));
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(1, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());

http://git-wip-us.apache.org/repos/asf/hive/blob/8a183766/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index ddef4a2..f07aa49 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -311,7 +311,7 @@ public class TestOrcRawRecordMerger {
   }
 
   private static ValidTxnList createMaximalTxnList() {
-    return new ValidReadTxnList(Long.MAX_VALUE + ":");
+    return new ValidReadTxnList();
   }
 
   @Test
@@ -517,7 +517,7 @@ public class TestOrcRawRecordMerger {
         .maximumTransactionId(100).finalDestination(root);
     of.getRecordUpdater(root, options).close(false);
 
-    ValidTxnList txnList = new ValidReadTxnList("200:");
+    ValidTxnList txnList = new ValidReadTxnList("200:" + Long.MAX_VALUE);
     AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
 
     Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
@@ -585,7 +585,7 @@ public class TestOrcRawRecordMerger {
     ru.delete(200, new MyRow("", 8, 0, BUCKET));
     ru.close(false);
 
-    ValidTxnList txnList = new ValidReadTxnList("200:");
+    ValidTxnList txnList = new ValidReadTxnList("200:" + Long.MAX_VALUE);
     AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList);
 
     assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
@@ -775,7 +775,7 @@ public class TestOrcRawRecordMerger {
     merger.close();
 
     // try ignoring the 200 transaction and make sure it works still
-    ValidTxnList txns = new ValidReadTxnList("2000:200");
+    ValidTxnList txns = new ValidReadTxnList("2000:200:200");
     merger =
         new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
             txns, new Reader.Options(),


Mime
View raw message