hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ekoif...@apache.org
Subject svn commit: r1654443 [2/2] - in /hive/branches/branch-1.0/metastore/src: java/org/apache/hadoop/hive/metastore/ java/org/apache/hadoop/hive/metastore/txn/ test/org/apache/hadoop/hive/metastore/txn/
Date Sat, 24 Jan 2015 01:05:22 GMT
Modified: hive/branches/branch-1.0/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.0/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1654443&r1=1654442&r2=1654443&view=diff
==============================================================================
--- hive/branches/branch-1.0/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original)
+++ hive/branches/branch-1.0/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Sat Jan 24 01:05:22 2015
@@ -77,7 +77,7 @@ public class TxnHandler {
   static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName());
 
   static private DataSource connPool;
-  private static Boolean lockLock = new Boolean("true"); // Random object to lock on for the lock
+  private final static Object lockLock = new Object(); // Random object to lock on for the lock
   // method
 
   /**
@@ -87,10 +87,13 @@ public class TxnHandler {
   protected HiveConf conf;
   protected DatabaseProduct dbProduct;
 
-  // Transaction timeout, in milliseconds.
+  // (End user) Transaction timeout, in milliseconds.
   private long timeout;
 
   private String identifierQuoteString; // quotes to use for quoting tables, where necessary
+  private final long retryInterval;
+  private final int retryLimit;
+  private int retryNum;
 
   // DEADLOCK DETECTION AND HANDLING
   // A note to developers of this class.  ALWAYS access HIVE_LOCKS before TXNS to avoid deadlock
@@ -125,113 +128,122 @@ public class TxnHandler {
     timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
     deadlockCnt = 0;
     buildJumpTable();
+    retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL, TimeUnit.MILLISECONDS);
+    retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
+
   }
 
   public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
-    // We need to figure out the current transaction number and the list of
-    // open transactions.  To avoid needing a transaction on the underlying
-    // database we'll look at the current transaction number first.  If it
-    // subsequently shows up in the open list that's ok.
-    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-    Statement stmt = null;
     try {
-      stmt = dbConn.createStatement();
-      String s = "select ntxn_next - 1 from NEXT_TXN_ID";
-      LOG.debug("Going to execute query <" + s + ">");
-      ResultSet rs = stmt.executeQuery(s);
-      if (!rs.next()) {
-        throw new MetaException("Transaction tables not properly " +
+      // We need to figure out the current transaction number and the list of
+      // open transactions.  To avoid needing a transaction on the underlying
+      // database we'll look at the current transaction number first.  If it
+      // subsequently shows up in the open list that's ok.
+      Connection dbConn = null;
+      Statement stmt = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String s = "select ntxn_next - 1 from NEXT_TXN_ID";
+        LOG.debug("Going to execute query <" + s + ">");
+        ResultSet rs = stmt.executeQuery(s);
+        if (!rs.next()) {
+          throw new MetaException("Transaction tables not properly " +
             "initialized, no record found in next_txn_id");
-      }
-      long hwm = rs.getLong(1);
-      if (rs.wasNull()) {
-        throw new MetaException("Transaction tables not properly " +
+        }
+        long hwm = rs.getLong(1);
+        if (rs.wasNull()) {
+          throw new MetaException("Transaction tables not properly " +
             "initialized, null record found in next_txn_id");
-      }
-
-      List<TxnInfo> txnInfo = new ArrayList<TxnInfo>();
-      s = "select txn_id, txn_state, txn_user, txn_host from TXNS";
-      LOG.debug("Going to execute query<" + s + ">");
-      rs = stmt.executeQuery(s);
-      while (rs.next()) {
-        char c = rs.getString(2).charAt(0);
-        TxnState state;
-        switch (c) {
-          case TXN_ABORTED:
-            state = TxnState.ABORTED;
-            break;
+        }
 
-          case TXN_OPEN:
-            state = TxnState.OPEN;
-            break;
+        List<TxnInfo> txnInfo = new ArrayList<TxnInfo>();
+        s = "select txn_id, txn_state, txn_user, txn_host from TXNS";
+        LOG.debug("Going to execute query<" + s + ">");
+        rs = stmt.executeQuery(s);
+        while (rs.next()) {
+          char c = rs.getString(2).charAt(0);
+          TxnState state;
+          switch (c) {
+            case TXN_ABORTED:
+              state = TxnState.ABORTED;
+              break;
+
+            case TXN_OPEN:
+              state = TxnState.OPEN;
+              break;
 
-          default:
-            throw new MetaException("Unexpected transaction state " + c +
+            default:
+              throw new MetaException("Unexpected transaction state " + c +
                 " found in txns table");
+          }
+          txnInfo.add(new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4)));
         }
-        txnInfo.add(new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4)));
-      }
-      LOG.debug("Going to rollback");
-      dbConn.rollback();
-      return new GetOpenTxnsInfoResponse(hwm, txnInfo);
-    } catch (SQLException e) {
-      try {
         LOG.debug("Going to rollback");
         dbConn.rollback();
-      } catch (SQLException e1) {
-      }
-      throw new MetaException("Unable to select from transaction database, "
+        return new GetOpenTxnsInfoResponse(hwm, txnInfo);
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "getOpenTxnsInfo");
+        throw new MetaException("Unable to select from transaction database: " + getMessage(e)
           + StringUtils.stringifyException(e));
-    } finally {
-      closeStmt(stmt);
-      closeDbConn(dbConn);
+      } finally {
+        closeStmt(stmt);
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getOpenTxnsInfo();
     }
   }
 
   public GetOpenTxnsResponse getOpenTxns() throws MetaException {
-    // We need to figure out the current transaction number and the list of
-    // open transactions.  To avoid needing a transaction on the underlying
-    // database we'll look at the current transaction number first.  If it
-    // subsequently shows up in the open list that's ok.
-    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-    Statement stmt = null;
     try {
-      timeOutTxns(dbConn);
-      stmt = dbConn.createStatement();
-      String s = "select ntxn_next - 1 from NEXT_TXN_ID";
-      LOG.debug("Going to execute query <" + s + ">");
-      ResultSet rs = stmt.executeQuery(s);
-      if (!rs.next()) {
-        throw new MetaException("Transaction tables not properly " +
+      // We need to figure out the current transaction number and the list of
+      // open transactions.  To avoid needing a transaction on the underlying
+      // database we'll look at the current transaction number first.  If it
+      // subsequently shows up in the open list that's ok.
+      Connection dbConn = null;
+      Statement stmt = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        timeOutTxns(dbConn);
+        stmt = dbConn.createStatement();
+        String s = "select ntxn_next - 1 from NEXT_TXN_ID";
+        LOG.debug("Going to execute query <" + s + ">");
+        ResultSet rs = stmt.executeQuery(s);
+        if (!rs.next()) {
+          throw new MetaException("Transaction tables not properly " +
             "initialized, no record found in next_txn_id");
-      }
-      long hwm = rs.getLong(1);
-      if (rs.wasNull()) {
-        throw new MetaException("Transaction tables not properly " +
+        }
+        long hwm = rs.getLong(1);
+        if (rs.wasNull()) {
+          throw new MetaException("Transaction tables not properly " +
             "initialized, null record found in next_txn_id");
-      }
+        }
 
-      Set<Long> openList = new HashSet<Long>();
-      s = "select txn_id from TXNS";
-      LOG.debug("Going to execute query<" + s + ">");
-      rs = stmt.executeQuery(s);
-      while (rs.next()) {
-        openList.add(rs.getLong(1));
-      }
-      LOG.debug("Going to rollback");
-      dbConn.rollback();
-      return new GetOpenTxnsResponse(hwm, openList);
-    } catch (SQLException e) {
-      try {
+        Set<Long> openList = new HashSet<Long>();
+        s = "select txn_id from TXNS";
+        LOG.debug("Going to execute query<" + s + ">");
+        rs = stmt.executeQuery(s);
+        while (rs.next()) {
+          openList.add(rs.getLong(1));
+        }
         LOG.debug("Going to rollback");
         dbConn.rollback();
-      } catch (SQLException e1) {
-      }
-      throw new MetaException("Unable to select from transaction database, "
+        return new GetOpenTxnsResponse(hwm, openList);
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "getOpenTxns");
+        throw new MetaException("Unable to select from transaction database, "
           + StringUtils.stringifyException(e));
-    } finally {
-      closeStmt(stmt);
-      closeDbConn(dbConn);
+      } finally {
+        closeStmt(stmt);
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      return getOpenTxns();
     }
   }
 
@@ -259,12 +271,13 @@ public class TxnHandler {
   public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
     int numTxns = rqst.getNum_txns();
     try {
-      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Connection dbConn = null;
       Statement stmt = null;
       try {
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
         // Make sure the user has not requested an insane amount of txns.
         int maxTxns = HiveConf.getIntVar(conf,
-            HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH);
+          HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH);
         if (numTxns > maxTxns) numTxns = maxTxns;
 
         stmt = dbConn.createStatement();
@@ -273,7 +286,7 @@ public class TxnHandler {
         ResultSet rs = stmt.executeQuery(s);
         if (!rs.next()) {
           throw new MetaException("Transaction database not properly " +
-              "configured, can't find next transaction id.");
+            "configured, can't find next transaction id.");
         }
         long first = rs.getLong(1);
         s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
@@ -281,8 +294,8 @@ public class TxnHandler {
         stmt.executeUpdate(s);
         long now = getDbTime(dbConn);
         s = "insert into TXNS (txn_id, txn_state, txn_started, " +
-            "txn_last_heartbeat, txn_user, txn_host) values (?, 'o', " + now + ", " +
-            now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')";
+          "txn_last_heartbeat, txn_user, txn_host) values (?, 'o', " + now + ", " +
+          now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')";
         LOG.debug("Going to prepare statement <" + s + ">");
         PreparedStatement ps = dbConn.prepareStatement(s);
         List<Long> txnIds = new ArrayList<Long>(numTxns);
@@ -296,30 +309,26 @@ public class TxnHandler {
         dbConn.commit();
         return new OpenTxnsResponse(txnIds);
       } catch (SQLException e) {
-        try {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        } catch (SQLException e1) {
-        }
-        detectDeadlock(dbConn, e, "openTxns");
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "openTxns");
         throw new MetaException("Unable to select from transaction database "
           + StringUtils.stringifyException(e));
       } finally {
         closeStmt(stmt);
         closeDbConn(dbConn);
       }
-    } catch (DeadlockException e) {
+    } catch (RetryException e) {
       return openTxns(rqst);
-    } finally {
-      deadlockCnt = 0;
     }
   }
 
   public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException {
     long txnid = rqst.getTxnid();
     try {
-      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Connection dbConn = null;
       try {
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
         List<Long> txnids = new ArrayList<Long>(1);
         txnids.add(txnid);
         if (abortTxns(dbConn, txnids) != 1) {
@@ -331,31 +340,27 @@ public class TxnHandler {
         LOG.debug("Going to commit");
         dbConn.commit();
       } catch (SQLException e) {
-        try {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        } catch (SQLException e1) {
-        }
-        detectDeadlock(dbConn, e, "abortTxn");
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "abortTxn");
         throw new MetaException("Unable to update transaction database "
           + StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
       }
-    } catch (DeadlockException e) {
+    } catch (RetryException e) {
       abortTxn(rqst);
-    } finally {
-      deadlockCnt = 0;
     }
   }
 
   public void commitTxn(CommitTxnRequest rqst)
-      throws NoSuchTxnException, TxnAbortedException,  MetaException {
+    throws NoSuchTxnException, TxnAbortedException,  MetaException {
     long txnid = rqst.getTxnid();
     try {
-      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Connection dbConn = null;
       Statement stmt = null;
       try {
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
         stmt = dbConn.createStatement();
         // Before we do the commit heartbeat the txn.  This is slightly odd in that we're going to
         // commit it, but it does two things.  One, it makes sure the transaction is still valid.
@@ -367,11 +372,11 @@ public class TxnHandler {
         // Move the record from txn_components into completed_txn_components so that the compactor
         // knows where to look to compact.
         String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " +
-            "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid;
+          "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid;
         LOG.debug("Going to execute insert <" + s + ">");
         if (stmt.executeUpdate(s) < 1) {
           LOG.warn("Expected to move at least one record from txn_components to " +
-              "completed_txn_components when committing txn!");
+            "completed_txn_components when committing txn!");
         }
 
         // Always access TXN_COMPONENTS before HIVE_LOCKS;
@@ -388,80 +393,68 @@ public class TxnHandler {
         LOG.debug("Going to commit");
         dbConn.commit();
       } catch (SQLException e) {
-        try {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        } catch (SQLException e1) {
-        }
-        detectDeadlock(dbConn, e, "commitTxn");
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "commitTxn");
         throw new MetaException("Unable to update transaction database "
           + StringUtils.stringifyException(e));
       } finally {
         closeStmt(stmt);
         closeDbConn(dbConn);
       }
-    } catch (DeadlockException e) {
+    } catch (RetryException e) {
       commitTxn(rqst);
-    } finally {
-      deadlockCnt = 0;
     }
   }
 
   public LockResponse lock(LockRequest rqst)
-      throws NoSuchTxnException, TxnAbortedException, MetaException {
+    throws NoSuchTxnException, TxnAbortedException, MetaException {
     try {
-      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Connection dbConn = null;
       try {
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
         return lock(dbConn, rqst, true);
       } catch (SQLException e) {
-        try {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        } catch (SQLException e1) {
-        }
-        detectDeadlock(dbConn, e, "lock");
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "lock");
         throw new MetaException("Unable to update transaction database " +
-            StringUtils.stringifyException(e));
+          StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
       }
-    } catch (DeadlockException e) {
+    } catch (RetryException e) {
       return lock(rqst);
-    } finally {
-      deadlockCnt = 0;
     }
   }
 
   public LockResponse lockNoWait(LockRequest rqst)
-      throws NoSuchTxnException,  TxnAbortedException, MetaException {
+    throws NoSuchTxnException,  TxnAbortedException, MetaException {
     try {
-      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Connection dbConn = null;
       try {
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
         return lock(dbConn, rqst, false);
       } catch (SQLException e) {
-        try {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        } catch (SQLException e1) {
-        }
-        detectDeadlock(dbConn, e, "lockNoWait");
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "lockNoWait");
         throw new MetaException("Unable to update transaction database " +
-            StringUtils.stringifyException(e));
+          StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
       }
-    } catch (DeadlockException e) {
+    } catch (RetryException e) {
       return lockNoWait(rqst);
-    } finally {
-      deadlockCnt = 0;
     }
   }
 
   public LockResponse checkLock(CheckLockRequest rqst)
-      throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
+    throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
     try {
-      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Connection dbConn = null;
       try {
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
         long extLockId = rqst.getLockid();
         // Clean up timed out locks
         timeOutLocks(dbConn);
@@ -474,31 +467,27 @@ public class TxnHandler {
         if (txnid > 0)  heartbeatTxn(dbConn, txnid);
         return checkLock(dbConn, extLockId, true);
       } catch (SQLException e) {
-        try {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        } catch (SQLException e1) {
-        }
-        detectDeadlock(dbConn, e, "checkLock");
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "checkLock");
         throw new MetaException("Unable to update transaction database " +
-            StringUtils.stringifyException(e));
+          StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
       }
-    } catch (DeadlockException e) {
+    } catch (RetryException e) {
       return checkLock(rqst);
-    } finally {
-      deadlockCnt = 0;
     }
 
   }
 
   public void unlock(UnlockRequest rqst)
-      throws NoSuchLockException, TxnOpenException, MetaException {
+    throws NoSuchLockException, TxnOpenException, MetaException {
     try {
-      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Connection dbConn = null;
       Statement stmt = null;
       try {
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
         // Odd as it seems, we need to heartbeat first because this touches the
         // lock table and assures that our locks are still valid.  If they are
         // not, this will throw an exception and the heartbeat will fail.
@@ -512,8 +501,8 @@ public class TxnHandler {
           LOG.debug("Going to rollback");
           dbConn.rollback();
           String msg = "Unlocking locks associated with transaction" +
-              " not permitted.  Lockid " + extLockId + " is associated with " +
-              "transaction " + txnid;
+            " not permitted.  Lockid " + extLockId + " is associated with " +
+            "transaction " + txnid;
           LOG.error(msg);
           throw new TxnOpenException(msg);
         }
@@ -529,97 +518,96 @@ public class TxnHandler {
         LOG.debug("Going to commit");
         dbConn.commit();
       } catch (SQLException e) {
-        try {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        } catch (SQLException e1) {
-        }
-        detectDeadlock(dbConn, e, "unlock");
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "unlock");
         throw new MetaException("Unable to update transaction database " +
-            StringUtils.stringifyException(e));
+          StringUtils.stringifyException(e));
       } finally {
         closeStmt(stmt);
         closeDbConn(dbConn);
       }
-    } catch (DeadlockException e) {
+    } catch (RetryException e) {
       unlock(rqst);
-    } finally {
-      deadlockCnt = 0;
     }
   }
 
   public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
-    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-    ShowLocksResponse rsp = new ShowLocksResponse();
-    List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>();
-    Statement stmt = null;
     try {
-      stmt = dbConn.createStatement();
+      Connection dbConn = null;
+      ShowLocksResponse rsp = new ShowLocksResponse();
+      List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>();
+      Statement stmt = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
 
-      String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
+        String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
           "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host from HIVE_LOCKS";
-      LOG.debug("Doing to execute query <" + s + ">");
-      ResultSet rs = stmt.executeQuery(s);
-      while (rs.next()) {
-        ShowLocksResponseElement e = new ShowLocksResponseElement();
-        e.setLockid(rs.getLong(1));
-        long txnid = rs.getLong(2);
-        if (!rs.wasNull()) e.setTxnid(txnid);
-        e.setDbname(rs.getString(3));
-        e.setTablename(rs.getString(4));
-        String partition = rs.getString(5);
-        if (partition != null) e.setPartname(partition);
-        switch (rs.getString(6).charAt(0)) {
-          case LOCK_ACQUIRED: e.setState(LockState.ACQUIRED); break;
-          case LOCK_WAITING: e.setState(LockState.WAITING); break;
-          default: throw new MetaException("Unknown lock state " + rs.getString(6).charAt(0));
-        }
-        switch (rs.getString(7).charAt(0)) {
-          case LOCK_SEMI_SHARED: e.setType(LockType.SHARED_WRITE); break;
-          case LOCK_EXCLUSIVE: e.setType(LockType.EXCLUSIVE); break;
-          case LOCK_SHARED: e.setType(LockType.SHARED_READ); break;
-          default: throw new MetaException("Unknown lock type " + rs.getString(6).charAt(0));
-        }
-        e.setLastheartbeat(rs.getLong(8));
-        long acquiredAt = rs.getLong(9);
-        if (!rs.wasNull()) e.setAcquiredat(acquiredAt);
-        e.setUser(rs.getString(10));
-        e.setHostname(rs.getString(11));
-        elems.add(e);
-      }
-      LOG.debug("Going to rollback");
-      dbConn.rollback();
-    } catch (SQLException e) {
-      throw new MetaException("Unable to select from transaction database " +
+        LOG.debug("Doing to execute query <" + s + ">");
+        ResultSet rs = stmt.executeQuery(s);
+        while (rs.next()) {
+          ShowLocksResponseElement e = new ShowLocksResponseElement();
+          e.setLockid(rs.getLong(1));
+          long txnid = rs.getLong(2);
+          if (!rs.wasNull()) e.setTxnid(txnid);
+          e.setDbname(rs.getString(3));
+          e.setTablename(rs.getString(4));
+          String partition = rs.getString(5);
+          if (partition != null) e.setPartname(partition);
+          switch (rs.getString(6).charAt(0)) {
+            case LOCK_ACQUIRED: e.setState(LockState.ACQUIRED); break;
+            case LOCK_WAITING: e.setState(LockState.WAITING); break;
+            default: throw new MetaException("Unknown lock state " + rs.getString(6).charAt(0));
+          }
+          switch (rs.getString(7).charAt(0)) {
+            case LOCK_SEMI_SHARED: e.setType(LockType.SHARED_WRITE); break;
+            case LOCK_EXCLUSIVE: e.setType(LockType.EXCLUSIVE); break;
+            case LOCK_SHARED: e.setType(LockType.SHARED_READ); break;
+            default: throw new MetaException("Unknown lock type " + rs.getString(6).charAt(0));
+          }
+          e.setLastheartbeat(rs.getLong(8));
+          long acquiredAt = rs.getLong(9);
+          if (!rs.wasNull()) e.setAcquiredat(acquiredAt);
+          e.setUser(rs.getString(10));
+          e.setHostname(rs.getString(11));
+          elems.add(e);
+        }
+        LOG.debug("Going to rollback");
+        dbConn.rollback();
+      } catch (SQLException e) {
+        checkRetryable(dbConn, e, "showLocks");
+        throw new MetaException("Unable to select from transaction database " +
           StringUtils.stringifyException(e));
-    } finally {
-      closeStmt(stmt);
-      closeDbConn(dbConn);
+      } finally {
+        closeStmt(stmt);
+        closeDbConn(dbConn);
+      }
+      rsp.setLocks(elems);
+      return rsp;
+    } catch (RetryException e) {
+      return showLocks(rqst);
     }
-    rsp.setLocks(elems);
-    return rsp;
   }
 
   public void heartbeat(HeartbeatRequest ids)
-      throws NoSuchTxnException,  NoSuchLockException, TxnAbortedException, MetaException {
+    throws NoSuchTxnException,  NoSuchLockException, TxnAbortedException, MetaException {
     try {
-      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Connection dbConn = null;
       try {
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
         heartbeatLock(dbConn, ids.getLockid());
         heartbeatTxn(dbConn, ids.getTxnid());
       } catch (SQLException e) {
-        try {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        } catch (SQLException e1) {
-        }
-        detectDeadlock(dbConn, e, "heartbeat");
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "heartbeat");
         throw new MetaException("Unable to select from transaction database " +
-            StringUtils.stringifyException(e));
+          StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
       }
-    } catch (DeadlockException e) {
+    } catch (RetryException e) {
       heartbeat(ids);
     } finally {
       deadlockCnt = 0;
@@ -627,15 +615,16 @@ public class TxnHandler {
   }
 
   public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst)
-      throws MetaException {
+    throws MetaException {
     try {
-      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Connection dbConn = null;
       HeartbeatTxnRangeResponse rsp = new HeartbeatTxnRangeResponse();
       Set<Long> nosuch = new HashSet<Long>();
       Set<Long> aborted = new HashSet<Long>();
       rsp.setNosuch(nosuch);
       rsp.setAborted(aborted);
       try {
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
         for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) {
           try {
             heartbeatTxn(dbConn, txn);
@@ -647,18 +636,15 @@ public class TxnHandler {
         }
         return rsp;
       } catch (SQLException e) {
-        try {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        } catch (SQLException e1) {
-        }
-        detectDeadlock(dbConn, e, "heartbeatTxnRange");
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "heartbeatTxnRange");
         throw new MetaException("Unable to select from transaction database " +
-            StringUtils.stringifyException(e));
+          StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
       }
-    } catch (DeadlockException e) {
+    } catch (RetryException e) {
       return heartbeatTxnRange(rqst);
     }
   }
@@ -666,9 +652,10 @@ public class TxnHandler {
   public void compact(CompactionRequest rqst) throws MetaException {
     // Put a compaction request in the queue.
     try {
-      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Connection dbConn = null;
       Statement stmt = null;
       try {
+        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
         stmt = dbConn.createStatement();
 
         // Get the id for the next entry in the queue
@@ -679,7 +666,7 @@ public class TxnHandler {
           LOG.debug("Going to rollback");
           dbConn.rollback();
           throw new MetaException("Transaction tables not properly initiated, " +
-              "no record found in next_compaction_queue_id");
+            "no record found in next_compaction_queue_id");
         }
         long id = rs.getLong(1);
         s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1);
@@ -687,7 +674,7 @@ public class TxnHandler {
         stmt.executeUpdate(s);
 
         StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " +
-            "cq_table, ");
+          "cq_table, ");
         String partName = rqst.getPartitionname();
         if (partName != null) buf.append("cq_partition, ");
         buf.append("cq_state, cq_type");
@@ -730,71 +717,69 @@ public class TxnHandler {
         LOG.debug("Going to commit");
         dbConn.commit();
       } catch (SQLException e) {
-        try {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        } catch (SQLException e1) {
-        }
-        detectDeadlock(dbConn, e, "compact");
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "compact");
         throw new MetaException("Unable to select from transaction database " +
-            StringUtils.stringifyException(e));
+          StringUtils.stringifyException(e));
       } finally {
         closeStmt(stmt);
         closeDbConn(dbConn);
       }
-    } catch (DeadlockException e) {
+    } catch (RetryException e) {
       compact(rqst);
-    } finally {
-      deadlockCnt = 0;
     }
   }
 
   public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException {
     ShowCompactResponse response = new ShowCompactResponse(new ArrayList<ShowCompactResponseElement>());
-    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+    Connection dbConn = null;
     Statement stmt = null;
     try {
-      stmt = dbConn.createStatement();
-      String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
-          "cq_start, cq_run_as from COMPACTION_QUEUE";
-      LOG.debug("Going to execute query <" + s + ">");
-      ResultSet rs = stmt.executeQuery(s);
-      while (rs.next()) {
-        ShowCompactResponseElement e = new ShowCompactResponseElement();
-        e.setDbname(rs.getString(1));
-        e.setTablename(rs.getString(2));
-        e.setPartitionname(rs.getString(3));
-        switch (rs.getString(4).charAt(0)) {
-          case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break;
-          case WORKING_STATE: e.setState(WORKING_RESPONSE); break;
-          case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break;
-          default: throw new MetaException("Unexpected compaction state " + rs.getString(4));
-        }
-        switch (rs.getString(5).charAt(0)) {
-          case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break;
-          case MINOR_TYPE: e.setType(CompactionType.MINOR); break;
-          default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
-        }
-        e.setWorkerid(rs.getString(6));
-        e.setStart(rs.getLong(7));
-        e.setRunAs(rs.getString(8));
-        response.addToCompacts(e);
-      }
-      LOG.debug("Going to rollback");
-      dbConn.rollback();
-    } catch (SQLException e) {
-      LOG.debug("Going to rollback");
       try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
+          "cq_start, cq_run_as from COMPACTION_QUEUE";
+        LOG.debug("Going to execute query <" + s + ">");
+        ResultSet rs = stmt.executeQuery(s);
+        while (rs.next()) {
+          ShowCompactResponseElement e = new ShowCompactResponseElement();
+          e.setDbname(rs.getString(1));
+          e.setTablename(rs.getString(2));
+          e.setPartitionname(rs.getString(3));
+          switch (rs.getString(4).charAt(0)) {
+            case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break;
+            case WORKING_STATE: e.setState(WORKING_RESPONSE); break;
+            case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break;
+            default: throw new MetaException("Unexpected compaction state " + rs.getString(4));
+          }
+          switch (rs.getString(5).charAt(0)) {
+            case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break;
+            case MINOR_TYPE: e.setType(CompactionType.MINOR); break;
+            default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
+          }
+          e.setWorkerid(rs.getString(6));
+          e.setStart(rs.getLong(7));
+          e.setRunAs(rs.getString(8));
+          response.addToCompacts(e);
+        }
+        LOG.debug("Going to rollback");
         dbConn.rollback();
-      } catch (SQLException e1) {
-      }
-      throw new MetaException("Unable to select from transaction database " +
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "showCompact");
+        throw new MetaException("Unable to select from transaction database " +
           StringUtils.stringifyException(e));
-    } finally {
-      closeStmt(stmt);
-      closeDbConn(dbConn);
+      } finally {
+        closeStmt(stmt);
+        closeDbConn(dbConn);
+      }
+      return response;
+    } catch (RetryException e) {
+      return showCompact(rqst);
     }
-    return response;
   }
 
   /**
@@ -828,7 +813,7 @@ public class TxnHandler {
     return previous_timeout;
   }
 
-  protected class DeadlockException extends Exception {
+  protected class RetryException extends Exception {
 
   }
 
@@ -839,26 +824,28 @@ public class TxnHandler {
    * @return db connection
    * @throws MetaException if the connection cannot be obtained
    */
