hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ekoif...@apache.org
Subject hive git commit: HIVE-12353 When Compactor fails it calls CompactionTxnHandler.markedCleaned(). it should not. (Eugene Koifman, reviewed by Alan Gates)
Date Fri, 22 Jan 2016 02:39:10 GMT
Repository: hive
Updated Branches:
  refs/heads/branch-2.0 80f80b0e3 -> e8388ae67


HIVE-12353 When Compactor fails it calls CompactionTxnHandler.markedCleaned().  it should not. (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e8388ae6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e8388ae6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e8388ae6

Branch: refs/heads/branch-2.0
Commit: e8388ae67dbe097284c8eb7d7e0be00316b90d5d
Parents: 80f80b0
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Thu Jan 21 18:39:04 2016 -0800
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Thu Jan 21 18:39:04 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  24 +-
 .../hive/ql/txn/compactor/TestCompactor.java    |  57 -----
 .../hadoop/hive/metastore/HiveMetaStore.java    |   5 +-
 .../hive/metastore/HouseKeeperService.java      |   6 +
 .../hive/metastore/txn/CompactionInfo.java      |  56 +++-
 .../metastore/txn/CompactionTxnHandler.java     | 256 +++++++++++++++++--
 .../hadoop/hive/metastore/txn/TxnDbUtil.java    |  21 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   |  93 ++++---
 .../metastore/txn/TestCompactionTxnHandler.java |   3 +-
 .../hive/ql/txn/AcidHouseKeeperService.java     |  65 ++---
 .../hadoop/hive/ql/txn/compactor/Cleaner.java   |  10 +-
 .../hive/ql/txn/compactor/CompactorMR.java      |   4 +
 .../hadoop/hive/ql/txn/compactor/Initiator.java |  13 +-
 .../hadoop/hive/ql/txn/compactor/Worker.java    |   8 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 217 ++++++++++++++++
 .../hive/ql/txn/compactor/TestCleaner.java      |  28 +-
 .../hive/ql/txn/compactor/TestWorker.java       |   7 +-
 17 files changed, 678 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 33b3f3f..26ba4f0 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -987,6 +987,7 @@ public class HiveConf extends Configuration {
     HIVETESTMODEDUMMYSTATPUB("hive.test.dummystats.publisher", "", "internal variable for test", false),
     HIVETESTCURRENTTIMESTAMP("hive.test.currenttimestamp", null, "current timestamp for test", false),
     HIVETESTMODEROLLBACKTXN("hive.test.rollbacktxn", false, "For testing only.  Will mark every ACID transaction aborted", false),
+    HIVETESTMODEFAILCOMPACTION("hive.test.fail.compaction", false, "For testing only.  Will cause CompactorMR to fail.", false),
 
     HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
         "Merge small files at the end of a map-only job"),
@@ -1562,11 +1563,32 @@ public class HiveConf extends Configuration {
     HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD("hive.compactor.abortedtxn.threshold", 1000,
         "Number of aborted transactions involving a given table or partition that will trigger\n" +
         "a major compaction."),
-
+    
+    COMPACTOR_INITIATOR_FAILED_THRESHOLD("hive.compactor.initiator.failed.compacts.threshold", 2,
+      new RangeValidator(1, 20), "Number of consecutive compaction failures (per table/partition) " +
+      "after which automatic compactions will not be scheduled any more.  Note that this must be less " +
+      "than hive.compactor.history.retention.failed."),
+    
     HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms",
         new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
     COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
       "Compaction jobs will be submitted.  Set to empty string to let Hadoop choose the queue."),
+    
+    COMPACTOR_HISTORY_RETENTION_SUCCEEDED("hive.compactor.history.retention.succeeded", 3,
+      new RangeValidator(0, 100), "Determines how many successful compaction records will be " +
+      "retained in compaction history for a given table/partition."),
+    
+    COMPACTOR_HISTORY_RETENTION_FAILED("hive.compactor.history.retention.failed", 3,
+      new RangeValidator(0, 100), "Determines how many failed compaction records will be " +
+      "retained in compaction history for a given table/partition."),
+    
+    COMPACTOR_HISTORY_RETENTION_ATTEMPTED("hive.compactor.history.retention.attempted", 2,
+      new RangeValidator(0, 100), "Determines how many attempted compaction records will be " +
+      "retained in compaction history for a given table/partition."),
+    
+    COMPACTOR_HISTORY_REAPER_INTERVAL("hive.compactor.history.reaper.interval", "2m",
+      new TimeValidator(TimeUnit.MILLISECONDS), "Determines how often compaction history reaper runs"),
+    
     HIVE_TIMEDOUT_TXN_REAPER_START("hive.timedout.txn.reaper.start", "100s",
       new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"),
     HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s",

http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index da367ca..226a1fa 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -790,63 +790,6 @@ public class TestCompactor {
     }
   }
 
-  /**
-   * HIVE-12352 has details
-   * @throws Exception
-   */
-  @Test
-  public void writeBetweenWorkerAndCleaner() throws Exception {
-    String tblName = "HIVE12352";
-    executeStatementOnDriver("drop table if exists " + tblName, driver);
-    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
-      " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
-      " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
-
-    //create some data
-    executeStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')", driver);
-    executeStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3", driver);
-
-    //run Worker to execute compaction
-    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
-    txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
-    Worker t = new Worker();
-    t.setThreadId((int) t.getId());
-    t.setHiveConf(conf);
-    AtomicBoolean stop = new AtomicBoolean(true);
-    AtomicBoolean looped = new AtomicBoolean();
-    t.init(stop, looped);
-    t.run();
-
-    //delete something, but make sure txn is rolled back
-    conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
-    executeStatementOnDriver("delete from " + tblName + " where a = 1", driver);
-    conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
-
-    List<String> expected = new ArrayList<>();
-    expected.add("1\tfoo");
-    expected.add("2\tbar");
-    expected.add("3\tblah");
-    Assert.assertEquals("", expected,
-      execSelectAndDumpData("select a,b from " + tblName + " order by a", driver, "writeBetweenWorkerAndCleaner()"));
-
-    //run Cleaner
-    Cleaner c = new Cleaner();
-    c.setThreadId((int)c.getId());
-    c.setHiveConf(conf);
-    c.init(stop, new AtomicBoolean());
-    c.run();
-
-    //this seems odd, but we wan to make sure that to run CompactionTxnHandler.cleanEmptyAbortedTxns()
-    Initiator i = new Initiator();
-    i.setThreadId((int)i.getId());
-    i.setHiveConf(conf);
-    i.init(stop, new AtomicBoolean());
-    i.run();
-
-    //check that aborted operation didn't become committed
-    Assert.assertEquals("", expected,
-      execSelectAndDumpData("select a,b from " + tblName + " order by a", driver, "writeBetweenWorkerAndCleaner()"));
-  }
   @Test
   public void majorCompactAfterAbort() throws Exception {
     String dbName = "default";

http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 00602e1..8f99228 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -6220,7 +6220,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     if(!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON)) {
       return;
     }
-    Class c = Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService");
+    startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService"));
+    startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService"));
+  }
+  private static void startHouseKeeperService(HiveConf conf, Class c) throws Exception {
     //todo: when metastore adds orderly-shutdown logic, houseKeeper.stop()
     //should be called form it
     HouseKeeperService houseKeeper = (HouseKeeperService)c.newInstance();

http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java
index eb4ea93..539ace0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HouseKeeperService.java
@@ -36,4 +36,10 @@ public interface HouseKeeperService {
    * Returns short description of services this module provides.
    */
   public String getServiceDescription();
+
+  /**
+   * This is incremented each time the service is performed.  Can be useful to
+   * check if service is still alive.
+   */
+  public int getIsAliveCounter();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index d3cb7d5..73255d2 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -19,6 +19,10 @@ package org.apache.hadoop.hive.metastore.txn;
 
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
 /**
  * Information on a possible or running compaction.
  */
@@ -27,13 +31,18 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
   public String dbname;
   public String tableName;
   public String partName;
+  char state;
   public CompactionType type;
+  String workerId;
+  long start;
   public String runAs;
   public boolean tooManyAborts = false;
   /**
-   * {@code null} means it wasn't set (e.g. in case of upgrades) 
+   * {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL) 
    */
-  public Long highestTxnId;
+  public long highestTxnId;
+  byte[] metaInfo;
+  String hadoopJobId;
 
   private String fullPartitionName = null;
   private String fullTableName = null;
@@ -44,6 +53,11 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
     this.partName = partName;
     this.type = type;
   }
+  CompactionInfo(long id, String dbname, String tableName, String partName, char state) {
+    this(dbname, tableName, partName, null);
+    this.id = id;
+    this.state = state;
+  }
   CompactionInfo() {}
   
   public String getFullPartitionName() {
@@ -82,9 +96,47 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
       "dbname:" + dbname + "," +
       "tableName:" + tableName + "," +
       "partName:" + partName + "," +
+      "state:" + state + "," +
       "type:" + type + "," +
       "runAs:" + runAs + "," +
       "tooManyAborts:" + tooManyAborts + "," +
       "highestTxnId:" + highestTxnId;
   }
+
+  /**
+   * loads object from a row in Select * from COMPACTION_QUEUE
+   * @param rs ResultSet after call to rs.next()
+   * @throws SQLException
+   */
+  static CompactionInfo loadFullFromCompactionQueue(ResultSet rs) throws SQLException {
+    CompactionInfo fullCi = new CompactionInfo();
+    fullCi.id = rs.getLong(1);
+    fullCi.dbname = rs.getString(2);
+    fullCi.tableName = rs.getString(3);
+    fullCi.partName = rs.getString(4);
+    fullCi.state = rs.getString(5).charAt(0);//cq_state
+    fullCi.type = TxnHandler.dbCompactionType2ThriftType(rs.getString(6).charAt(0));
+    fullCi.workerId = rs.getString(7);
+    fullCi.start = rs.getLong(8);
+    fullCi.runAs = rs.getString(9);
+    fullCi.highestTxnId = rs.getLong(10);
+    fullCi.metaInfo = rs.getBytes(11);
+    fullCi.hadoopJobId = rs.getString(12);
+    return fullCi;
+  }
+  static void insertIntoCompletedCompactions(PreparedStatement pStmt, CompactionInfo ci, long endTime) throws SQLException {
+    pStmt.setLong(1, ci.id);
+    pStmt.setString(2, ci.dbname);
+    pStmt.setString(3, ci.tableName);
+    pStmt.setString(4, ci.partName);
+    pStmt.setString(5, Character.toString(ci.state));
+    pStmt.setString(6, Character.toString(TxnHandler.thriftCompactionType2DbType(ci.type)));
+    pStmt.setString(7, ci.workerId);
+    pStmt.setLong(8, ci.start);
+    pStmt.setLong(9, endTime);
+    pStmt.setString(10, ci.runAs);
+    pStmt.setLong(11, ci.highestTxnId);
+    pStmt.setBytes(12, ci.metaInfo);
+    pStmt.setString(13, ci.hadoopJobId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 9130322..18b288d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -174,16 +174,7 @@ public class CompactionTxnHandler extends TxnHandler {
           info.dbname = rs.getString(2);
           info.tableName = rs.getString(3);
           info.partName = rs.getString(4);
-          switch (rs.getString(5).charAt(0)) {
-            case MAJOR_TYPE:
-              info.type = CompactionType.MAJOR;
-              break;
-            case MINOR_TYPE:
-              info.type = CompactionType.MINOR;
-              break;
-            default:
-              throw new MetaException("Unexpected compaction type " + rs.getString(5));
-          }
+          info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
           // Now, update this record as being worked on by this worker.
           long now = getDbTime(dbConn);
           s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
@@ -291,8 +282,7 @@ public class CompactionTxnHandler extends TxnHandler {
             default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
           }
           info.runAs = rs.getString(6);
-          long highestTxnId = rs.getLong(7);
-          info.highestTxnId = rs.wasNull() ? null : highestTxnId;
+          info.highestTxnId = rs.getLong(7);
           rc.add(info);
         }
         LOG.debug("Going to rollback");
@@ -323,13 +313,19 @@ public class CompactionTxnHandler extends TxnHandler {
     try {
       Connection dbConn = null;
       Statement stmt = null;
+      PreparedStatement pStmt = null;
       ResultSet rs = null;
       try {
-        //do we need serializable?  Once we have the HWM as above, no.  Before that
-        //it's debatable, but problem described above applies either way
-        //Thus can drop to RC
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
+        rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + info.id);
+        if(rs.next()) {
+          info = CompactionInfo.loadFullFromCompactionQueue(rs);
+        }
+        else {
+          throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE");
+        }
+        close(rs);
         String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
         LOG.debug("Going to execute update <" + s + ">");
         int updCount = stmt.executeUpdate(s);
@@ -338,6 +334,10 @@ public class CompactionTxnHandler extends TxnHandler {
           LOG.debug("Going to rollback");
           dbConn.rollback();
         }
+        pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)");
+        info.state = SUCCEEDED_STATE;
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn));
+        updCount = pStmt.executeUpdate();
 
         // Remove entries from completed_txn_components as well, so we don't start looking there
         // again but only up to the highest txn ID include in this compaction job.
@@ -347,7 +347,7 @@ public class CompactionTxnHandler extends TxnHandler {
         if (info.partName != null) {
           s += " and ctc_partition = '" + info.partName + "'";
         }
-        if(info.highestTxnId != null) {
+        if(info.highestTxnId != 0) {
           s += " and ctc_txnid <= " + info.highestTxnId;
         }
         LOG.debug("Going to execute update <" + s + ">");
@@ -358,7 +358,7 @@ public class CompactionTxnHandler extends TxnHandler {
 
         s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
           TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" +
-          info.tableName + "'" + (info.highestTxnId == null ? "" : " and txn_id <= " + info.highestTxnId);
+          info.tableName + "'" + (info.highestTxnId == 0 ? "" : " and txn_id <= " + info.highestTxnId);
         if (info.partName != null) s += " and tc_partition = '" + info.partName + "'";
         LOG.debug("Going to execute update <" + s + ">");
         rs = stmt.executeQuery(s);
@@ -406,6 +406,7 @@ public class CompactionTxnHandler extends TxnHandler {
         throw new MetaException("Unable to connect to transaction database " +
           StringUtils.stringifyException(e));
       } finally {
+        closeStmt(pStmt);
         close(rs, stmt, dbConn);
       }
     } catch (RetryException e) {
@@ -668,6 +669,225 @@ public class CompactionTxnHandler extends TxnHandler {
       setCompactionHighestTxnId(ci, highestTxnId);
     }
   }
+  private static class RetentionCounters {
+    int attemptedRetention = 0;
+    int failedRetention = 0;
+    int succeededRetention = 0;
+    RetentionCounters(int attemptedRetention, int failedRetention, int succeededRetention) {
+      this.attemptedRetention = attemptedRetention;
+      this.failedRetention = failedRetention;
+      this.succeededRetention = succeededRetention;
+    }
+  }
+  private void checkForDeletion(List<Long> deleteSet, CompactionInfo ci, RetentionCounters rc) {
+    switch (ci.state) {
+      case ATTEMPTED_STATE:
+        if(--rc.attemptedRetention < 0) {
+          deleteSet.add(ci.id);
+        }
+        break;
+      case FAILED_STATE:
+        if(--rc.failedRetention < 0) {
+          deleteSet.add(ci.id);
+        }
+        break;
+      case SUCCEEDED_STATE:
+        if(--rc.succeededRetention < 0) {
+          deleteSet.add(ci.id);
+        }
+        break;
+      default:
+        //do nothing to handle future RU/D where we may want to add new state types
+    }
+  }
+
+  /**
+   * For any given compactable entity (partition, table if not partitioned) the history of compactions
+   * may look like "sssfffaaasffss", for example.  The idea is to retain the tail (most recent) of the
+   * history such that a configurable number of each type of state is present.  Any other entries
+   * can be purged.  This scheme has advantage of always retaining the last failure/success even if
+   * it's not recent.
+   * @throws MetaException
+   */
+  public void purgeCompactionHistory() throws MetaException {
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    List<Long> deleteSet = new ArrayList<>();
+    RetentionCounters rc = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        /*cc_id is monotonically increasing so for any entity it sorts in order of compaction history,
+        thus this query groups by entity and within each group sorts most recent first*/
+        rs = stmt.executeQuery("select cc_id, cc_database, cc_table, cc_partition, cc_state from " +
+          "COMPLETED_COMPACTIONS order by cc_database, cc_table, cc_partition, cc_id desc");
+        String lastCompactedEntity = null;
+        /*In each group, walk from most recent and count occurrences of each state type.  Once you
+        * have counted enough (for each state) to satisfy retention policy, delete all other
+        * instances of this status.*/
+        while(rs.next()) {
+          CompactionInfo ci = new CompactionInfo(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getString(4), rs.getString(5).charAt(0));
+          if(!ci.getFullPartitionName().equals(lastCompactedEntity)) {
+            lastCompactedEntity = ci.getFullPartitionName();
+            rc = new RetentionCounters(conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+              getFailedCompactionRetention(),
+              conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED));
+          }
+          checkForDeletion(deleteSet, ci, rc);
+        }
+        close(rs);
+        
+        String baseDeleteSql = "delete from COMPLETED_COMPACTIONS where cc_id IN(";
+        StringBuilder queryStr = new StringBuilder(baseDeleteSql);
+        for(int i = 0; i < deleteSet.size(); i++) {
+          if(i > 0 && i % TIMED_OUT_TXN_ABORT_BATCH_SIZE == 0) {
+            queryStr.setCharAt(queryStr.length() - 1, ')');
+            stmt.executeUpdate(queryStr.toString());
+            dbConn.commit();
+            queryStr = new StringBuilder(baseDeleteSql);
+          }
+          queryStr.append(deleteSet.get(i)).append(',');
+        }
+        if(queryStr.length() > baseDeleteSql.length()) {
+          queryStr.setCharAt(queryStr.length() - 1, ')');
+          int updCnt = stmt.executeUpdate(queryStr.toString());
+          dbConn.commit();
+        }
+        dbConn.commit();
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "purgeCompactionHistory()");
+        throw new MetaException("Unable to connect to transaction database " +
+          StringUtils.stringifyException(e));
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException ex) {
+      purgeCompactionHistory();
+    }
+  }
+  /**
+   * Returns the number of failed compaction records to retain, raised to the failed-compaction
+   * threshold when configured lower, so enough history remains to evaluate that threshold.
+   */
+  public int getFailedCompactionRetention() {
+    int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+    int failedRetention = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED);
+    if(failedRetention < failedThreshold) {
+      LOG.warn("Invalid configuration " + HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED.varname +
+        "=" + failedRetention + " < " + HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.varname + "=" +
+        failedThreshold + ".  Will use " + HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED.varname +
+        "=" + failedThreshold);
+      failedRetention = failedThreshold;
+    }
+    return failedRetention;
+  }
+  /**
+   * Returns {@code true} if there already exists sufficient number of consecutive failures for
+   * this table/partition so that no new automatic compactions will be scheduled.
+   * User initiated compactions don't do this check.  Returns {@code false} if the check itself fails.
+   *
+   * Do we allow compacting whole table (when it's partitioned)?  No, though perhaps we should.
+   * That would be a meta operation, i.e. first find all partitions for this table (which have
+   * txn info) and schedule each compaction separately.  This avoids complications in this logic.
+   */
+  public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException {
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        rs = stmt.executeQuery("select CC_STATE from COMPLETED_COMPACTIONS where " +
+          "CC_DATABASE = " + quoteString(ci.dbname) + " and " +
+          "CC_TABLE = " + quoteString(ci.tableName) +
+          (ci.partName != null ? " and CC_PARTITION = " + quoteString(ci.partName) : "") +
+          " order by CC_ID desc");
+        int numFailed = 0;
+        int numTotal = 0;
+        int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+        while(rs.next() && ++numTotal <= failedThreshold) {
+          if(rs.getString(1).charAt(0) == FAILED_STATE) {
+            numFailed++;
+          }
+          else {
+            numFailed--;
+          }
+        }
+        return numFailed == failedThreshold;
+      }
+      catch (SQLException e) {
+        LOG.error("Unable to check for failed compactions " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")");
+        LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
+        return false;//weren't able to check
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      return checkFailedCompactions(ci);
+    }
+  }
+  /**
+   * If there is an entry in compaction_queue with ci.id, remove it
+   * Make entry in completed_compactions with status 'f'.
+   *
+   * but what about markCleaned() which is called when the table has been deleted...
+   */
+  public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw
+    //todo: this should take "comment" as parameter to set in CC_META_INFO to provide some context for the failure
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      PreparedStatement pStmt = null;
+      ResultSet rs = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id);
+        if(rs.next()) {
+          ci = CompactionInfo.loadFullFromCompactionQueue(rs);
+          String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id;
+          LOG.debug("Going to execute update <" + s + ">");
+          int updCnt = stmt.executeUpdate(s);
+        }
+        else {
+          throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE");
+        }
+        close(rs, stmt, null);
+
+        pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)");
+        ci.state = FAILED_STATE;
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
+        int updCount = pStmt.executeUpdate();
+        LOG.debug("Going to commit");
+        closeStmt(pStmt);
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.error("Unable to delete from compaction queue " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        try {
+          checkRetryable(dbConn, e, "markFailed(" + ci + ")");
+        }
+        catch(MetaException ex) {
+          LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
+        }
+        LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
+      } finally {
+        close(rs, stmt, null);
+        close(null, pStmt, dbConn);
+      }
+    } catch (RetryException e) {
+      markFailed(ci);
+    }
+  }
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 2015526..2a7545c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -118,10 +118,27 @@ public final class TxnDbUtil {
           " CQ_WORKER_ID varchar(128)," +
           " CQ_START bigint," +
           " CQ_RUN_AS varchar(128)," +
-          " CQ_HIGHEST_TXN_ID bigint)");
+          " CQ_HIGHEST_TXN_ID bigint," +
+          " CQ_META_INFO varchar(2048) for bit data," +
+          " CQ_HADOOP_JOB_ID varchar(32))");
 
       stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
       stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
