hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ekoif...@apache.org
Subject [3/3] hive git commit: HIVE-11388 - Allow ACID Compactor components to run in multiple metastores (Eugene Koifman, reviewed by Wei Zheng)
Date Thu, 24 Mar 2016 23:28:29 GMT
HIVE-11388 - Allow ACID Compactor components to run in multiple metastores (Eugene Koifman,
reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/17870823
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/17870823
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/17870823

Branch: refs/heads/branch-1
Commit: 178708231e09bb2c08aa05cf9979efd6d3cd542c
Parents: c829505
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Thu Mar 24 16:22:21 2016 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Thu Mar 24 16:22:21 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/ServerUtils.java  |  14 ++
 .../deployers/config/hive/hive-site.mysql.xml   |  24 ++-
 .../hive/metastore/txn/CompactionInfo.java      |   4 +
 .../metastore/txn/CompactionTxnHandler.java     |   7 +-
 .../hadoop/hive/metastore/txn/TxnDbUtil.java    |  20 ++-
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 169 ++++++++++++++++++-
 .../hadoop/hive/metastore/txn/TxnStore.java     |  33 +++-
 .../hadoop/hive/metastore/txn/TxnUtils.java     |   4 +-
 .../metastore/txn/ValidCompactorTxnList.java    |   2 +-
 .../hive/metastore/txn/TestTxnHandler.java      |  93 ++++++++++
 .../ql/txn/AcidCompactionHistoryService.java    |   7 +
 .../hive/ql/txn/AcidHouseKeeperService.java     |   7 +
 .../hadoop/hive/ql/txn/compactor/Cleaner.java   |  62 ++++++-
 .../hadoop/hive/ql/txn/compactor/Initiator.java |  19 ++-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |   7 +-
 .../hive/ql/lockmgr/TestDbTxnManager.java       |   6 +
 16 files changed, 445 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
index a284f18..4141770 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
@@ -24,6 +24,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
 /**
  * ServerUtils (specific to HiveServer version 1)
  */
@@ -47,4 +50,15 @@ public class ServerUtils {
     }
   }
 