-  protected Connection getDbConn(int isolationLevel) throws MetaException {
+  protected Connection getDbConn(int isolationLevel) throws SQLException {
+    Connection dbConn = connPool.getConnection();
+    dbConn.setAutoCommit(false);
+    dbConn.setTransactionIsolation(isolationLevel);
+    return dbConn;
+  }
+
+  void rollbackDBConn(Connection dbConn) {
     try {
-      Connection dbConn = connPool.getConnection();
-      dbConn.setAutoCommit(false);
-      dbConn.setTransactionIsolation(isolationLevel);
-      return dbConn;
+      if (dbConn != null) dbConn.rollback();
     } catch (SQLException e) {
-      String msg = "Unable to get jdbc connection from pool, " + e.getMessage();
-      throw new MetaException(msg);
+      LOG.warn("Failed to rollback db connection " + getMessage(e));
     }
   }
-
   protected void closeDbConn(Connection dbConn) {
     try {
       if (dbConn != null) dbConn.close();
     } catch (SQLException e) {
-      LOG.warn("Failed to close db connection " + e.getMessage());
+      LOG.warn("Failed to close db connection " + getMessage(e));
     }
   }
-  
+
   /**
    * Close statement instance.
    * @param stmt statement instance.
@@ -867,7 +854,7 @@ public class TxnHandler {
     try {
       if (stmt != null) stmt.close();
     } catch (SQLException e) {
-      LOG.warn("Failed to close statement " + e.getMessage());
+      LOG.warn("Failed to close statement " + getMessage(e));
     }
   }
 
@@ -882,7 +869,7 @@ public class TxnHandler {
       }
     }
     catch(SQLException ex) {
-      LOG.warn("Failed to close statement " + ex.getMessage());
+      LOG.warn("Failed to close statement " + getMessage(ex));
     }
   }
 
@@ -895,18 +882,18 @@ public class TxnHandler {
     closeDbConn(dbConn);
   }
   /**
-   * Determine if an exception was a deadlock.  Unfortunately there is no standard way to do
+   * Determine if an exception was such that it makes sense to retry.  Unfortunately there is no standard way to do
    * this, so we have to inspect the error messages and catch the telltale signs for each
    * different database.
    * @param conn database connection
    * @param e exception that was thrown.
    * @param caller name of the method calling this
-   * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.DeadlockException when deadlock
+   * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.RetryException when a deadlock or other retryable error is
    * detected and retry count has not been exceeded.
    */
-  protected void detectDeadlock(Connection conn,
+  protected void checkRetryable(Connection conn,
                                 SQLException e,
-                                String caller) throws DeadlockException, MetaException {
+                                String caller) throws RetryException, MetaException {
 
     // If you change this function, remove the @Ignore from TestTxnHandler.deadlockIsDetected()
     // to test these changes.
@@ -919,19 +906,41 @@ public class TxnHandler {
       determineDatabaseProduct(conn);
     }
     if (e instanceof SQLTransactionRollbackException ||
-        ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES ||
-            dbProduct == DatabaseProduct.SQLSERVER) && e.getSQLState().equals("40001")) ||
-        (dbProduct == DatabaseProduct.POSTGRES && e.getSQLState().equals("40P01")) ||
-        (dbProduct == DatabaseProduct.ORACLE && (e.getMessage().contains("deadlock detected")
-            || e.getMessage().contains("can't serialize access for this transaction")))) {
+      ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES ||
+        dbProduct == DatabaseProduct.SQLSERVER) && e.getSQLState().equals("40001")) ||
+      (dbProduct == DatabaseProduct.POSTGRES && e.getSQLState().equals("40P01")) ||
+      (dbProduct == DatabaseProduct.ORACLE && (e.getMessage().contains("deadlock detected")
+        || e.getMessage().contains("can't serialize access for this transaction")))) {
       if (deadlockCnt++ < ALLOWED_REPEATED_DEADLOCKS) {
         LOG.warn("Deadlock detected in " + caller + ", trying again.");
-        throw new DeadlockException();
+        throw new RetryException();
       } else {
         LOG.error("Too many repeated deadlocks in " + caller + ", giving up.");
         deadlockCnt = 0;
       }
     }
