hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r1605173 - in /hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn: CompactionTxnHandler.java TxnHandler.java
Date Tue, 24 Jun 2014 20:17:59 GMT
Author: gates
Date: Tue Jun 24 20:17:59 2014
New Revision: 1605173

URL: http://svn.apache.org/r1605173
Log:
HIVE-7225 Unclosed Statement's in TxnHandler (steve, Oh and Ted Yu via Alan Gates)

Modified:
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1605173&r1=1605172&r2=1605173&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java Tue Jun 24 20:17:59 2014
@@ -54,8 +54,9 @@ public class CompactionTxnHandler extend
   public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
     Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
     Set<CompactionInfo> response = new HashSet<CompactionInfo>();
+    Statement stmt = null;
     try {
-      Statement stmt = dbConn.createStatement();
+      stmt = dbConn.createStatement();
       // Check for completed transactions
       String s = "select distinct ctc_database, ctc_table, " +
           "ctc_partition from COMPLETED_TXN_COMPONENTS";
@@ -93,6 +94,7 @@ public class CompactionTxnHandler extend
       LOG.error("Unable to connect to transaction database " + e.getMessage());
     } finally {
       closeDbConn(dbConn);
+      closeStmt(stmt);
     }
     return response;
   }
@@ -106,8 +108,9 @@ public class CompactionTxnHandler extend
   public void setRunAs(long cq_id, String user) throws MetaException {
     try {
       Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Statement stmt = null;
       try {
-       Statement stmt = dbConn.createStatement();
+       stmt = dbConn.createStatement();
        String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id;
        LOG.debug("Going to execute update <" + s + ">");
        if (stmt.executeUpdate(s) != 1) {
@@ -127,6 +130,7 @@ public class CompactionTxnHandler extend
        detectDeadlock(e, "setRunAs");
      } finally {
        closeDbConn(dbConn);
+       closeStmt(stmt);
      }
     } catch (DeadlockException e) {
       setRunAs(cq_id, user);
@@ -146,8 +150,9 @@ public class CompactionTxnHandler extend
       Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       CompactionInfo info = new CompactionInfo();
 
+      Statement stmt = null;
       try {
-        Statement stmt = dbConn.createStatement();
+        stmt = dbConn.createStatement();
         String s = "select cq_id, cq_database, cq_table, cq_partition, " +
             "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'";
         LOG.debug("Going to execute query <" + s + ">");
@@ -192,6 +197,7 @@ public class CompactionTxnHandler extend
             StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
+        closeStmt(stmt);
       }
     } catch (DeadlockException e) {
       return findNextToCompact(workerId);
@@ -208,8 +214,9 @@ public class CompactionTxnHandler extend
   public void markCompacted(CompactionInfo info) throws MetaException {
     try {
       Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Statement stmt = null;
       try {
-        Statement stmt = dbConn.createStatement();
+        stmt = dbConn.createStatement();
         String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " +
             "cq_worker_id = null where cq_id = " + info.id;
         LOG.debug("Going to execute update <" + s + ">");
@@ -232,6 +239,7 @@ public class CompactionTxnHandler extend
             StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
+        closeStmt(stmt);
       }
     } catch (DeadlockException e) {
       markCompacted(info);
@@ -249,8 +257,9 @@ public class CompactionTxnHandler extend
     Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
     List<CompactionInfo> rc = new ArrayList<CompactionInfo>();
 
+    Statement stmt = null;
     try {
-      Statement stmt = dbConn.createStatement();
+      stmt = dbConn.createStatement();
       String s = "select cq_id, cq_database, cq_table, cq_partition, " +
           "cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'";
       LOG.debug("Going to execute query <" + s + ">");
@@ -283,6 +292,7 @@ public class CompactionTxnHandler extend
           StringUtils.stringifyException(e));
     } finally {
       closeDbConn(dbConn);
+      closeStmt(stmt);
     }
   }
 
@@ -294,8 +304,9 @@ public class CompactionTxnHandler extend
   public void markCleaned(CompactionInfo info) throws MetaException {
     try {
       Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Statement stmt = null;
       try {
-        Statement stmt = dbConn.createStatement();
+        stmt = dbConn.createStatement();
         String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
         LOG.debug("Going to execute update <" + s + ">");
         if (stmt.executeUpdate(s) != 1) {
@@ -371,6 +382,7 @@ public class CompactionTxnHandler extend
             StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
+        closeStmt(stmt);
       }
     } catch (DeadlockException e) {
       markCleaned(info);
@@ -385,8 +397,9 @@ public class CompactionTxnHandler extend
   public void cleanEmptyAbortedTxns() throws MetaException {
     try {
       Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Statement stmt = null;
       try {
-        Statement stmt = dbConn.createStatement();
+        stmt = dbConn.createStatement();
         String s = "select txn_id from TXNS where " +
             "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
             "txn_state = '" + TXN_ABORTED + "'";
@@ -421,6 +434,7 @@ public class CompactionTxnHandler extend
             StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
+        closeStmt(stmt);
       }
     } catch (DeadlockException e) {
       cleanEmptyAbortedTxns();
@@ -441,8 +455,9 @@ public class CompactionTxnHandler extend
   public void revokeFromLocalWorkers(String hostname) throws MetaException {
     try {
       Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Statement stmt = null;
       try {
-        Statement stmt = dbConn.createStatement();
+        stmt = dbConn.createStatement();
         String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
             + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '"
             +  hostname + "%'";
@@ -465,6 +480,7 @@ public class CompactionTxnHandler extend
             StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
+        closeStmt(stmt);
       }
     } catch (DeadlockException e) {
       revokeFromLocalWorkers(hostname);
@@ -486,8 +502,9 @@ public class CompactionTxnHandler extend
     try {
       Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       long latestValidStart = getDbTime(dbConn) - timeout;
+      Statement stmt = null;
       try {
-        Statement stmt = dbConn.createStatement();
+        stmt = dbConn.createStatement();
         String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
             + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < "
             +  latestValidStart;
@@ -510,6 +527,7 @@ public class CompactionTxnHandler extend
             StringUtils.stringifyException(e));
       } finally {
         closeDbConn(dbConn);
+        closeStmt(stmt);
       }
     } catch (DeadlockException e) {
       revokeTimedoutWorkers(timeout);

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1605173&r1=1605172&r2=1605173&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Tue Jun 24 20:17:59 2014
@@ -120,8 +120,9 @@ public class TxnHandler {
     // database we'll look at the current transaction number first.  If it
     // subsequently shows up in the open list that's ok.
     Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+    Statement stmt = null;
     try {
-      Statement stmt = dbConn.createStatement();
+      stmt = dbConn.createStatement();
       String s = "select ntxn_next - 1 from NEXT_TXN_ID";
       LOG.debug("Going to execute query <" + s + ">");
       ResultSet rs = stmt.executeQuery(s);
@@ -157,7 +158,6 @@ public class TxnHandler {
         }
         txnInfo.add(new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4)));
       }
-      stmt.close();
       LOG.debug("Going to rollback");
       dbConn.rollback();
       return new GetOpenTxnsInfoResponse(hwm, txnInfo);
@@ -170,6 +170,7 @@ public class TxnHandler {
       throw new MetaException("Unable to select from transaction database, "
           + StringUtils.stringifyException(e));
     } finally {
+      closeStmt(stmt);
       closeDbConn(dbConn);
     }
   }
@@ -180,9 +181,10 @@ public class TxnHandler {
     // database we'll look at the current transaction number first.  If it
     // subsequently shows up in the open list that's ok.
     Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+    Statement stmt = null;
     try {
       timeOutTxns(dbConn);
-      Statement stmt = dbConn.createStatement();
+      stmt = dbConn.createStatement();
       String s = "select ntxn_next - 1 from NEXT_TXN_ID";
       LOG.debug("Going to execute query <" + s + ">");
       ResultSet rs = stmt.executeQuery(s);
@@ -203,7 +205,6 @@ public class TxnHandler {
       while (rs.next()) {
         openList.add(rs.getLong(1));
       }
-      stmt.close();
       LOG.debug("Going to rollback");
       dbConn.rollback();
       return new GetOpenTxnsResponse(hwm, openList);
@@ -216,6 +217,7 @@ public class TxnHandler {
       throw new MetaException("Unable to select from transaction database, "
           + StringUtils.stringifyException(e));
     } finally {
+      closeStmt(stmt);
       closeDbConn(dbConn);
     }
   }
@@ -235,13 +237,14 @@ public class TxnHandler {
     int numTxns = rqst.getNum_txns();
     try {
       Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Statement stmt = null;
       try {
         // Make sure the user has not requested an insane amount of txns.
         int maxTxns = HiveConf.getIntVar(conf,
             HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH);
         if (numTxns > maxTxns) numTxns = maxTxns;
 
-        Statement stmt = dbConn.createStatement();
+        stmt = dbConn.createStatement();
         String s = "select ntxn_next from NEXT_TXN_ID";
         LOG.debug("Going to execute query <" + s + ">");
         ResultSet rs = stmt.executeQuery(s);
@@ -279,6 +282,7 @@ public class TxnHandler {
         throw new MetaException("Unable to select from transaction database "
           + StringUtils.stringifyException(e));
       } finally {
+        closeStmt(stmt);
         closeDbConn(dbConn);
       }
     } catch (DeadlockException e) {
@@ -327,8 +331,9 @@ public class TxnHandler {
     long txnid = rqst.getTxnid();
     try {
       Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Statement stmt = null;
       try {
-        Statement stmt = dbConn.createStatement();
+        stmt = dbConn.createStatement();
         // Before we do the commit heartbeat the txn.  This is slightly odd in that we're going to
         // commit it, but it does two things.  One, it makes sure the transaction is still valid.
         // Two, it avoids the race condition where we time out between now and when we actually
@@ -369,6 +374,7 @@ public class TxnHandler {
         throw new MetaException("Unable to update transaction database "
           + StringUtils.stringifyException(e));
       } finally {
+        closeStmt(stmt);
         closeDbConn(dbConn);
       }
     } catch (DeadlockException e) {
@@ -468,6 +474,7 @@ public class TxnHandler {
       throws NoSuchLockException, TxnOpenException, MetaException {
     try {
       Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Statement stmt = null;
       try {
         // Odd as it seems, we need to heartbeat first because this touches the
         // lock table and assures that our locks our still valid.  If they are
@@ -487,7 +494,7 @@ public class TxnHandler {
           LOG.error(msg);
           throw new TxnOpenException(msg);
         }
-        Statement stmt = dbConn.createStatement();
+        stmt = dbConn.createStatement();
         String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId;
         LOG.debug("Going to execute update <" + s + ">");
         int rc = stmt.executeUpdate(s);
@@ -508,6 +515,7 @@ public class TxnHandler {
         throw new MetaException("Unable to update transaction database " +
             StringUtils.stringifyException(e));
       } finally {
+        closeStmt(stmt);
         closeDbConn(dbConn);
       }
     } catch (DeadlockException e) {
@@ -521,8 +529,9 @@ public class TxnHandler {
     Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
     ShowLocksResponse rsp = new ShowLocksResponse();
     List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>();
+    Statement stmt = null;
     try {
-      Statement stmt = dbConn.createStatement();
+      stmt = dbConn.createStatement();
 
       String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
           "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host from HIVE_LOCKS";
@@ -561,6 +570,7 @@ public class TxnHandler {
       throw new MetaException("Unable to select from transaction database " +
           StringUtils.stringifyException(e));
     } finally {
+      closeStmt(stmt);
       closeDbConn(dbConn);
     }
     rsp.setLocks(elems);
@@ -634,8 +644,9 @@ public class TxnHandler {
     // Put a compaction request in the queue.
     try {
       Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Statement stmt = null;
       try {
-        Statement stmt = dbConn.createStatement();
+        stmt = dbConn.createStatement();
 
         // Get the id for the next entry in the queue
         String s = "select ncq_next from NEXT_COMPACTION_QUEUE_ID";
@@ -705,6 +716,7 @@ public class TxnHandler {
         throw new MetaException("Unable to select from transaction database " +
             StringUtils.stringifyException(e));
       } finally {
+        closeStmt(stmt);
         closeDbConn(dbConn);
       }
     } catch (DeadlockException e) {
@@ -717,8 +729,9 @@ public class TxnHandler {
   public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException {
     ShowCompactResponse response = new ShowCompactResponse();
     Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+    Statement stmt = null;
     try {
-      Statement stmt = dbConn.createStatement();
+      stmt = dbConn.createStatement();
       String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
           "cq_start, cq_run_as from COMPACTION_QUEUE";
       LOG.debug("Going to execute query <" + s + ">");
@@ -755,6 +768,7 @@ public class TxnHandler {
       throw new MetaException("Unable to select from transaction database " +
           StringUtils.stringifyException(e));
     } finally {
+      closeStmt(stmt);
       closeDbConn(dbConn);
     }
     return response;
@@ -765,8 +779,9 @@ public class TxnHandler {
    */
   int numLocksInLockTable() throws SQLException, MetaException {
     Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+    Statement stmt = null;
     try {
-      Statement stmt = dbConn.createStatement();
+      stmt = dbConn.createStatement();
       String s = "select count(*) from HIVE_LOCKS";
       LOG.debug("Going to execute query <" + s + ">");
       ResultSet rs = stmt.executeQuery(s);
@@ -777,6 +792,7 @@ public class TxnHandler {
       return rc;
     } finally {
       closeDbConn(dbConn);
+      closeStmt(stmt);
     }
   }
 
@@ -819,6 +835,18 @@ public class TxnHandler {
       LOG.warn("Failed to close db connection " + e.getMessage());
     }
   }
+  
+  /**
+   * Close statement instance.
+   * @param stmt statement instance.
+   */
+  protected void closeStmt(Statement stmt) {
+    try {
+      if (stmt != null) stmt.close();
+    } catch (SQLException e) {
+      LOG.warn("Failed to close statement " + e.getMessage());
+    }
+  }
 
   /**
    * Determine if an exception was a deadlock.  Unfortunately there is no standard way to do
@@ -850,10 +878,10 @@ public class TxnHandler {
    * @throws org.apache.hadoop.hive.metastore.api.MetaException if the time cannot be determined
    */
   protected long getDbTime(Connection conn) throws MetaException {
+    Statement stmt = null;
     try {
-      Statement stmt = conn.createStatement();
+      stmt = conn.createStatement();
       String s;
-      ResultSet rs;
       DatabaseProduct prod = determineDatabaseProduct(conn);
       switch (prod) {
         case DERBY:
@@ -876,13 +904,15 @@ public class TxnHandler {
           throw new MetaException(msg);
       }
       LOG.debug("Going to execute query <" + s + ">");
-      rs = stmt.executeQuery(s);
+      ResultSet rs = stmt.executeQuery(s);
       if (!rs.next()) throw new MetaException("No results from date query");
       return rs.getTimestamp(1).getTime();
     } catch (SQLException e) {
       String msg = "Unable to determine current time: " + e.getMessage();
       LOG.error(msg);
       throw new MetaException(msg);
+    } finally {
+      closeStmt(stmt);
     }
   }
 
@@ -1042,33 +1072,39 @@ public class TxnHandler {
    * @throws SQLException
    */
   private int abortTxns(Connection dbConn, List<Long> txnids) throws SQLException {
-    Statement stmt = dbConn.createStatement();
-
-    // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS
-    StringBuilder buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in (");
-    boolean first = true;
-    for (Long id : txnids) {
-      if (first) first = false;
-      else buf.append(',');
-      buf.append(id);
-    }
-    buf.append(')');
-    LOG.debug("Going to execute update <" + buf.toString() + ">");
-    stmt.executeUpdate(buf.toString());
-
-    buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id in (");
-    first = true;
-    for (Long id : txnids) {
-      if (first) first = false;
-      else buf.append(',');
-      buf.append(id);
+    Statement stmt = null;
+    int updateCnt = 0;
+    try {
+      stmt = dbConn.createStatement();
+  
+      // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS
+      StringBuilder buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in (");
+      boolean first = true;
+      for (Long id : txnids) {
+        if (first) first = false;
+        else buf.append(',');
+        buf.append(id);
+      }
+      buf.append(')');
+      LOG.debug("Going to execute update <" + buf.toString() + ">");
+      stmt.executeUpdate(buf.toString());
+  
+      buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id in (");
+      first = true;
+      for (Long id : txnids) {
+        if (first) first = false;
+        else buf.append(',');
+        buf.append(id);
+      }
+      buf.append(')');
+      LOG.debug("Going to execute update <" + buf.toString() + ">");
+      updateCnt = stmt.executeUpdate(buf.toString());
+  
+      LOG.debug("Going to commit");
+      dbConn.commit();
+    } finally {
+      closeStmt(stmt);
     }
-    buf.append(')');
-    LOG.debug("Going to execute update <" + buf.toString() + ">");
-    int updateCnt = stmt.executeUpdate(buf.toString());
-
-    LOG.debug("Going to commit");
-    dbConn.commit();
     return updateCnt;
   }
 
@@ -1102,9 +1138,9 @@ public class TxnHandler {
     synchronized (lockLock) {
       // Clean up timed out locks before we attempt to acquire any.
       timeOutLocks(dbConn);
-
+      Statement stmt = null;
       try {
-        Statement stmt = dbConn.createStatement();
+        stmt = dbConn.createStatement();
 
         // Get the next lock id.
         String s = "select nl_next from NEXT_LOCK_ID";
@@ -1183,6 +1219,8 @@ public class TxnHandler {
       } catch (NoSuchLockException e) {
         // This should never happen, as we just added the lock id
         throw new MetaException("Couldn't find a lock we just created!");
+      } finally {
+        closeStmt(stmt);
       }
     }
   }
@@ -1197,7 +1235,6 @@ public class TxnHandler {
 
     LOG.debug("Setting savepoint");
     Savepoint save = dbConn.setSavepoint();
-    Statement stmt = dbConn.createStatement();
     StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
         "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
         "hl_lock_type from HIVE_LOCKS where hl_db in (");
@@ -1267,98 +1304,104 @@ public class TxnHandler {
     }
 
     LOG.debug("Going to execute query <" + query.toString() + ">");
-    ResultSet rs = stmt.executeQuery(query.toString());
-    SortedSet lockSet = new TreeSet(new LockInfoComparator());
-    while (rs.next()) {
-      lockSet.add(new LockInfo(rs));
-    }
-    // Turn the tree set into an array so we can move back and forth easily
-    // in it.
-    LockInfo[] locks = (LockInfo[])lockSet.toArray(new LockInfo[1]);
-
-    for (LockInfo info : locksBeingChecked) {
-      // Find the lock record we're checking
-      int index = -1;
-      for (int i = 0; i < locks.length; i++) {
-        if (locks[i].equals(info)) {
-          index = i;
-          break;
-        }
+    Statement stmt = null;
+    try {
+      stmt = dbConn.createStatement();
+      ResultSet rs = stmt.executeQuery(query.toString());
+      SortedSet lockSet = new TreeSet(new LockInfoComparator());
+      while (rs.next()) {
+        lockSet.add(new LockInfo(rs));
       }
+      // Turn the tree set into an array so we can move back and forth easily
+      // in it.
+      LockInfo[] locks = (LockInfo[])lockSet.toArray(new LockInfo[1]);
 
-      // If we didn't find the lock, then it must not be in the table
-      if (index == -1) {
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-        throw new MetaException("How did we get here, we heartbeated our lock before we started!");
-      }
+      for (LockInfo info : locksBeingChecked) {
+        // Find the lock record we're checking
+        int index = -1;
+        for (int i = 0; i < locks.length; i++) {
+          if (locks[i].equals(info)) {
+            index = i;
+            break;
+          }
+        }
 
+        // If we didn't find the lock, then it must not be in the table
+        if (index == -1) {
+          LOG.debug("Going to rollback");
+          dbConn.rollback();
+          throw new MetaException("How did we get here, we heartbeated our lock before we started!");
+        }
 
-      // If we've found it and it's already been marked acquired,
-      // then just look at the other locks.
-      if (locks[index].state == LockState.ACQUIRED) {
-        continue;
-      }
 
-      // Look at everything in front of this lock to see if it should block
-      // it or not.
-      boolean acquired = false;
-      for (int i = index - 1; i >= 0; i--) {
-        // Check if we're operating on the same database, if not, move on
-        if (!locks[index].db.equals(locks[i].db)) {
+        // If we've found it and it's already been marked acquired,
+        // then just look at the other locks.
+        if (locks[index].state == LockState.ACQUIRED) {
           continue;
         }
 
-        // If table is null on either of these, then they are claiming to
-        // lock the whole database and we need to check it.  Otherwise,
-        // check if they are operating on the same table, if not, move on.
-        if (locks[index].table != null && locks[i].table != null
-            && !locks[index].table.equals(locks[i].table)) {
-          continue;
-        }
+        // Look at everything in front of this lock to see if it should block
+        // it or not.
+        boolean acquired = false;
+        for (int i = index - 1; i >= 0; i--) {
+          // Check if we're operating on the same database, if not, move on
+          if (!locks[index].db.equals(locks[i].db)) {
+            continue;
+          }
 
-        // If partition is null on either of these, then they are claiming to
-        // lock the whole table and we need to check it.  Otherwise,
-        // check if they are operating on the same partition, if not, move on.
-        if (locks[index].partition != null && locks[i].partition != null
-            && !locks[index].partition.equals(locks[i].partition)) {
-          continue;
-        }
+          // If table is null on either of these, then they are claiming to
+          // lock the whole database and we need to check it.  Otherwise,
+          // check if they are operating on the same table, if not, move on.
+          if (locks[index].table != null && locks[i].table != null
+              && !locks[index].table.equals(locks[i].table)) {
+            continue;
+          }
 
-        // We've found something that matches what we're trying to lock,
-        // so figure out if we can lock it too.
-        switch (jumpTable.get(locks[index].type).get(locks[i].type).get
-            (locks[i].state)) {
-          case ACQUIRE:
-            acquire(dbConn, stmt, extLockId, info.intLockId);
-            acquired = true;
-            break;
-          case WAIT:
-            wait(dbConn, save);
-            if (alwaysCommit) {
-              // In the case where lockNoWait has been called we don't want to commit because
-              // it's going to roll everything back.  In every other case we want to commit here.
-              LOG.debug("Going to commit");
-              dbConn.commit();
-            }
-            response.setState(LockState.WAITING);
-            return response;
-          case KEEP_LOOKING:
+          // If partition is null on either of these, then they are claiming to
+          // lock the whole table and we need to check it.  Otherwise,
+          // check if they are operating on the same partition, if not, move on.
+          if (locks[index].partition != null && locks[i].partition != null
+              && !locks[index].partition.equals(locks[i].partition)) {
             continue;
+          }
+
+          // We've found something that matches what we're trying to lock,
+          // so figure out if we can lock it too.
+          switch (jumpTable.get(locks[index].type).get(locks[i].type).get
+              (locks[i].state)) {
+              case ACQUIRE:
+                acquire(dbConn, stmt, extLockId, info.intLockId);
+                acquired = true;
+                break;
+              case WAIT:
+                wait(dbConn, save);
+                if (alwaysCommit) {
+                  // In the case where lockNoWait has been called we don't want to commit because
+                  // it's going to roll everything back. In every other case we want to commit here.
+                  LOG.debug("Going to commit");
+                  dbConn.commit();
+                }
+                response.setState(LockState.WAITING);
+                return response;
+              case KEEP_LOOKING:
+                continue;
+          }
+          if (acquired) break; // We've acquired this lock component,
+          // so get out of the loop and look at the next component.
         }
-        if (acquired) break; // We've acquired this lock component,
-        // so get out of the loop and look at the next component.
+
+        // If we've arrived here and we have not already acquired, it means there's nothing in the
+        // way of the lock, so acquire the lock.
+        if (!acquired) acquire(dbConn, stmt, extLockId, info.intLockId);
       }
 
-      // If we've arrived here and we have not already acquired, it means there's nothing in the
-      // way of the lock, so acquire the lock.
-      if (!acquired) acquire(dbConn, stmt, extLockId, info.intLockId);
+      // We acquired all of the locks, so commit and return acquired.
+      LOG.debug("Going to commit");
+      dbConn.commit();
+      response.setState(LockState.ACQUIRED);
+    } finally {
+      closeStmt(stmt);
     }
-
-    // We acquired all of the locks, so commit and return acquired.
-    LOG.debug("Going to commit");
-    dbConn.commit();
-    response.setState(LockState.ACQUIRED);
     return response;
   }
 
@@ -1397,20 +1440,25 @@ public class TxnHandler {
       throws NoSuchLockException, SQLException, MetaException {
     // If the lock id is 0, then there are no locks in this heartbeat
     if (extLockId == 0) return;
-    Statement stmt = dbConn.createStatement();
-    long now = getDbTime(dbConn);
+    Statement stmt = null;
+    try {
+      stmt = dbConn.createStatement();
+      long now = getDbTime(dbConn);
 
-    String s = "update HIVE_LOCKS set hl_last_heartbeat = " +
-        now + " where hl_lock_ext_id = " + extLockId;
-    LOG.debug("Going to execute update <" + s + ">");
-    int rc = stmt.executeUpdate(s);
-    if (rc < 1) {
-      LOG.debug("Going to rollback");
-      dbConn.rollback();
-      throw new NoSuchLockException("No such lock: " + extLockId);
+      String s = "update HIVE_LOCKS set hl_last_heartbeat = " +
+          now + " where hl_lock_ext_id = " + extLockId;
+      LOG.debug("Going to execute update <" + s + ">");
+      int rc = stmt.executeUpdate(s);
+      if (rc < 1) {
+        LOG.debug("Going to rollback");
+        dbConn.rollback();
+        throw new NoSuchLockException("No such lock: " + extLockId);
+      }
+      LOG.debug("Going to commit");
+      dbConn.commit();
+    } finally {
+      closeStmt(stmt);
     }
-    LOG.debug("Going to commit");
-    dbConn.commit();
   }
 
   // Heartbeats on the txn table.  This commits, so do not enter it with any state
@@ -1418,68 +1466,83 @@ public class TxnHandler {
       throws NoSuchTxnException, TxnAbortedException, SQLException, MetaException {
     // If the txnid is 0, then there are no transactions in this heartbeat
     if (txnid == 0) return;
-    Statement stmt = dbConn.createStatement();
-    long now = getDbTime(dbConn);
-    // We need to check whether this transaction is valid and open
-    String s = "select txn_state from TXNS where txn_id = " + txnid;
-    LOG.debug("Going to execute query <" + s + ">");
-    ResultSet rs = stmt.executeQuery(s);
-    if (!rs.next()) {
-      LOG.debug("Going to rollback");
-      dbConn.rollback();
-      throw new NoSuchTxnException("No such transaction: " + txnid);
-    }
-    if (rs.getString(1).charAt(0) == TXN_ABORTED) {
-      LOG.debug("Going to rollback");
-      dbConn.rollback();
-      throw new TxnAbortedException("Transaction " + txnid +
-          " already aborted");
+    Statement stmt = null;
+    try {
+      stmt = dbConn.createStatement();
+      long now = getDbTime(dbConn);
+      // We need to check whether this transaction is valid and open
+      String s = "select txn_state from TXNS where txn_id = " + txnid;
+      LOG.debug("Going to execute query <" + s + ">");
+      ResultSet rs = stmt.executeQuery(s);
+      if (!rs.next()) {
+        LOG.debug("Going to rollback");
+        dbConn.rollback();
+        throw new NoSuchTxnException("No such transaction: " + txnid);
+      }
+      if (rs.getString(1).charAt(0) == TXN_ABORTED) {
+        LOG.debug("Going to rollback");
+        dbConn.rollback();
+        throw new TxnAbortedException("Transaction " + txnid +
+            " already aborted");
+      }
+      s = "update TXNS set txn_last_heartbeat = " + now +
+          " where txn_id = " + txnid;
+      LOG.debug("Going to execute update <" + s + ">");
+      stmt.executeUpdate(s);
+      LOG.debug("Going to commit");
+      dbConn.commit();
+    } finally {
+      closeStmt(stmt);
     }
-    s = "update TXNS set txn_last_heartbeat = " + now +
-        " where txn_id = " + txnid;
-    LOG.debug("Going to execute update <" + s + ">");
-    stmt.executeUpdate(s);
-    LOG.debug("Going to commit");
-    dbConn.commit();
   }
 
   // NEVER call this function without first calling heartbeat(long, long)
   private long getTxnIdFromLockId(Connection dbConn, long extLockId)
       throws NoSuchLockException, MetaException, SQLException {
-    Statement stmt = dbConn.createStatement();
-    String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " +
-        extLockId;
-    LOG.debug("Going to execute query <" + s + ">");
-    ResultSet rs = stmt.executeQuery(s);
-    if (!rs.next()) {
-      throw new MetaException("This should never happen!  We already " +
-          "checked the lock existed but now we can't find it!");
-    }
-    long txnid = rs.getLong(1);
-    LOG.debug("Return txnid " + (rs.wasNull() ? -1 : txnid));
-    return (rs.wasNull() ? -1 : txnid);
+    Statement stmt = null;
+    try {
+      stmt = dbConn.createStatement();
+      String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " +
+          extLockId;
+      LOG.debug("Going to execute query <" + s + ">");
+      ResultSet rs = stmt.executeQuery(s);
+      if (!rs.next()) {
+        throw new MetaException("This should never happen!  We already " +
+            "checked the lock existed but now we can't find it!");
+      }
+      long txnid = rs.getLong(1);
+      LOG.debug("Return txnid " + (rs.wasNull() ? -1 : txnid));
+      return (rs.wasNull() ? -1 : txnid);
+    } finally {
+      closeStmt(stmt);
+    }
   }
 
   // NEVER call this function without first calling heartbeat(long, long)
   private List<LockInfo> getLockInfoFromLockId(Connection dbConn, long extLockId)
       throws NoSuchLockException, MetaException, SQLException {
-    Statement stmt = dbConn.createStatement();
-    String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " +
-        "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " +
-        "hl_lock_ext_id = " + extLockId;
-    LOG.debug("Going to execute query <" + s + ">");
-    ResultSet rs = stmt.executeQuery(s);
-    boolean sawAtLeastOne = false;
-    List<LockInfo> ourLockInfo = new ArrayList<LockInfo>();
-    while (rs.next()) {
-      ourLockInfo.add(new LockInfo(rs));
-      sawAtLeastOne = true;
-    }
-    if (!sawAtLeastOne) {
-      throw new MetaException("This should never happen!  We already " +
-          "checked the lock existed but now we can't find it!");
+    Statement stmt = null;
+    try {
+      stmt = dbConn.createStatement();
+      String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " +
+          "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " +
+          "hl_lock_ext_id = " + extLockId;
+      LOG.debug("Going to execute query <" + s + ">");
+      ResultSet rs = stmt.executeQuery(s);
+      boolean sawAtLeastOne = false;
+      List<LockInfo> ourLockInfo = new ArrayList<LockInfo>();
+      while (rs.next()) {
+        ourLockInfo.add(new LockInfo(rs));
+        sawAtLeastOne = true;
+      }
+      if (!sawAtLeastOne) {
+        throw new MetaException("This should never happen!  We already " +
+            "checked the lock existed but now we can't find it!");
+      }
+      return ourLockInfo;
+    } finally {
+      closeStmt(stmt);
     }
-    return ourLockInfo;
   }
 
   // Clean time out locks from the database.  This does a commit,
@@ -1487,14 +1550,19 @@ public class TxnHandler {
   // open transactions.
   private void timeOutLocks(Connection dbConn) throws SQLException, MetaException {
     long now = getDbTime(dbConn);
-    Statement stmt = dbConn.createStatement();
-    // Remove any timed out locks from the table.
-    String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " +
-        (now - timeout);
-    LOG.debug("Going to execute update <" + s + ">");
-    stmt.executeUpdate(s);
-    LOG.debug("Going to commit");
-    dbConn.commit();
+    Statement stmt = null;
+    try {
+      stmt = dbConn.createStatement();
+      // Remove any timed out locks from the table.
+      String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " +
+          (now - timeout);
+      LOG.debug("Going to execute update <" + s + ">");
+      stmt.executeUpdate(s);
+      LOG.debug("Going to commit");
+      dbConn.commit();
+    } finally {
+      closeStmt(stmt);
+    }
   }
 
   // Abort timed out transactions.  This calls abortTxn(), which does a commit,
@@ -1502,19 +1570,24 @@ public class TxnHandler {
   // open transactions on the underlying database.
   private void timeOutTxns(Connection dbConn) throws SQLException, MetaException {
     long now = getDbTime(dbConn);
-    Statement stmt = dbConn.createStatement();
-    // Abort any timed out locks from the table.
-    String s = "select txn_id from TXNS where txn_state = '" + TXN_OPEN +
-        "' and txn_last_heartbeat <  " + (now - timeout);
-    LOG.debug("Going to execute query <" + s + ">");
-    ResultSet rs = stmt.executeQuery(s);
-    List<Long> deadTxns = new ArrayList<Long>();
-    // Limit the number of timed out transactions we do in one pass to keep from generating a
-    // huge delete statement
-    for (int i = 0; i < 20 && rs.next(); i++) deadTxns.add(rs.getLong(1));
-    // We don't care whether all of the transactions get deleted or not,
-    // if some didn't it most likely means someone else deleted them in the interum
-    if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns);
+    Statement stmt = null;
+    try {
+      stmt = dbConn.createStatement();
+      // Abort any timed out locks from the table.
+      String s = "select txn_id from TXNS where txn_state = '" + TXN_OPEN +
+          "' and txn_last_heartbeat <  " + (now - timeout);
+      LOG.debug("Going to execute query <" + s + ">");
+      ResultSet rs = stmt.executeQuery(s);
+      List<Long> deadTxns = new ArrayList<Long>();
+      // Limit the number of timed out transactions we do in one pass to keep from generating a
+      // huge delete statement
+      for (int i = 0; i < 20 && rs.next(); i++) deadTxns.add(rs.getLong(1));
+      // We don't care whether all of the transactions get deleted or not,
+      // if some didn't it most likely means someone else deleted them in the interum
+      if (deadTxns.size() > 0) abortTxns(dbConn, deadTxns);
+    } finally {
+      closeStmt(stmt);
+    }
   }
 
   private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException {



Mime
View raw message