+  /**
+   * @return name of current host
+   */
+  public static String hostname() {
+    try {
+      return InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      LOG.error("Unable to resolve my host name " + e.getMessage());
+      throw new RuntimeException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
----------------------------------------------------------------------
diff --git a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
index b6f1ab7..387da6c 100644
--- a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
+++ b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
@@ -62,13 +62,14 @@
         <name>hive.exec.dynamic.partition.mode</name>
         <value>nonstrict</value>
     </property>
+
     <property>
         <name>hive.compactor.initiator.on</name>
-        <value>false</value>
+        <value>true</value>
     </property>
     <property>
         <name>hive.compactor.worker.threads</name>
-        <value>2</value>
+        <value>5</value>
     </property>
     <property>
         <name>hive.timedout.txn.reaper.start</name>
@@ -81,9 +82,24 @@
     -->
     <property>
         <name>hive.timedout.txn.reaper.interval</name>
-        <value>30s</value>
+        <value>1s</value>
+    </property>
+    <property>
+        <name>hive.compactor.history.reaper.interval</name>
+        <value>1s</value>
+    </property>
+    <property>
+        <name>hive.compactor.cleaner.run.interval</name>
+        <value>1s</value>
+    </property>
+    <property>
+        <name>hive.compactor.check.interval</name>
+        <value>1s</value>
+    </property>
+    <property>
+        <name>hive.compactor.delta.num.threshold</name>
+        <value>2</value>
     </property>
-
     <!--end ACID related properties-->
 <!--
     <property>

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index 73255d2..bea1473 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.metastore.txn;
 
 import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -39,6 +40,9 @@ public class CompactionInfo implements Comparable<CompactionInfo>
{
   public boolean tooManyAborts = false;
   /**
    * {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will
return 0 if field is NULL) 
+   * See {@link TxnStore#setCompactionHighestTxnId(CompactionInfo, long)} for precise definition.
+   * See also {@link TxnUtils#createValidCompactTxnList(GetOpenTxnsInfoResponse)} and
+   * {@link ValidCompactorTxnList#highWatermark}
    */
   public long highestTxnId;
   byte[] metaInfo;

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index f7c738a..cdff357 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -161,6 +161,8 @@ class CompactionTxnHandler extends TxnHandler {
     try {
       Connection dbConn = null;
       Statement stmt = null;
+      //need a separate stmt for executeUpdate() otherwise it will close the ResultSet(HIVE-12725)
+      Statement updStmt = null;
       ResultSet rs = null;
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -174,6 +176,7 @@ class CompactionTxnHandler extends TxnHandler {
           dbConn.rollback();
           return null;
         }
+        updStmt = dbConn.createStatement();
         do {
           CompactionInfo info = new CompactionInfo();
           info.id = rs.getLong(1);
@@ -187,7 +190,7 @@ class CompactionTxnHandler extends TxnHandler {
             "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " +
info.id +
             " AND cq_state='" + INITIATED_STATE + "'";
           LOG.debug("Going to execute update <" + s + ">");
-          int updCount = stmt.executeUpdate(s);
+          int updCount = updStmt.executeUpdate(s);
           if(updCount == 1) {
             dbConn.commit();
             return info;
@@ -211,6 +214,7 @@ class CompactionTxnHandler extends TxnHandler {
         throw new MetaException("Unable to connect to transaction database " +
           StringUtils.stringifyException(e));
       } finally {
+        closeStmt(updStmt);
         close(rs, stmt, dbConn);
       }
     } catch (RetryException e) {
@@ -627,6 +631,7 @@ class CompactionTxnHandler extends TxnHandler {
 
   /**
    * Record the highest txn id that the {@code ci} compaction job will pay attention to.
+   * This is the highest resolved txn id, i.e. such that there are no open txns with lower
ids.
    */
   public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException
{
     Connection dbConn = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 42415ac..56c9ed8 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -68,7 +68,7 @@ public final class TxnDbUtil {
     Connection conn = null;
     Statement stmt = null;
     try {
-      conn = getConnection();
+      conn = getConnection(true);
       stmt = conn.createStatement();
       stmt.execute("CREATE TABLE TXNS (" +
           "  TXN_ID bigint PRIMARY KEY," +
@@ -140,8 +140,13 @@ public final class TxnDbUtil {
         " CC_HIGHEST_TXN_ID bigint," +
         " CC_META_INFO varchar(2048) for bit data," +
         " CC_HADOOP_JOB_ID varchar(32))");
-
-      conn.commit();
+      
+      stmt.execute("CREATE TABLE AUX_TABLE (" +
+        "  MT_KEY1 varchar(128) NOT NULL," +
+        "  MT_KEY2 bigint NOT NULL," +
+        "  MT_COMMENT varchar(255)," +
+        "  PRIMARY KEY(MT_KEY1, MT_KEY2)" +
+        ")");
     } catch (SQLException e) {
       try {
         conn.rollback();
@@ -166,7 +171,7 @@ public final class TxnDbUtil {
     Connection conn = null;
     Statement stmt = null;
     try {
-      conn = getConnection();
+      conn = getConnection(true);
       stmt = conn.createStatement();
 
       // We want to try these, whether they succeed or fail.
@@ -185,7 +190,7 @@ public final class TxnDbUtil {
       dropTable(stmt, "COMPACTION_QUEUE");
       dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID");
       dropTable(stmt, "COMPLETED_COMPACTIONS");
-      conn.commit();
+      dropTable(stmt, "AUX_TABLE");
     } finally {
       closeResources(conn, stmt, null);
     }
@@ -249,6 +254,9 @@ public final class TxnDbUtil {
   }
 
   static Connection getConnection() throws Exception {
+    return getConnection(false);
+  }
+  static Connection getConnection(boolean isAutoCommit) throws Exception {
     HiveConf conf = new HiveConf();
     String jdbcDriver = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER);
     Driver driver = (Driver) Class.forName(jdbcDriver).newInstance();
@@ -260,7 +268,7 @@ public final class TxnDbUtil {
     prop.setProperty("user", user);
     prop.setProperty("password", passwd);
     Connection conn = driver.connect(driverUrl, prop);
-    conn.setAutoCommit(false);
+    conn.setAutoCommit(isAutoCommit);
     return conn;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 9789371..a3b0751 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -23,6 +23,8 @@ import com.jolbox.bonecp.BoneCPDataSource;
 import org.apache.commons.dbcp.ConnectionFactory;
 import org.apache.commons.dbcp.DriverManagerConnectionFactory;
 import org.apache.commons.dbcp.PoolableConnectionFactory;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.commons.logging.Log;
@@ -45,6 +47,8 @@ import javax.sql.DataSource;
 import java.io.IOException;
 import java.sql.*;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -87,7 +91,7 @@ import java.util.concurrent.locks.ReentrantLock;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-abstract class TxnHandler implements TxnStore {
+abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   static final protected char INITIATED_STATE = 'i';
   static final protected char WORKING_STATE = 'w';
@@ -139,6 +143,12 @@ abstract class TxnHandler implements TxnStore {
    * Derby specific concurrency control
    */
   private static final ReentrantLock derbyLock = new ReentrantLock(true);
+  /**
+   * must be static since even in UT there may be > 1 instance of TxnHandler
+   * (e.g. via Compactor services)
+   */
+  private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>();
+  private static final String hostname = ServerUtils.hostname();
 
   // Private methods should never catch SQLException and then throw MetaException.  The public
   // methods depend on SQLException coming back so they can detect and handle deadlocks.
 Private
@@ -563,7 +573,7 @@ abstract class TxnHandler implements TxnStore {
    * @throws MetaException
    */
   private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState)
throws SQLException, MetaException {
-    String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null
? "AND TXN_STATE=" + quoteChar(txnState) : "");
+    String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null
? " AND TXN_STATE=" + quoteChar(txnState) : "");
     ResultSet rs = stmt.executeQuery(addForUpdateClause(query));
     if(rs.next()) {
       return rs;
@@ -1437,14 +1447,14 @@ abstract class TxnHandler implements TxnStore {
     }
   }
 
-  void rollbackDBConn(Connection dbConn) {
+  static void rollbackDBConn(Connection dbConn) {
     try {
       if (dbConn != null && !dbConn.isClosed()) dbConn.rollback();
     } catch (SQLException e) {
       LOG.warn("Failed to rollback db connection " + getMessage(e));
     }
   }
-  protected void closeDbConn(Connection dbConn) {
+  protected static void closeDbConn(Connection dbConn) {
     try {
       if (dbConn != null && !dbConn.isClosed()) {
         dbConn.close();
@@ -1458,7 +1468,7 @@ abstract class TxnHandler implements TxnStore {
    * Close statement instance.
    * @param stmt statement instance.
    */
-  protected void closeStmt(Statement stmt) {
+  protected static void closeStmt(Statement stmt) {
     try {
       if (stmt != null && !stmt.isClosed()) stmt.close();
     } catch (SQLException e) {
@@ -1470,7 +1480,7 @@ abstract class TxnHandler implements TxnStore {
    * Close the ResultSet.
    * @param rs may be {@code null}
    */
-  void close(ResultSet rs) {
+  static void close(ResultSet rs) {
     try {
       if (rs != null && !rs.isClosed()) {
         rs.close();
@@ -1484,7 +1494,7 @@ abstract class TxnHandler implements TxnStore {
   /**
    * Close all 3 JDBC artifacts in order: {@code rs stmt dbConn}
    */
-  void close(ResultSet rs, Statement stmt, Connection dbConn) {
+  static void close(ResultSet rs, Statement stmt, Connection dbConn) {
     close(rs);
     closeStmt(stmt);
     closeDbConn(dbConn);
@@ -2635,6 +2645,40 @@ abstract class TxnHandler implements TxnStore {
     }
     return false;
   }
+  private boolean isDuplicateKeyError(SQLException ex) {
+    switch (dbProduct) {
+      case DERBY:
+        if("23505".equals(ex.getSQLState())) {
+          return true;
+        }
+        break;
+      case MYSQL:
+        if(ex.getErrorCode() == 1022 && "23000".equals(ex.getSQLState())) {
+          return true;
+        }
+        break;
+      case SQLSERVER:
+        //2627 is unique constaint violation incl PK, 2601 - unique key
+        if(ex.getErrorCode() == 2627 && "23000".equals(ex.getSQLState())) {
+          return true;
+        }
+        break;
+      case ORACLE:
+        if(ex.getErrorCode() == 1 && "23000".equals(ex.getSQLState())) {
+          return true;
+        }
+        break;
+      case POSTGRES:
+        //http://www.postgresql.org/docs/8.1/static/errcodes-appendix.html
+        if("23505".equals(ex.getSQLState())) {
+          return true;
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("Unexpected DB type: " + dbProduct + "; " + getMessage(ex));
+    }
+    return false;
+  }
   private static String getMessage(SQLException ex) {
     return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode()
+ ")";
   }
@@ -2709,4 +2753,115 @@ abstract class TxnHandler implements TxnStore {
       derbyLock.unlock();
     }
   }
+  @Override
+  public MutexAPI getMutexAPI() {
+    return this;
+  }
+
+  @Override
+  public LockHandle acquireLock(String key) throws MetaException {
+    /**
+     * The implementation here is a bit kludgey but done so that code exercised by unit tests
+     * (which run against Derby which has no support for select for update) is as similar
to
+     * production code as possible.
+     * In particular, with Derby we always run in a single process with a single metastore
and
+     * the absence of For Update is handled via a Semaphore.  The later would strictly speaking
+     * make the SQL statments below unnecessary (for Derby), but then they would not be tested.
+     */
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      try {
+        String sqlStmt = addForUpdateClause("select MT_COMMENT from AUX_TABLE where MT_KEY1="
+ quoteString(key) + " and MT_KEY2=0");
+        lockInternal();
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("About to execute SQL: " + sqlStmt);
+        }
+        rs = stmt.executeQuery(sqlStmt);
+        if (!rs.next()) {
+          close(rs);
+          try {
+            stmt.executeUpdate("insert into AUX_TABLE(MT_KEY1,MT_KEY2) values(" + quoteString(key)
+ ", 0)");
+            dbConn.commit();
+          } catch (SQLException ex) {
+            if (!isDuplicateKeyError(ex)) {
+              throw new RuntimeException("Unable to lock " + quoteString(key) + " due to:
" + getMessage(ex), ex);
+            }
+          }
+          rs = stmt.executeQuery(sqlStmt);
+          if (!rs.next()) {
+            throw new IllegalStateException("Unable to lock " + quoteString(key) + ".  Expected
row in AUX_TABLE is missing.");
+          }
+        }
+        Semaphore derbySemaphore = null;
+        if(dbProduct == DatabaseProduct.DERBY) {
+          derbyKey2Lock.putIfAbsent(key, new Semaphore(1));
+          derbySemaphore =  derbyKey2Lock.get(key);
+          derbySemaphore.acquire();
+        }
+        LOG.info(quoteString(key) + " locked by " + quoteString(TxnHandler.hostname));
+        //OK, so now we have a lock
+        return new LockHandleImpl(dbConn, stmt, rs, key, derbySemaphore);
+      } catch (SQLException ex) {
+        rollbackDBConn(dbConn);
+        close(rs, stmt, dbConn);
+        checkRetryable(dbConn, ex, "acquireLock(" + key + ")");
+        throw new MetaException("Unable to lock " + quoteString(key) + " due to: " + getMessage(ex)
+ "; " + StringUtils.stringifyException(ex));
+      }
+      catch(InterruptedException ex) {
+        rollbackDBConn(dbConn);
+        close(rs, stmt, dbConn);
+        throw new MetaException("Unable to lock " + quoteString(key) + " due to: " + ex.getMessage()
+ StringUtils.stringifyException(ex));
+      }
+      finally {
+        unlockInternal();
+      }
+    }
+    catch(RetryException ex) {
+      acquireLock(key);
+    }
+    throw new MetaException("This can't happen because checkRetryable() has a retry limit");
+  }
+  public void acquireLock(String key, LockHandle handle) {
+    //the idea is that this will use LockHandle.dbConn
+    throw new NotImplementedException();
+  }
+  private static final class LockHandleImpl implements LockHandle {
+    private final Connection dbConn;
+    private final Statement stmt;
+    private final ResultSet rs;
+    private final Semaphore derbySemaphore;
+    private final List<String> keys = new ArrayList<>();
+    LockHandleImpl(Connection conn, Statement stmt, ResultSet rs, String key, Semaphore derbySemaphore)
{
+      this.dbConn = conn;
+      this.stmt = stmt;
+      this.rs = rs;
+      this.derbySemaphore = derbySemaphore;
+      if(derbySemaphore != null) {
+        //oterwise it may later release permit acquired by someone else
+        assert derbySemaphore.availablePermits() == 0 : "Expected locked Semaphore";
+      }
+      keys.add(key);
+    }
+    void addKey(String key) {
+      //keys.add(key);
+      //would need a list of (stmt,rs) pairs - 1 for each key
+      throw new NotImplementedException();
+    }
+    
+    @Override
+    public void releaseLocks() {
+      rollbackDBConn(dbConn);
+      close(rs, stmt, dbConn);
+      if(derbySemaphore != null) {
+        derbySemaphore.release();
+      }
+      for(String key : keys) {
+        LOG.info(quoteString(key) + " unlocked by " + quoteString(TxnHandler.hostname));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 6fc6ed9..3aac11b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -74,6 +74,7 @@ import java.util.Set;
 @InterfaceStability.Evolving
 public interface TxnStore {
 
+  public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory}
   // Compactor states (Should really be enum)
   static final public String INITIATED_RESPONSE = "initiated";
   static final public String WORKING_RESPONSE = "working";
@@ -355,10 +356,40 @@ public interface TxnStore {
    */
   public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException;
 
-
   @VisibleForTesting
   public int numLocksInLockTable() throws SQLException, MetaException;
 
   @VisibleForTesting
   long setTimeout(long milliseconds);
+
+  public MutexAPI getMutexAPI();
+
+  /**
+   * This is primarily designed to provide coarse grained mutex support to operations running
+   * inside the Metastore (of which there could be several instances).  The initial goal
is to 
+   * ensure that various sub-processes of the Compactor don't step on each other.
+   * 
+   * In RDMBS world each {@code LockHandle} uses a java.sql.Connection so use it sparingly.
+   */
+  public static interface MutexAPI {
+    /**
+     * The {@code key} is name of the lock. Will acquire and exclusive lock or block.  It
retuns
+     * a handle which must be used to release the lock.  Each invocation returns a new handle.
+     */
+    public LockHandle acquireLock(String key) throws MetaException;
+
+    /**
+     * Same as {@link #acquireLock(String)} but takes an already existing handle as input.
 This 
+     * will associate the lock on {@code key} with the same handle.  All locks associated
with
+     * the same handle will be released together.
+     * @param handle not NULL
+     */
+    public void acquireLock(String key, LockHandle handle) throws MetaException;
+    public static interface LockHandle {
+      /**
+       * Releases all locks associcated with this handle.
+       */
+      public void releaseLocks();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index f60e34b..4c14eef 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -75,8 +75,8 @@ public class TxnUtils {
     int i = 0;
     for (TxnInfo txn : txns.getOpen_txns()) {
       if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
-      exceptions[i++] = txn.getId();
-    }
+      exceptions[i++] = txn.getId();//todo: only add Aborted
+    }//remove all exceptions < minOpenTxn
     highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
     return new ValidCompactorTxnList(exceptions, -1, highWater);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
index 648fd49..30bdfa7 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.common.ValidReadTxnList;
 import java.util.Arrays;
 
 /**
- * And implmentation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the
compactor.
+ * And implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the
compactor.
  * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid
if it
  * is committed or aborted.  Additionally it will return none if there are any open transactions
  * below the max transaction given, since we don't want to compact above open transactions.
 For

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index b8cab71..6033c15 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.junit.*;
+import org.apache.hadoop.util.StringUtils;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -33,6 +34,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertFalse;
@@ -1214,6 +1216,97 @@ public class TestTxnHandler {
     }
   }
 
+  /**
+   * This cannnot be run against Derby (thus in UT) but it can run againt MySQL.
+   * 1. add to metastore/pom.xml
+   *     <dependency>
+   *      <groupId>mysql</groupId>
+   *      <artifactId>mysql-connector-java</artifactId>
+   *      <version>5.1.30</version>
+   *     </dependency>
+   * 2. Hack in the c'tor of this class
+   *     conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:mysql://localhost/metastore");
+   *      conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "hive");
+   *      conf.setVar(HiveConf.ConfVars.METASTOREPWD, "hive");
+   *      conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver");
+   * 3. Remove TxnDbUtil.prepDb(); in TxnHandler.checkQFileTestHack()
+   *      
+   */
+  @Ignore("multiple threads wedge Derby")
+  @Test
+  public void testMutexAPI() throws Exception {
+    final TxnStore.MutexAPI api =  txnHandler.getMutexAPI();
+    final AtomicInteger stepTracker = new AtomicInteger(0);
+    /**
+     * counter = 0;
+     * Thread1 counter=1, lock, wait 3s, check counter(should be 2), counter=3, unlock 
+     * Thread2 counter=2, lock (and block), inc counter, should be 4
+     */
+    Thread t1 = new Thread("MutexTest1") {
+      public void run() {
+        try {
+          stepTracker.incrementAndGet();//now 1
+          TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
+          Thread.sleep(4000);
+          //stepTracker should now be 2 which indicates t2 has started
+          Assert.assertEquals("Thread2 should have started by now but not done work", 2,
stepTracker.get());
+          stepTracker.incrementAndGet();//now 3
+          handle.releaseLocks();
+        }
+        catch(Exception ex) {
+          throw new RuntimeException(ex.getMessage(), ex);
+        }
+      }
+    };
+    t1.setDaemon(true);
+    ErrorHandle ueh1 = new ErrorHandle();
+    t1.setUncaughtExceptionHandler(ueh1);
+    Thread t2 = new Thread("MutexTest2") {
+      public void run() {
+        try {
+          stepTracker.incrementAndGet();//now 2
+          //this should block until t1 unlocks
+          TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
+          stepTracker.incrementAndGet();//now 4
+          Assert.assertEquals(4, stepTracker.get());
+          handle.releaseLocks();
+          stepTracker.incrementAndGet();//now 5
+        }
+        catch(Exception ex) {
+          throw new RuntimeException(ex.getMessage(), ex);
+        }
+      }
+    };
+    t2.setDaemon(true);
+    ErrorHandle ueh2 = new ErrorHandle();
+    t2.setUncaughtExceptionHandler(ueh2);
+    t1.start();
+    try {
+      Thread.sleep(1000);
+    }
+    catch(InterruptedException ex) {
+      LOG.info("Sleep was interrupted");
+    }
+    t2.start();
+    t1.join(6000);//so that test doesn't block
+    t2.join(6000);
+
+    if(ueh1.error != null) {
+      Assert.assertTrue("Unexpected error from t1: " + StringUtils.stringifyException(ueh1.error),
false);
+    }
+    if (ueh2.error != null) {
+      Assert.assertTrue("Unexpected error from t2: " + StringUtils.stringifyException(ueh2.error),
false);
+    }
+    Assert.assertEquals("5 means both threads have completed", 5, stepTracker.get());
+  }
+  private final static class ErrorHandle implements Thread.UncaughtExceptionHandler {
+    Throwable error = null;
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+      LOG.error("Uncaught exception from " + t.getName() + ": " + e.getMessage());
+      error = e;
+    }
+  }
   private void updateTxns(Connection conn) throws SQLException {
     Statement stmt = conn.createStatement();
     stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1");

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
index 59c8fe4..5d9e7be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
@@ -61,7 +61,9 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase
{
     
     @Override
     public void run() {
+      TxnStore.MutexAPI.LockHandle handle = null;
       try {
+        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.CompactionHistory.name());
         long startTime = System.currentTimeMillis();
         txnHandler.purgeCompactionHistory();
         int count = isAliveCounter.incrementAndGet(); 
@@ -70,6 +72,11 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase
{
       catch(Throwable t) {
         LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(),
t);
       }
+      finally {
+        if(handle != null) {
+          handle.releaseLocks();
+        }
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
index de74a7b..f39df17 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
@@ -61,7 +61,9 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase {
     }
     @Override
     public void run() {
+      TxnStore.MutexAPI.LockHandle handle = null;
       try {
+        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name());
         long startTime = System.currentTimeMillis();
         txnHandler.performTimeOuts();
         int count = isAliveCounter.incrementAndGet();
@@ -70,6 +72,11 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase {
       catch(Throwable t) {
         LOG.fatal("Serious error in " + Thread.currentThread().getName() + ": " + t.getMessage(),
t);
       }
+      finally {
+        if(handle != null) {
+          handle.releaseLocks();
+        }
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 33580fd..1e6e8a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -72,11 +73,13 @@ public class Cleaner extends CompactorThread {
       // and if so remembers that and then sets it to true at the end.  We have to check
here
       // first to make sure we go through a complete iteration of the loop before resetting
it.
       boolean setLooped = !looped.get();
-      long startedAt = System.currentTimeMillis();
+      TxnStore.MutexAPI.LockHandle handle = null;
+      long startedAt = -1;
       // Make sure nothing escapes this run method and kills the metastore at large,
       // so wrap it in a big catch Throwable statement.
       try {
-
+        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+        startedAt = System.currentTimeMillis();
         // First look for all the compactions that are waiting to be cleaned.  If we have
not
         // seen an entry before, look for all the locks held on that table or partition and
         // record them.  We will then only clean the partition once all of those locks have
been
@@ -86,6 +89,31 @@ public class Cleaner extends CompactorThread {
         // done the compaction will read the more up to date version of the data (either
in a
         // newer delta or in a newer base).
         List<CompactionInfo> toClean = txnHandler.findReadyToClean();
+        {
+          /**
+           * Since there may be more than 1 instance of Cleaner running we may have state
info
+           * for items which were cleaned by instances.  Here we remove them.
+           *
+           * In the long run if we add end_time to compaction_queue, then we can check that
+           * hive_locks.acquired_at > compaction_queue.end_time + safety_buffer in which
case
+           * we know the lock owner is reading files created by this compaction or later.
+           * The advantage is that we don't have to store the locks.
+           */
+          Set<Long> currentToCleanSet = new HashSet<>();
+          for (CompactionInfo ci : toClean) {
+            currentToCleanSet.add(ci.id);
+          }
+          Set<Long> cleanPerformedByOthers = new HashSet<>();
+          for (long id : compactId2CompactInfoMap.keySet()) {
+            if (!currentToCleanSet.contains(id)) {
+              cleanPerformedByOthers.add(id);
+            }
+          }
+          for (long id : cleanPerformedByOthers) {
+            compactId2CompactInfoMap.remove(id);
+            compactId2LockMap.remove(id);
+          }
+        }
         if (toClean.size() > 0 || compactId2LockMap.size() > 0) {
           ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest());
 
@@ -119,6 +147,7 @@ public class Cleaner extends CompactorThread {
                 // Remember to remove this when we're out of the loop,
                 // we can't do it in the loop or we'll get a concurrent modification exception.
                 compactionsCleaned.add(queueEntry.getKey());
+                //Future thought: this may be expensive so consider having a thread pool
run in parallel
                 clean(compactId2CompactInfoMap.get(queueEntry.getKey()));
               } else {
                 // Remove the locks we didn't see so we don't look for them again next time
@@ -140,6 +169,11 @@ public class Cleaner extends CompactorThread {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
             StringUtils.stringifyException(t));
       }
+      finally {
+        if (handle != null) {
+          handle.releaseLocks();
+        }
+      }
       if (setLooped) {
         looped.set(true);
       }
@@ -206,10 +240,24 @@ public class Cleaner extends CompactorThread {
       StorageDescriptor sd = resolveStorageDescriptor(t, p);
       final String location = sd.getLocation();
 
-      // Create a bogus validTxnList with a high water mark set to MAX_LONG and no open
-      // transactions.  This assures that all deltas are treated as valid and all we return
are
-      // obsolete files.
-      final ValidTxnList txnList = new ValidReadTxnList();
+      /**
+       * Each Compaction only compacts as far as the highest txn id such that all txns below
it
+       * are resolved (i.e. not opened).  This is what "highestTxnId" tracks.  This is only
tracked
+       * since Hive 1.3.0/2.0 - thus may be 0.  See ValidCompactorTxnList and uses for more
info.
+       * 
+       * We only want to clean up to the highestTxnId - otherwise we risk deleteing deltas
from
+       * under an active reader.
+       * 
+       * Suppose we have deltas D2 D3 for table T, i.e. the last compaction created D3 so
now there is a 
+       * clean request for D2.  
+       * Cleaner checks existing locks and finds none.
+       * Between that check and removeFiles() a query starts (it will be reading D3) and
another compaction
+       * completes which creates D4.
+       * Now removeFiles() (more specifically AcidUtils.getAcidState()) will declare D3 to
be obsolete
+       * unless ValidTxnList is "capped" at highestTxnId.
+       */
+      final ValidTxnList txnList = ci.highestTxnId > 0 ? 
+        new ValidReadTxnList(new long[0], ci.highestTxnId) : new ValidReadTxnList();
 
       if (runJobAsSelf(ci.runAs)) {
         removeFiles(location, txnList);
@@ -249,7 +297,7 @@ public class Cleaner extends CompactorThread {
     FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
 
     for (Path dead : filesToDelete) {
-      LOG.debug("Doing to delete path " + dead.toString());
+      LOG.debug("Going to delete path " + dead.toString());
       fs.delete(dead, true);
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 1898a4d..0e4ba06 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -74,11 +74,15 @@ public class Initiator extends CompactorThread {
       // much easier.  The stop value is only for testing anyway and not used when called
from
       // HiveMetaStore.
       do {
-        long startedAt = System.currentTimeMillis();
+        long startedAt = -1;
+        TxnStore.MutexAPI.LockHandle handle = null;
 
         // Wrap the inner parts of the loop in a catch throwable so that any errors in the
loop
         // don't doom the entire thread.
-        try {//todo: add method to only get current i.e. skip history - more efficient
+        try {
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
+          startedAt = System.currentTimeMillis();
+          //todo: add method to only get current i.e. skip history - more efficient
           ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
           ValidTxnList txns =
               TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
@@ -114,6 +118,8 @@ public class Initiator extends CompactorThread {
               // Check if we already have initiated or are working on a compaction for this
partition
               // or table.  If so, skip it.  If we are just waiting on cleaning we can still
check,
               // as it may be time to compact again even though we haven't cleaned.
+              //todo: this is not robust.  You can easily run Alter Table to start a compaction
between
+              //the time currentCompactions is generated and now
               if (lookForCurrentCompactions(currentCompactions, ci)) {
                 LOG.debug("Found currently initiated or working compaction for " +
                     ci.getFullPartitionName() + " so we will not initiate another compaction");
@@ -134,7 +140,9 @@ public class Initiator extends CompactorThread {
               }
               StorageDescriptor sd = resolveStorageDescriptor(t, p);
               String runAs = findUserToRunAs(sd.getLocation(), t);
-
+              /*Future thought: checkForCompaction will check a lot of file metadata and
may be expensive.
+              * Long term we should consider having a thread pool here and running checkForCompactionS
+              * in parallel*/
               CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, runAs);
               if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
             } catch (Throwable t) {
@@ -154,6 +162,11 @@ public class Initiator extends CompactorThread {
           LOG.error("Initiator loop caught unexpected exception this time through the loop:
" +
               StringUtils.stringifyException(t));
         }
+        finally {
+          if(handle != null) {
+            handle.releaseLocks();
+          }
+        }
 
         long elapsedTime = System.currentTimeMillis() - startedAt;
         if (elapsedTime >= checkInterval || stop.get())  continue;

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 6f8dc35..8394ec6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -229,7 +229,6 @@ public class TestTxnCommands2 {
     }
     Assert.assertFalse("PPD '" + ppd + "' wasn't pushed", true);
   }
-  @Ignore("alter table")
   @Test
   public void testAlterTable() throws Exception {
     int[][] tableData = {{1,2}};
@@ -604,7 +603,13 @@ public class TestTxnCommands2 {
   private static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf
conf) throws Exception {
     int lastCount = houseKeeperService.getIsAliveCounter();
     houseKeeperService.start(conf);
+    int maxIter = 10;
+    int iterCount = 0;
     while(houseKeeperService.getIsAliveCounter() <= lastCount) {
+      if(iterCount++ >= maxIter) {
+        //prevent test hangs
+        throw new IllegalStateException("HouseKeeper didn't run after " + iterCount + " waits");
+      }
       try {
         Thread.sleep(100);//make sure it has run at least once
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 99705b4..b355dbe 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -194,7 +194,13 @@ public class TestDbTxnManager {
   private void runReaper() throws Exception {
     int lastCount = houseKeeperService.getIsAliveCounter();
     houseKeeperService.start(conf);
+    int maxIter = 10;
+    int iterCount = 0;
     while(houseKeeperService.getIsAliveCounter() <= lastCount) {
+      if(iterCount++ >= maxIter) {
+        //prevent test hangs
+        throw new IllegalStateException("Reaper didn't run after " + iterCount + " waits");
+      }
       try {
         Thread.sleep(100);//make sure it has run at least once
       }


Mime
View raw message