+    else if(isRetryable(e)) {
+      //in MSSQL this means Communication Link Failure
+      if(retryNum++ < retryLimit) {
+        try {
+          Thread.sleep(retryInterval);
+        }
+        catch(InterruptedException ex) {
+          //
+        }
+        LOG.warn("Retryable error detected in " + caller + ", trying again: " + getMessage(e));
+        throw new RetryException();
+      }
+      else {
+        LOG.error("Fatal error. Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
+        retryNum = 0;
+      }
+    }
+    else {
+      //if here, we got something that will propagate the error (rather than retry), so reset counters
+      deadlockCnt = 0;
+      retryNum = 0;
+    }
   }
 
   /**
@@ -1073,10 +1082,10 @@ public class TxnHandler {
     @Override
     public String toString() {
       return "extLockId:" + Long.toString(extLockId) + " intLockId:" +
-          intLockId + " txnId:" + Long.toString
-          (txnId) + " db:" + db + " table:" + table + " partition:" +
-          partition + " state:" + (state == null ? "null" : state.toString())
-          + " type:" + (type == null ? "null" : type.toString());
+        intLockId + " txnId:" + Long.toString
+        (txnId) + " db:" + db + " table:" + table + " partition:" +
+        partition + " state:" + (state == null ? "null" : state.toString())
+        + " type:" + (type == null ? "null" : type.toString());
     }
   }
 
@@ -1088,11 +1097,11 @@ public class TxnHandler {
     public int compare(LockInfo info1, LockInfo info2) {
       // We sort by state (acquired vs waiting) and then by extLockId.
       if (info1.state == LockState.ACQUIRED &&
-          info2.state != LockState .ACQUIRED) {
+        info2.state != LockState .ACQUIRED) {
         return -1;
       }
       if (info1.state != LockState.ACQUIRED &&
-          info2.state == LockState .ACQUIRED) {
+        info2.state == LockState .ACQUIRED) {
         return 1;
       }
       if (info1.extLockId < info2.extLockId) {
@@ -1124,7 +1133,7 @@ public class TxnHandler {
 
   private void checkQFileTestHack() {
     boolean hackOn = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST) ||
-        HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST);
+      HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST);
     if (hackOn) {
       LOG.info("Hacking in canned values for transaction manager");
       // Set up the transaction/locking db in the derby metastore
@@ -1135,7 +1144,7 @@ public class TxnHandler {
         // We may have already created the tables and thus don't need to redo it.
         if (!e.getMessage().contains("already exists")) {
           throw new RuntimeException("Unable to set up transaction database for" +
-              " testing: " + e.getMessage());
+            " testing: " + e.getMessage());
         }
       }
     }
@@ -1153,7 +1162,7 @@ public class TxnHandler {
     int updateCnt = 0;
     try {
       stmt = dbConn.createStatement();
-  
+
       // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS
       StringBuilder buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in (");
       boolean first = true;
@@ -1165,7 +1174,7 @@ public class TxnHandler {
       buf.append(')');
       LOG.debug("Going to execute update <" + buf.toString() + ">");
       stmt.executeUpdate(buf.toString());
-  
+
       buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id in (");
       first = true;
       for (Long id : txnids) {
@@ -1176,7 +1185,7 @@ public class TxnHandler {
       buf.append(')');
       LOG.debug("Going to execute update <" + buf.toString() + ">");
       updateCnt = stmt.executeUpdate(buf.toString());
-  
+
       LOG.debug("Going to commit");
       dbConn.commit();
     } finally {
@@ -1202,7 +1211,7 @@ public class TxnHandler {
    * @throws TxnAbortedException
    */
   private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait)