+      
+      stmt.execute("CREATE TABLE COMPLETED_COMPACTIONS (" +
+        " CC_ID bigint PRIMARY KEY," +
+        " CC_DATABASE varchar(128) NOT NULL," +
+        " CC_TABLE varchar(128) NOT NULL," +
+        " CC_PARTITION varchar(767)," +
+        " CC_STATE char(1) NOT NULL," +
+        " CC_TYPE char(1) NOT NULL," +
+        " CC_WORKER_ID varchar(128)," +
+        " CC_START bigint," +
+        " CC_END bigint," +
+        " CC_RUN_AS varchar(128)," +
+        " CC_HIGHEST_TXN_ID bigint," +
+        " CC_META_INFO varchar(2048) for bit data," +
+        " CC_HADOOP_JOB_ID varchar(32))");
 
       conn.commit();
     } catch (SQLException e) {
@@ -161,7 +178,7 @@ public final class TxnDbUtil {
       dropTable(stmt, "NEXT_LOCK_ID");
       dropTable(stmt, "COMPACTION_QUEUE");
       dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID");
-
+      dropTable(stmt, "COMPLETED_COMPACTIONS");
       conn.commit();
     } finally {
       closeResources(conn, stmt, null);

http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 12ee52d..a65551a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -64,14 +64,20 @@ import java.util.concurrent.TimeUnit;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class TxnHandler {
-  // Compactor states
+  // Compactor states (Should really be enum)
   static final public String INITIATED_RESPONSE = "initiated";
   static final public String WORKING_RESPONSE = "working";
   static final public String CLEANING_RESPONSE = "ready for cleaning";
+  static final public String FAILED_RESPONSE = "failed";
+  static final public String SUCCEEDED_RESPONSE = "succeeded";
+  static final public String ATTEMPTED_RESPONSE = "attempted";
 
   static final protected char INITIATED_STATE = 'i';
   static final protected char WORKING_STATE = 'w';
   static final protected char READY_FOR_CLEANING = 'r';
+  static final char FAILED_STATE = 'f';
+  static final char SUCCEEDED_STATE = 's';
+  static final char ATTEMPTED_STATE = 'a';
 
   // Compactor types
   static final protected char MAJOR_TYPE = 'a';
@@ -759,7 +765,7 @@ public class TxnHandler {
     }
   }
 
-  public void compact(CompactionRequest rqst) throws MetaException {
+  public long compact(CompactionRequest rqst) throws MetaException {
     // Put a compaction request in the queue.
     try {
       Connection dbConn = null;
@@ -826,6 +832,7 @@ public class TxnHandler {
         stmt.executeUpdate(s);
         LOG.debug("Going to commit");
         dbConn.commit();
+        return id;
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
@@ -837,7 +844,7 @@ public class TxnHandler {
         closeDbConn(dbConn);
       }
     } catch (RetryException e) {
-      compact(rqst);
+      return compact(rqst);
     }
   }
 
@@ -850,7 +857,13 @@ public class TxnHandler {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
-          "cq_start, cq_run_as from COMPACTION_QUEUE";
+          "cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " +
+          "select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " +
+          "cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS";
+        //what I want is order by cc_end desc, cc_start asc (but derby has a bug https://issues.apache.org/jira/browse/DERBY-6013)
+        //to sort so that currently running jobs are at the end of the list (bottom of screen)
+        //and currently running ones are sorted by start time
+        //w/o order by likely currently running compactions will be first (LHS of Union)
         LOG.debug("Going to execute query <" + s + ">");
         ResultSet rs = stmt.executeQuery(s);
         while (rs.next()) {
@@ -862,16 +875,26 @@ public class TxnHandler {
             case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break;
             case WORKING_STATE: e.setState(WORKING_RESPONSE); break;
             case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break;
-            default: throw new MetaException("Unexpected compaction state " + rs.getString(4));
+            case FAILED_STATE: e.setState(FAILED_RESPONSE); break;
+            case SUCCEEDED_STATE: e.setState(SUCCEEDED_RESPONSE); break;
+            default:
+              //do nothing to handle RU/D if we add another status
           }
           switch (rs.getString(5).charAt(0)) {
             case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break;
             case MINOR_TYPE: e.setType(CompactionType.MINOR); break;
-            default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
+            default:
+              //do nothing to handle RU/D if we add another type
           }
           e.setWorkerid(rs.getString(6));
           e.setStart(rs.getLong(7));
-          e.setRunAs(rs.getString(8));
+          long endTime = rs.getLong(8);
+          if(endTime != -1) {
+            e.setEndTime(endTime);
+          }
+          e.setRunAs(rs.getString(9));
+          e.setHadoopJobId(rs.getString(10));
+          long id = rs.getLong(11);//for debugging
           response.addToCompacts(e);
         }
         LOG.debug("Going to rollback");
@@ -2331,41 +2354,29 @@ public class TxnHandler {
         throw new MetaException(msg);
     }
   }
-  /**
-   * the caller is expected to retry if this fails
-   *
-   * @return
-   * @throws SQLException
-   * @throws MetaException
-   */
-  private long generateNewExtLockId() throws SQLException, MetaException {
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
-      dbConn = getDbConn(getRequiredIsolationLevel());
-      stmt = dbConn.createStatement();
-
-      // Get the next lock id.
-      String s = addForUpdateClause(dbConn, "select nl_next from NEXT_LOCK_ID");
-      LOG.debug("Going to execute query <" + s + ">");
-      rs = stmt.executeQuery(s);
-      if (!rs.next()) {
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-        throw new MetaException("Transaction tables not properly " +
-          "initialized, no record found in next_lock_id");
-      }
-      long extLockId = rs.getLong(1);
-      s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
-      LOG.debug("Going to execute update <" + s + ">");
-      stmt.executeUpdate(s);
-      LOG.debug("Going to commit.");
-      dbConn.commit();
-      return extLockId;
+  static String quoteString(String input) {
+    return "'" + input + "'";
+  }
+  static CompactionType dbCompactionType2ThriftType(char dbValue) {
+    switch (dbValue) {
+      case MAJOR_TYPE:
+        return CompactionType.MAJOR;
+      case MINOR_TYPE:
+        return CompactionType.MINOR;
+      default:
+        LOG.warn("Unexpected compaction type " + dbValue);
+        return null;
     }
-    finally {
-      close(rs, stmt, dbConn);
+  }
+  static Character thriftCompactionType2DbType(CompactionType ct) {
+    switch (ct) {
+      case MAJOR:
+        return MAJOR_TYPE;
+      case MINOR:
+        return MINOR_TYPE;
+      default:
+        LOG.warn("Unexpected compaction type " + ct);
+        return null;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 06e0932..ff2c2c1 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -219,7 +219,8 @@ public class TestCompactionTxnHandler {
     assertEquals(0, txnHandler.findReadyToClean().size());
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    assertEquals(0, rsp.getCompactsSize());
+    assertEquals(1, rsp.getCompactsSize());
+    assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
index dee7601..96e4d40 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
@@ -17,17 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.txn;
 
+import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HouseKeeperService;
 import org.apache.hadoop.hive.metastore.txn.TxnHandler;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
-import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
 
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -35,58 +30,40 @@ import java.util.concurrent.atomic.AtomicInteger;
  * Performs background tasks for Transaction management in Hive.
  * Runs inside Hive Metastore Service.
  */
-public class AcidHouseKeeperService implements HouseKeeperService {
+public class AcidHouseKeeperService extends HouseKeeperServiceBase {
   private static final Logger LOG = LoggerFactory.getLogger(AcidHouseKeeperService.class);
-  private ScheduledExecutorService pool = null;
-  private final AtomicInteger isAliveCounter = new AtomicInteger(Integer.MIN_VALUE);
+
   @Override
-  public void start(HiveConf hiveConf) throws Exception {
-    HiveTxnManager mgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf);
-    if(!mgr.supportsAcid()) {
-      LOG.info(AcidHouseKeeperService.class.getName() + " not started since " +
-        mgr.getClass().getName()  + " does not support Acid.");
-      return;//there are no transactions in this case
-    }
-    pool = Executors.newScheduledThreadPool(1, new ThreadFactory() {
-      private final AtomicInteger threadCounter = new AtomicInteger();
-      @Override
-      public Thread newThread(Runnable r) {
-        return new Thread(r, "DeadTxnReaper-" + threadCounter.getAndIncrement());
-      }
-    });
-    TimeUnit tu = TimeUnit.MILLISECONDS;
-    pool.scheduleAtFixedRate(new TimedoutTxnReaper(hiveConf, this),
-      hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, tu),
-      hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_INTERVAL, tu),
-      TimeUnit.MILLISECONDS);
-    LOG.info("Started " + this.getClass().getName() + " with delay/interval = " +
-      hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, tu) + "/" +
-      hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_INTERVAL, tu) + " " + tu);
+  protected long getStartDelayMs() {
+    return hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, TimeUnit.MILLISECONDS);
   }
   @Override
-  public void stop() {
-    if(pool != null && !pool.isShutdown()) {
-      pool.shutdown();
-    }
-    pool = null;
+  protected long getIntervalMs() {
+    return hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_INTERVAL, TimeUnit.MILLISECONDS);
+  }
+  @Override
+  protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+    return new TimedoutTxnReaper(hiveConf, isAliveCounter);
   }
+
   @Override
   public String getServiceDescription() {
     return "Abort expired transactions";
   }
+
   private static final class TimedoutTxnReaper implements Runnable {
     private final TxnHandler txnHandler;
-    private final AcidHouseKeeperService owner;
-    private TimedoutTxnReaper(HiveConf hiveConf, AcidHouseKeeperService owner) {
+    private final AtomicInteger isAliveCounter;
+    private TimedoutTxnReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
       txnHandler = new TxnHandler(hiveConf);
-      this.owner = owner;
+      this.isAliveCounter = isAliveCounter;
     }
     @Override
     public void run() {
       try {
         long startTime = System.currentTimeMillis();
         txnHandler.performTimeOuts();
-        int count = owner.isAliveCounter.incrementAndGet();
+        int count = isAliveCounter.incrementAndGet();
         LOG.info("timeout reaper ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds.  isAliveCounter=" + count);
       }
       catch(Throwable t) {
@@ -94,12 +71,4 @@ public class AcidHouseKeeperService implements HouseKeeperService {
       }
     }
   }
-
-  /**
-   * This is used for testing only.  Each time the housekeeper runs, counter is incremented by 1.
-   * Starts with {@link java.lang.Integer#MIN_VALUE}
-   */
-  public int getIsAliveCounter() {
-    return isAliveCounter.get();
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index b847202..fbf5481 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -189,6 +189,7 @@ public class Cleaner extends CompactorThread {
       if (t == null) {
         // The table was dropped before we got around to cleaning it.
         LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped");
+        txnHandler.markCleaned(ci);
         return;
       }
       Partition p = null;
@@ -198,6 +199,7 @@ public class Cleaner extends CompactorThread {
           // The partition was dropped before we got around to cleaning it.
           LOG.info("Unable to find partition " + ci.getFullPartitionName() +
               ", assuming it was dropped");
+          txnHandler.markCleaned(ci);
           return;
         }
       }
@@ -223,13 +225,11 @@ public class Cleaner extends CompactorThread {
           }
         });
       }
-
+      txnHandler.markCleaned(ci);
     } catch (Exception e) {
-      LOG.error("Caught exception when cleaning, unable to complete cleaning " +
+      LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " +
           StringUtils.stringifyException(e));
-    } finally {
-      // We need to clean this out one way or another.
-      txnHandler.markCleaned(ci);
+      txnHandler.markFailed(ci);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 7d0f46a..07ac0c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -137,6 +137,10 @@ public class CompactorMR {
    */
   void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
            ValidTxnList txns, CompactionInfo ci, Worker.StatsUpdater su) throws IOException {
+
+    if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
+      throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
+    }
     JobConf job = createBaseJobConf(conf, jobName, t, sd, txns);
 
     // Figure out and encode what files we need to read.  We do this here (rather than in

http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index a8fe57d..2ef06de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -78,7 +78,7 @@ public class Initiator extends CompactorThread {
 
         // Wrap the inner parts of the loop in a catch throwable so that any errors in the loop
         // don't doom the entire thread.
-        try {
+        try {//todo: add method to only get current i.e. skip history - more efficient
           ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
           ValidTxnList txns =
               CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
@@ -119,6 +119,11 @@ public class Initiator extends CompactorThread {
                     ci.getFullPartitionName() + " so we will not initiate another compaction");
                 continue;
               }
+              if(txnHandler.checkFailedCompactions(ci)) {
+                //todo: make 'a' state entry in completed_compactions
+                LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last 3 attempts to compact it failed.");
+                continue;
+              }
 
               // Figure out who we should run the file operations as
               Partition p = resolvePartition(ci);
@@ -134,9 +139,9 @@ public class Initiator extends CompactorThread {
               if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
             } catch (Throwable t) {
               LOG.error("Caught exception while trying to determine if we should compact " +
-                  ci.getFullPartitionName() + ".  Marking clean to avoid repeated failures, " +
+                  ci + ".  Marking clean to avoid repeated failures, " +
                   "" + StringUtils.stringifyException(t));
-              txnHandler.markCleaned(ci);
+              txnHandler.markFailed(ci);
             }
           }
 
@@ -300,7 +305,7 @@ public class Initiator extends CompactorThread {
     if (ci.partName != null) rqst.setPartitionname(ci.partName);
     rqst.setRunas(runAs);
     LOG.info("Requesting compaction: " + rqst);
-    txnHandler.compact(rqst);
+    ci.id = txnHandler.compact(rqst);
   }
 
   // Because TABLE_NO_AUTO_COMPACT was originally assumed to be NO_AUTO_COMPACT and then was moved

http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 045ce63..ce03c8e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import org.apache.hadoop.hive.metastore.txn.ValidCompactorTxnList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -70,7 +69,8 @@ public class Worker extends CompactorThread {
       throw new RuntimeException(e);
     }
   }
-
+//todo: this doesn't check if compaction is already running (even though Initiator does but we
+// don't go through Initiator for user initiated compactions)
   @Override
   public void run() {
     do {
@@ -174,9 +174,9 @@ public class Worker extends CompactorThread {
           }
           txnHandler.markCompacted(ci);
         } catch (Exception e) {
-          LOG.error("Caught exception while trying to compact " + ci.getFullPartitionName() +
+          LOG.error("Caught exception while trying to compact " + ci +
               ".  Marking clean to avoid repeated failures, " + StringUtils.stringifyException(e));
-          txnHandler.markCleaned(ci);
+          txnHandler.markFailed(ci);
         }
       } catch (Throwable t) {
         LOG.error("Caught an exception in the main loop of compactor worker " + name + ", " +

http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index b784585..7a1a3d2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -24,10 +24,21 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HouseKeeperService;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService;
 import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
+import org.apache.hadoop.hive.ql.txn.compactor.Initiator;
 import org.apache.hadoop.hive.ql.txn.compactor.Worker;
 import org.junit.After;
 import org.junit.Assert;
@@ -42,6 +53,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -454,6 +466,211 @@ public class TestTxnCommands2 {
     //insert overwrite not supported for ACID tables
   }
   /**
+   * HIVE-12353
+   * @throws Exception
+   */
+  @Test
+  public void testInitiatorWithMultipleFailedCompactions() throws Exception {
+    String tblName = "hive12353";
+    runStatementOnDriver("drop table if exists " + tblName);
+    runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+      " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
+      " STORED AS ORC  TBLPROPERTIES ('transactional'='true')");
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 4);
+    for(int i = 0; i < 5; i++) {
+      //generate enough delta files so that Initiator can trigger auto compaction
+      runStatementOnDriver("insert into " + tblName + " values(" + (i + 1) + ", 'foo'),(" + (i + 2) + ", 'bar'),(" + (i + 3) + ", 'baz')");
+    }
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
+
+    int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+    CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
+    AtomicBoolean stop = new AtomicBoolean(true);
+    //create failed compactions
+    for(int i = 0; i < numFailedCompactions; i++) {
+      //each of these should fail
+      txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
+      runWorker(hiveConf);
+    }
+    //this should not schedule a new compaction due to prior failures
+    Initiator init = new Initiator();
+    init.setThreadId((int)init.getId());
+    init.setHiveConf(hiveConf);
+    init.init(stop, new AtomicBoolean());
+    init.run();
+
+    CompactionsByState cbs = countCompacts(txnHandler);
+    Assert.assertEquals("Unexpected number of failed compactions", numFailedCompactions, cbs.failed);
+    Assert.assertEquals("Unexpected total number of compactions", numFailedCompactions, cbs.total);
+    
+    hiveConf.setTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, 10, TimeUnit.MILLISECONDS);
+    AcidCompactionHistoryService compactionHistoryService = new AcidCompactionHistoryService();
+    runHouseKeeperService(compactionHistoryService, hiveConf);//should not remove anything from history
+    cbs = countCompacts(txnHandler);
+    Assert.assertEquals("Number of failed compactions after History clean", numFailedCompactions, cbs.failed);
+    Assert.assertEquals("Total number of compactions after History clean", numFailedCompactions, cbs.total);
+
+    txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MAJOR));
+    runWorker(hiveConf);//will fail
+    txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
+    runWorker(hiveConf);//will fail
+    cbs = countCompacts(txnHandler);
+    Assert.assertEquals("Unexpected num failed1", numFailedCompactions + 2, cbs.failed);
+    Assert.assertEquals("Unexpected num total1", numFailedCompactions + 2, cbs.total);
+    runHouseKeeperService(compactionHistoryService, hiveConf);//should remove history so that we have
+    //COMPACTOR_HISTORY_RETENTION_FAILED failed compacts left (and no other since we only have failed ones here)
+    cbs = countCompacts(txnHandler);
+    Assert.assertEquals("Unexpected num failed2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
+    Assert.assertEquals("Unexpected num total2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.total);
+    
+    
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false);
+    txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
+    //at this point "show compactions" should have (COMPACTOR_HISTORY_RETENTION_FAILED) failed + 1 initiated
+    cbs = countCompacts(txnHandler);
+    Assert.assertEquals("Unexpected num failed3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
+    Assert.assertEquals("Unexpected num initiated", 1, cbs.initiated);
+    Assert.assertEquals("Unexpected num total3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
+
+    runWorker(hiveConf);//will succeed and transition to Initiated->Working->Ready for Cleaning
+    cbs = countCompacts(txnHandler);
+    Assert.assertEquals("Unexpected num failed4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
+    Assert.assertEquals("Unexpected num ready to clean", 1, cbs.readyToClean);
+    Assert.assertEquals("Unexpected num total4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
+    
+    runCleaner(hiveConf); // transition to Success state
+    runHouseKeeperService(compactionHistoryService, hiveConf);//should not purge anything as all items within retention sizes
+    cbs = countCompacts(txnHandler);
+    Assert.assertEquals("Unexpected num failed5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
+    Assert.assertEquals("Unexpected num succeeded", 1, cbs.succeeded);
+    Assert.assertEquals("Unexpected num total5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
+  }
+  private static class CompactionsByState {
+    private int attempted;
+    private int failed;
+    private int initiated;
+    private int readyToClean;
+    private int succeeded;
+    private int working;
+    private int total;
+  }
+  private static CompactionsByState countCompacts(TxnHandler txnHandler) throws MetaException {
+    ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+    CompactionsByState compactionsByState = new CompactionsByState();
+    compactionsByState.total = resp.getCompactsSize();
+    for(ShowCompactResponseElement compact : resp.getCompacts()) {
+      if(TxnHandler.FAILED_RESPONSE.equals(compact.getState())) {
+        compactionsByState.failed++;
+      }
+      else if(TxnHandler.CLEANING_RESPONSE.equals(compact.getState())) {
+        compactionsByState.readyToClean++;
+      }
+      else if(TxnHandler.INITIATED_RESPONSE.equals(compact.getState())) {
+        compactionsByState.initiated++;
+      }
+      else if(TxnHandler.SUCCEEDED_RESPONSE.equals(compact.getState())) {
+        compactionsByState.succeeded++;
+      }
+      else if(TxnHandler.WORKING_RESPONSE.equals(compact.getState())) {
+        compactionsByState.working++;
+      }
+      else if(TxnHandler.ATTEMPTED_RESPONSE.equals(compact.getState())) {
+        compactionsByState.attempted++;
+      }
+    }
+    return compactionsByState;
+  }
+  private static void runWorker(HiveConf hiveConf) throws MetaException {
+    AtomicBoolean stop = new AtomicBoolean(true);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(hiveConf);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
+  }
+  private static void runCleaner(HiveConf hiveConf) throws MetaException {
+    AtomicBoolean stop = new AtomicBoolean(true);
+    Cleaner t = new Cleaner();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(hiveConf);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
+  }
+
+  private static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
+    int lastCount = houseKeeperService.getIsAliveCounter();
+    houseKeeperService.start(conf);
+    while(houseKeeperService.getIsAliveCounter() <= lastCount) {
+      try {
+        Thread.sleep(100);//make sure it has run at least once
+      }
+      catch(InterruptedException ex) {
+        //...
+      }
+    }
+    houseKeeperService.stop();
+  }
+
+  /**
+   * HIVE-12352 has details
+   * @throws Exception
+   */
+  @Test
+  public void writeBetweenWorkerAndCleaner() throws Exception {
+    String tblName = "hive12352";
+    runStatementOnDriver("drop table if exists " + tblName);
+    runStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+      " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed
+      " STORED AS ORC  TBLPROPERTIES ('transactional'='true')");
+
+    //create some data
+    runStatementOnDriver("insert into " + tblName + " values(1, 'foo'),(2, 'bar'),(3, 'baz')");
+    runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3");
+
+    //run Worker to execute compaction
+    CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
+    txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(hiveConf);
+    AtomicBoolean stop = new AtomicBoolean(true);
+    AtomicBoolean looped = new AtomicBoolean();
+    t.init(stop, looped);
+    t.run();
+
+    //delete something, but make sure txn is rolled back
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    runStatementOnDriver("delete from " + tblName + " where a = 1");
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+
+    List<String> expected = new ArrayList<>();
+    expected.add("1\tfoo");
+    expected.add("2\tbar");
+    expected.add("3\tblah");
+    Assert.assertEquals("", expected,
+      runStatementOnDriver("select a,b from " + tblName + " order by a"));
+
+    //run Cleaner
+    Cleaner c = new Cleaner();
+    c.setThreadId((int)c.getId());
+    c.setHiveConf(hiveConf);
+    c.init(stop, new AtomicBoolean());
+    c.run();
+
+    //this seems odd, but we want to make sure to run CompactionTxnHandler.cleanEmptyAbortedTxns()
+    Initiator i = new Initiator();
+    i.setThreadId((int)i.getId());
+    i.setHiveConf(hiveConf);
+    i.init(stop, new AtomicBoolean());
+    i.run();
+
+    //check that aborted operation didn't become committed
+    Assert.assertEquals("", expected,
+      runStatementOnDriver("select a,b from " + tblName + " order by a"));
+  }
+  /**
    * takes raw data and turns it into a string as if from Driver.getResults()
    * sorts rows in dictionary order
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index bca5002..899f5a1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,7 +72,8 @@ public class TestCleaner extends CompactorTest {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(0, rsp.getCompactsSize());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, null);
@@ -102,7 +104,8 @@ public class TestCleaner extends CompactorTest {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(0, rsp.getCompactsSize());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, p);
@@ -131,7 +134,8 @@ public class TestCleaner extends CompactorTest {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(0, rsp.getCompactsSize());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, null);
@@ -169,7 +173,8 @@ public class TestCleaner extends CompactorTest {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(0, rsp.getCompactsSize());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, p);
@@ -323,7 +328,8 @@ public class TestCleaner extends CompactorTest {
     // Check there are no compactions requests left.
     rsp = txnHandler.showCompact(new ShowCompactRequest());
     compacts = rsp.getCompacts();
-    Assert.assertEquals(0, compacts.size());
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
   }
 
   @Test
@@ -396,7 +402,8 @@ public class TestCleaner extends CompactorTest {
     // Check there are no compactions requests left.
     rsp = txnHandler.showCompact(new ShowCompactRequest());
     compacts = rsp.getCompacts();
-    Assert.assertEquals(0, compacts.size());
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
   }
 
   @Test
@@ -421,7 +428,8 @@ public class TestCleaner extends CompactorTest {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(0, rsp.getCompactsSize());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, p);
@@ -451,7 +459,8 @@ public class TestCleaner extends CompactorTest {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(0, rsp.getCompactsSize());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
   }
 
   @Test
@@ -478,7 +487,8 @@ public class TestCleaner extends CompactorTest {
 
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(0, rsp.getCompactsSize());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
   }
   @Override
   boolean useHive130DeltaDirName() {

http://git-wip-us.apache.org/repos/asf/hive/blob/e8388ae6/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index fe1d0d3..d0db406 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.*;
@@ -933,7 +934,8 @@ public class TestWorker extends CompactorTest {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(0, compacts.size());
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(compacts.get(0).getState()));
   }
 
   @Test
@@ -957,6 +959,7 @@ public class TestWorker extends CompactorTest {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(0, compacts.size());
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
   }
 }


Mime
View raw message