-      throws NoSuchTxnException,  TxnAbortedException, MetaException, SQLException {
+    throws NoSuchTxnException,  TxnAbortedException, MetaException, SQLException {
     // We want to minimize the number of concurrent lock requests being issued.  If we do not we
     // get a large number of deadlocks in the database, since this method has to both clean
     // timedout locks and insert new locks.  This synchronization barrier will not eliminiate all
@@ -1227,7 +1236,7 @@ public class TxnHandler {
           LOG.debug("Going to rollback");
           dbConn.rollback();
           throw new MetaException("Transaction tables not properly " +
-              "initialized, no record found in next_lock_id");
+            "initialized, no record found in next_lock_id");
         }
         long extLockId = rs.getLong(1);
         s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
@@ -1252,8 +1261,8 @@ public class TxnHandler {
             s = "insert into TXN_COMPONENTS " +
               "(tc_txnid, tc_database, tc_table, tc_partition) " +
               "values (" + txnid + ", '" + dbName + "', " +
-                (tblName == null ? "null" : "'" + tblName + "'") + ", " +
-                (partName == null ? "null" : "'" +  partName + "'") + ")";
+              (tblName == null ? "null" : "'" + tblName + "'") + ", " +
+              (partName == null ? "null" : "'" +  partName + "'") + ")";
             LOG.debug("Going to execute update <" + s + ">");
             stmt.executeUpdate(s);
           }
@@ -1275,13 +1284,13 @@ public class TxnHandler {
           long now = getDbTime(dbConn);
           s = "insert into HIVE_LOCKS " +
             " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " +
-              "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" +
-              " values (" + extLockId + ", " +
-              + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" +
-              dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" )
-              + ", " + (partName == null ? "null" : "'" + partName + "'") +
-              ", '" + LOCK_WAITING + "', " +  "'" + lockChar + "', " + now + ", '" +
-              rqst.getUser() + "', '" + rqst.getHostname() + "')";
+            "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" +
+            " values (" + extLockId + ", " +
+            + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" +
+            dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" )
+            + ", " + (partName == null ? "null" : "'" + partName + "'") +
+            ", '" + LOCK_WAITING + "', " +  "'" + lockChar + "', " + now + ", '" +
+            rqst.getUser() + "', '" + rqst.getHostname() + "')";
           LOG.debug("Going to execute update <" + s + ">");
           stmt.executeUpdate(s);
         }
@@ -1305,7 +1314,7 @@ public class TxnHandler {
   private LockResponse checkLock(Connection dbConn,
                                  long extLockId,
                                  boolean alwaysCommit)
-      throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
+    throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
     List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);
     LockResponse response = new LockResponse();
     response.setLockid(extLockId);
@@ -1313,8 +1322,8 @@ public class TxnHandler {
     LOG.debug("Setting savepoint");
     Savepoint save = dbConn.setSavepoint();
     StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
-        "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
-        "hl_lock_type from HIVE_LOCKS where hl_db in (");
+      "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
+      "hl_lock_type from HIVE_LOCKS where hl_db in (");
 
     Set<String> strings = new HashSet<String>(locksBeingChecked.size());
     for (LockInfo info : locksBeingChecked) {
@@ -1430,7 +1439,7 @@ public class TxnHandler {
           // lock the whole database and we need to check it.  Otherwise,
           // check if they are operating on the same table, if not, move on.
           if (locks[index].table != null && locks[i].table != null
-              && !locks[index].table.equals(locks[i].table)) {
+            && !locks[index].table.equals(locks[i].table)) {
             continue;
           }
 
@@ -1438,30 +1447,30 @@ public class TxnHandler {
           // lock the whole table and we need to check it.  Otherwise,
           // check if they are operating on the same partition, if not, move on.
           if (locks[index].partition != null && locks[i].partition != null
-              && !locks[index].partition.equals(locks[i].partition)) {
+            && !locks[index].partition.equals(locks[i].partition)) {
             continue;
           }
 
           // We've found something that matches what we're trying to lock,
           // so figure out if we can lock it too.
           switch (jumpTable.get(locks[index].type).get(locks[i].type).get
-              (locks[i].state)) {
-              case ACQUIRE:
-                acquire(dbConn, stmt, extLockId, info.intLockId);
-                acquired = true;
-                break;
-              case WAIT:
-                wait(dbConn, save);
-                if (alwaysCommit) {
-                  // In the case where lockNoWait has been called we don't want to commit because
-                  // it's going to roll everything back. In every other case we want to commit here.
-                  LOG.debug("Going to commit");
-                  dbConn.commit();
-                }
-                response.setState(LockState.WAITING);
-                return response;
-              case KEEP_LOOKING:
-                continue;
+            (locks[i].state)) {
+            case ACQUIRE:
+              acquire(dbConn, stmt, extLockId, info.intLockId);
+              acquired = true;
+              break;
+            case WAIT:
+              wait(dbConn, save);
+              if (alwaysCommit) {
+                // In the case where lockNoWait has been called we don't want to commit because
+                // it's going to roll everything back. In every other case we want to commit here.
+                LOG.debug("Going to commit");
+                dbConn.commit();
+              }
+              response.setState(LockState.WAITING);
+              return response;
+            case KEEP_LOOKING:
+              continue;
           }
           if (acquired) break; // We've acquired this lock component,
           // so get out of the loop and look at the next component.
@@ -1494,18 +1503,18 @@ public class TxnHandler {
   }
 
   private void acquire(Connection dbConn, Statement stmt, long extLockId, long intLockId)
-      throws SQLException, NoSuchLockException, MetaException {
+    throws SQLException, NoSuchLockException, MetaException {
     long now = getDbTime(dbConn);
     String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " +
-        "hl_last_heartbeat = " + now + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " +
-        extLockId + " and hl_lock_int_id = " + intLockId;
+      "hl_last_heartbeat = " + now + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " +
+      extLockId + " and hl_lock_int_id = " + intLockId;
     LOG.debug("Going to execute update <" + s + ">");
     int rc = stmt.executeUpdate(s);
     if (rc < 1) {
       LOG.debug("Going to rollback");
       dbConn.rollback();
       throw new NoSuchLockException("No such lock: (" + extLockId + "," +
-          + intLockId + ")");
+        + intLockId + ")");
     }
     // We update the database, but we don't commit because there may be other
     // locks together with this, and we only want to acquire one if we can
@@ -1514,7 +1523,7 @@ public class TxnHandler {
 
   // Heartbeats on the lock table.  This commits, so do not enter it with any state
   private void heartbeatLock(Connection dbConn, long extLockId)
-      throws NoSuchLockException, SQLException, MetaException {
+    throws NoSuchLockException, SQLException, MetaException {
     // If the lock id is 0, then there are no locks in this heartbeat
     if (extLockId == 0) return;
     Statement stmt = null;
@@ -1523,7 +1532,7 @@ public class TxnHandler {
       long now = getDbTime(dbConn);
 
       String s = "update HIVE_LOCKS set hl_last_heartbeat = " +
-          now + " where hl_lock_ext_id = " + extLockId;
+        now + " where hl_lock_ext_id = " + extLockId;
       LOG.debug("Going to execute update <" + s + ">");
       int rc = stmt.executeUpdate(s);
       if (rc < 1) {
@@ -1540,7 +1549,7 @@ public class TxnHandler {
 
   // Heartbeats on the txn table.  This commits, so do not enter it with any state
   private void heartbeatTxn(Connection dbConn, long txnid)
-      throws NoSuchTxnException, TxnAbortedException, SQLException, MetaException {
+    throws NoSuchTxnException, TxnAbortedException, SQLException, MetaException {
     // If the txnid is 0, then there are no transactions in this heartbeat
     if (txnid == 0) return;
     Statement stmt = null;
@@ -1560,10 +1569,10 @@ public class TxnHandler {
         LOG.debug("Going to rollback");
         dbConn.rollback();
         throw new TxnAbortedException("Transaction " + txnid +
-            " already aborted");
+          " already aborted");
       }
       s = "update TXNS set txn_last_heartbeat = " + now +
-          " where txn_id = " + txnid;
+        " where txn_id = " + txnid;
       LOG.debug("Going to execute update <" + s + ">");
       stmt.executeUpdate(s);
       LOG.debug("Going to commit");
@@ -1575,17 +1584,17 @@ public class TxnHandler {
 
   // NEVER call this function without first calling heartbeat(long, long)
   private long getTxnIdFromLockId(Connection dbConn, long extLockId)
-      throws NoSuchLockException, MetaException, SQLException {
+    throws NoSuchLockException, MetaException, SQLException {
     Statement stmt = null;
     try {
       stmt = dbConn.createStatement();
       String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " +
-          extLockId;
+        extLockId;
       LOG.debug("Going to execute query <" + s + ">");
       ResultSet rs = stmt.executeQuery(s);
       if (!rs.next()) {
         throw new MetaException("This should never happen!  We already " +
-            "checked the lock existed but now we can't find it!");
+          "checked the lock existed but now we can't find it!");
       }
       long txnid = rs.getLong(1);
       LOG.debug("Return txnid " + (rs.wasNull() ? -1 : txnid));
@@ -1597,13 +1606,13 @@ public class TxnHandler {
 
   // NEVER call this function without first calling heartbeat(long, long)
   private List<LockInfo> getLockInfoFromLockId(Connection dbConn, long extLockId)
-      throws NoSuchLockException, MetaException, SQLException {
+    throws NoSuchLockException, MetaException, SQLException {
     Statement stmt = null;
     try {
       stmt = dbConn.createStatement();
       String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " +
-          "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " +
-          "hl_lock_ext_id = " + extLockId;
+        "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " +
+        "hl_lock_ext_id = " + extLockId;
       LOG.debug("Going to execute query <" + s + ">");
       ResultSet rs = stmt.executeQuery(s);
       boolean sawAtLeastOne = false;
@@ -1614,7 +1623,7 @@ public class TxnHandler {
       }
       if (!sawAtLeastOne) {
         throw new MetaException("This should never happen!  We already " +
-            "checked the lock existed but now we can't find it!");
+          "checked the lock existed but now we can't find it!");
       }
       return ourLockInfo;
     } finally {
@@ -1632,7 +1641,7 @@ public class TxnHandler {
       stmt = dbConn.createStatement();
       // Remove any timed out locks from the table.
       String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " +
-          (now - timeout);
+        (now - timeout);
       LOG.debug("Going to execute update <" + s + ">");
       stmt.executeUpdate(s);
       LOG.debug("Going to commit");
@@ -1652,7 +1661,7 @@ public class TxnHandler {
       stmt = dbConn.createStatement();
       // Abort any timed out locks from the table.
       String s = "select txn_id from TXNS where txn_state = '" + TXN_OPEN +
-          "' and txn_last_heartbeat <  " + (now - timeout);
+        "' and txn_last_heartbeat <  " + (now - timeout);
       LOG.debug("Going to execute query <" + s + ">");
       ResultSet rs = stmt.executeQuery(s);
       List<Long> deadTxns = new ArrayList<Long>();
@@ -1675,12 +1684,12 @@ public class TxnHandler {
     String passwd;
     try {
       passwd = ShimLoader.getHadoopShims().getPassword(conf,
-          HiveConf.ConfVars.METASTOREPWD.varname);
+        HiveConf.ConfVars.METASTOREPWD.varname);
     } catch (IOException err) {
       throw new SQLException("Error getting metastore password", err);
     }
     String connectionPooler = HiveConf.getVar(conf,
-        HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_TYPE).toLowerCase();
+      HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_TYPE).toLowerCase();
 
     if ("bonecp".equals(connectionPooler)) {
       BoneCPConfig config = new BoneCPConfig();
@@ -1696,22 +1705,22 @@ public class TxnHandler {
       // This doesn't get used, but it's still necessary, see
       // http://svn.apache.org/viewvc/commons/proper/dbcp/branches/DBCP_1_4_x_BRANCH/doc/ManualPoolingDataSourceExample.java?view=markup
       PoolableConnectionFactory poolConnFactory =
-          new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true);
+        new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true);
       connPool = new PoolingDataSource(objectPool);
     } else {
       throw new RuntimeException("Unknown JDBC connection pooling " + connectionPooler);
     }
   }
 
- private static synchronized void buildJumpTable() {
+  private static synchronized void buildJumpTable() {
     if (jumpTable != null) return;
 
     jumpTable =
-        new HashMap<LockType, Map<LockType, Map<LockState,  LockAction>>>(3);
+      new HashMap<LockType, Map<LockType, Map<LockState,  LockAction>>>(3);
 
     // SR: Lock we are trying to acquire is shared read
     Map<LockType, Map<LockState, LockAction>> m =
-        new HashMap<LockType, Map<LockState, LockAction>>(3);
+      new HashMap<LockType, Map<LockState, LockAction>>(3);
     jumpTable.put(LockType.SHARED_READ, m);
 
     // SR.SR: Lock we are examining is shared read
@@ -1743,7 +1752,7 @@ public class TxnHandler {
     // that something is blocking it that would not block a read.
     m2.put(LockState.WAITING, LockAction.KEEP_LOOKING);
 
-     // SR.E: Lock we are examining is exclusive
+    // SR.E: Lock we are examining is exclusive
     m2 = new HashMap<LockState, LockAction>(2);
     m.put(LockType.EXCLUSIVE, m2);
 
@@ -1777,7 +1786,7 @@ public class TxnHandler {
     m2.put(LockState.ACQUIRED, LockAction.WAIT);
     m2.put(LockState.WAITING, LockAction.WAIT);
 
-     // SW.E: Lock we are examining is exclusive
+    // SW.E: Lock we are examining is exclusive
     m2 = new HashMap<LockState, LockAction>(2);
     m.put(LockType.EXCLUSIVE, m2);
 
@@ -1805,7 +1814,7 @@ public class TxnHandler {
     m2.put(LockState.ACQUIRED, LockAction.WAIT);
     m2.put(LockState.WAITING, LockAction.WAIT);
 
-     // E.E: Lock we are examining is exclusive
+    // E.E: Lock we are examining is exclusive
     m2 = new HashMap<LockState, LockAction>(2);
     m.put(LockType.EXCLUSIVE, m2);
 
@@ -1813,4 +1822,20 @@ public class TxnHandler {
     m2.put(LockState.ACQUIRED, LockAction.WAIT);
     m2.put(LockState.WAITING, LockAction.WAIT);
   }
+  /**
+   * Returns true if {@code ex} should be retried
+   */
+  private static boolean isRetryable(Exception ex) {
+    if(ex instanceof SQLException) {
+      SQLException sqlException = (SQLException)ex;
+      if("08S01".equalsIgnoreCase(sqlException.getSQLState())) {
+        //in MSSQL this means Communication Link Failure
+        return true;
+      }
+    }
+    return false;
+  }
+  private static String getMessage(SQLException ex) {
+    return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")";
+  }
 }

Modified: hive/branches/branch-1.0/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-1.0/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java?rev=1654443&r1=1654442&r2=1654443&view=diff
==============================================================================
--- hive/branches/branch-1.0/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java (original)
+++ hive/branches/branch-1.0/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java Sat Jan 24 01:05:22 2015
@@ -1123,11 +1123,11 @@ public class TestTxnHandler {
                 LOG.debug("no exception, no deadlock");
               } catch (SQLException e) {
                 try {
-                  txnHandler.detectDeadlock(conn1, e, "thread t1");
+                  txnHandler.checkRetryable(conn1, e, "thread t1");
                   LOG.debug("Got an exception, but not a deadlock, SQLState is " +
                       e.getSQLState() + " class of exception is " + e.getClass().getName() +
                       " msg is <" + e.getMessage() + ">");
-                } catch (TxnHandler.DeadlockException de) {
+                } catch (TxnHandler.RetryException de) {
                   LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
                       "exception is " + e.getClass().getName() + " msg is <" + e
                       .getMessage() + ">");
@@ -1153,11 +1153,11 @@ public class TestTxnHandler {
                 LOG.debug("no exception, no deadlock");
               } catch (SQLException e) {
                 try {
-                  txnHandler.detectDeadlock(conn2, e, "thread t2");
+                  txnHandler.checkRetryable(conn2, e, "thread t2");
                   LOG.debug("Got an exception, but not a deadlock, SQLState is " +
                       e.getSQLState() + " class of exception is " + e.getClass().getName() +
                       " msg is <" + e.getMessage() + ">");
-                } catch (TxnHandler.DeadlockException de) {
+                } catch (TxnHandler.RetryException de) {
                   LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
                       "exception is " + e.getClass().getName() + " msg is <" + e
                       .getMessage() + ">");



Mime
View raw message