hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ekoif...@apache.org
Subject hive git commit: HIVE-12620 Misc improvement to Acid module (Eugene Koifman, reviewed by Wei Zheng, Jason Dere)
Date Mon, 14 Dec 2015 20:26:47 GMT
Repository: hive
Updated Branches:
  refs/heads/branch-1 063a82b7d -> f14b3c6d6


HIVE-12620 Misc improvement to Acid module (Eugene Koifman, reviewed by Wei Zheng, Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f14b3c6d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f14b3c6d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f14b3c6d

Branch: refs/heads/branch-1
Commit: f14b3c6d66db43cf7774b86f46a941f4e0b7b980
Parents: 063a82b
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Mon Dec 14 12:26:31 2015 -0800
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Mon Dec 14 12:26:31 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 60 ++++++++++++++++----
 .../hadoop/hive/ql/lockmgr/DbLockManager.java   | 47 ++++++++++++++-
 .../hive/ql/lockmgr/TestDbTxnManager.java       | 43 +++++++++++---
 3 files changed, 128 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f14b3c6d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index de9523b..a8bb3b8 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -529,8 +529,7 @@ public class TxnHandler {
         else {
           heartbeatLock(dbConn, extLockId);
         }
-        closeDbConn(dbConn);
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
         return checkLock(dbConn, extLockId);
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
@@ -1099,6 +1098,10 @@ public class TxnHandler {
          LOG.error("Fatal error. Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
         }
       }
+      else {
+        //make sure we know we saw an error that we don't recognize
+        LOG.info("Non-retryable error: " + getMessage(e));
+      }
     }
     finally {
      /*if this method ends with anything except a retry signal, the caller should fail the operation
@@ -1577,7 +1580,7 @@ public class TxnHandler {
         return checkLock(dbConn, extLockId);
       } catch (NoSuchLockException e) {
         // This should never happen, as we just added the lock id
-        throw new MetaException("Couldn't find a lock we just created!");
+        throw new MetaException("Couldn't find a lock we just created! " + e.getMessage());
       } finally {
         close(rs);
         closeStmt(stmt);
@@ -1706,7 +1709,7 @@ public class TxnHandler {
         if (index == -1) {
           LOG.debug("Going to rollback");
           dbConn.rollback();
-          throw new MetaException("How did we get here, we heartbeated our lock before we started!");
+          throw new MetaException("How did we get here, we heartbeated our lock before we started! ( " + info + ")");
         }
 
 
@@ -1972,17 +1975,50 @@ public class TxnHandler {
   // open transactions.
   private void timeOutLocks(Connection dbConn, long now) {
     Statement stmt = null;
+    ResultSet rs = null;
     try {
       stmt = dbConn.createStatement();
-      // Remove any timed out locks from the table.
-      String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " +
-        (now - timeout) + " and hl_txnid = 0";//when txnid is > 0, the lock is
+      long maxHeartbeatTime = now - timeout;
+      //doing a SELECT first is less efficient but makes it easier to debug things
+      String s = "select distinct hl_lock_ext_id from HIVE_LOCKS where hl_last_heartbeat < " +
+        maxHeartbeatTime + " and hl_txnid = 0";//when txnid is <> 0, the lock is
       //associated with a txn and is handled by performTimeOuts()
       //want to avoid expiring locks for a txn w/o expiring the txn itself
-      LOG.debug("Going to execute update <" + s + ">");
-      int deletedLocks = stmt.executeUpdate(s);
+      List<Long> extLockIDs = new ArrayList<>();
+      rs = stmt.executeQuery(s);
+      while(rs.next()) {
+        extLockIDs.add(rs.getLong(1));
+      }
+      rs.close();
+      dbConn.commit();
+      if(extLockIDs.size() <= 0) {
+        return;
+      }
+      int deletedLocks = 0;
+      //include same hl_last_heartbeat condition in case someone heartbeated since the select
+      s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + maxHeartbeatTime + " and hl_txnid = 0" +
+        " and hl_lock_ext_id IN (";
+      int numWholeBatches = extLockIDs.size() / TIMED_OUT_TXN_ABORT_BATCH_SIZE;
+      for(int i = 0; i < numWholeBatches; i++) {
+        StringBuilder sb = new StringBuilder(s);
+        for(int j = i * TIMED_OUT_TXN_ABORT_BATCH_SIZE; j < (i + 1) * TIMED_OUT_TXN_ABORT_BATCH_SIZE; j++) {
+          sb.append(extLockIDs.get(j)).append(",");
+        }
+        sb.setCharAt(sb.length() - 1, ')');
+        LOG.debug("Removing expired locks via: " + sb.toString());
+        deletedLocks += stmt.executeUpdate(sb.toString());
+        dbConn.commit();
+      }
+      StringBuilder sb = new StringBuilder(s);
+      for(int i = numWholeBatches * TIMED_OUT_TXN_ABORT_BATCH_SIZE; i < extLockIDs.size(); i++) {
+        sb.append(extLockIDs.get(i)).append(",");
+      }
+      sb.setCharAt(sb.length() - 1, ')');
+      LOG.debug("Removing expired locks via: " + sb.toString());
+      deletedLocks += stmt.executeUpdate(sb.toString());
       if(deletedLocks > 0) {
-        LOG.info("Deleted " + deletedLocks + " locks from HIVE_LOCKS due to timeout");
+        LOG.info("Deleted " + deletedLocks + " ext locks from HIVE_LOCKS due to timeout (vs. " +
+          extLockIDs.size() + " found. List: " + extLockIDs + ") maxHeartbeatTime=" + maxHeartbeatTime);
       }
       LOG.debug("Going to commit");
       dbConn.commit();
@@ -1993,6 +2029,7 @@ public class TxnHandler {
     catch(Exception ex) {
       LOG.error("Failed to purge timedout locks due to: " + ex.getMessage(), ex);
     } finally {
+      close(rs);
       closeStmt(stmt);
     }
   }
@@ -2265,7 +2302,8 @@ public class TxnHandler {
         //in MSSQL this means Communication Link Failure
         return true;
       }
-      if("ORA-08176".equalsIgnoreCase(sqlException.getSQLState())) {
+      if("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) ||
+        sqlException.getMessage().contains("consistent read failure; rollback data not available")) {
         return true;
       }
       //see also https://issues.apache.org/jira/browse/HIVE-9938

http://git-wip-us.apache.org/repos/asf/hive/blob/f14b3c6d/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index 422f1d2..6f7f961 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -48,6 +48,7 @@ public class DbLockManager implements HiveLockManager{
   static final private Log LOG = LogFactory.getLog(CLASS_NAME);
 
   private long MAX_SLEEP;
+  //longer term we should always have a txn id and then we won't need to track locks here at all
   private Set<DbHiveLock> locks;
   private IMetaStoreClient client;
   private long nextSleep = 50;
@@ -111,7 +112,27 @@ public class DbLockManager implements HiveLockManager{
 
       }
       long retryDuration = System.currentTimeMillis() - startRetry;
-      DbHiveLock hl = new DbHiveLock(res.getLockid());
+      DbHiveLock hl = new DbHiveLock(res.getLockid(), queryId, lock.getTxnid());
+      if(locks.size() > 0) {
+        boolean logMsg = false;
+        for(DbHiveLock l : locks) {
+          if(l.txnId != hl.txnId) {
+            //locks from different transactions detected (or from transaction and read-only query in autocommit)
+            logMsg = true;
+            break;
+          }
+          else if(l.txnId == 0) {
+            if(!l.queryId.equals(hl.queryId)) {
+              //here means no open transaction, but different queries
+              logMsg = true;
+              break;
+            }
+          }
+        }
+        if(logMsg) {
+          LOG.warn("adding new DbHiveLock(" + hl + ") while we are already tracking locks: " + locks);
+        }
+      }
       locks.add(hl);
       if (res.getState() != LockState.ACQUIRED) {
         if(res.getState() == LockState.WAITING) {
@@ -177,12 +198,17 @@ public class DbLockManager implements HiveLockManager{
   @Override
   public void unlock(HiveLock hiveLock) throws LockException {
     long lockId = ((DbHiveLock)hiveLock).lockId;
+    boolean removed = false;
     try {
       LOG.debug("Unlocking " + hiveLock);
       client.unlock(lockId);
-      boolean removed = locks.remove(hiveLock);
+      //important to remove after unlock() in case it fails
+      removed = locks.remove(hiveLock);
       LOG.debug("Removed a lock " + removed);
     } catch (NoSuchLockException e) {
+      //if metastore has no record of this lock, it most likely timed out; either way
+      //there is no point tracking it here any longer
+      removed = locks.remove(hiveLock);
       LOG.error("Metastore could find no record of lock " + JavaUtils.lockIdToString(lockId));
       throw new LockException(e, ErrorMsg.LOCK_NO_SUCH_LOCK, JavaUtils.lockIdToString(lockId));
     } catch (TxnOpenException e) {
@@ -192,10 +218,16 @@ public class DbLockManager implements HiveLockManager{
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
           e);
     }
+    finally {
+      if(removed) {
+        LOG.debug("Removed a lock " + hiveLock);
+      }
+    }
   }
 
   @Override
   public void releaseLocks(List<HiveLock> hiveLocks) {
+    LOG.info("releaseLocks: " + hiveLocks);
     for (HiveLock lock : hiveLocks) {
       try {
         unlock(lock);
@@ -203,6 +235,8 @@ public class DbLockManager implements HiveLockManager{
         // Not sure why this method doesn't throw any exceptions,
         // but since the interface doesn't allow it we'll just swallow them and
         // move on.
+        //This OK-ish since releaseLocks() is only called for RO/AC queries; it
+        //would be really bad to eat exceptions here for write operations
       }
     }
   }
@@ -249,10 +283,17 @@ public class DbLockManager implements HiveLockManager{
   static class DbHiveLock extends HiveLock {
 
     long lockId;
+    String queryId;
+    long txnId;
 
     DbHiveLock(long id) {
       lockId = id;
     }
+    DbHiveLock(long id, String queryId, long txnId) {
+      lockId = id;
+      this.queryId = queryId;
+      this.txnId = txnId;
+    }
 
     @Override
     public HiveLockObject getHiveLockObject() {
@@ -279,7 +320,7 @@ public class DbLockManager implements HiveLockManager{
     }
     @Override
     public String toString() {
-      return JavaUtils.lockIdToString(lockId);
+      return JavaUtils.lockIdToString(lockId) + " queryId=" + queryId + " " + JavaUtils.txnIdToString(txnId);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f14b3c6d/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 6badf23..354ec57 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -89,7 +90,7 @@ public class TestDbTxnManager {
     List<HiveLock> locks = ctx.getHiveLocks();
     Assert.assertEquals(1, locks.size());
     Assert.assertEquals(1,
-        TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId));
+      TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId));
     txnMgr.getLockManager().unlock(locks.get(0));
     locks = txnMgr.getLockManager().getLocks(false, false);
     Assert.assertEquals(0, locks.size());
@@ -156,7 +157,7 @@ public class TestDbTxnManager {
     List<HiveLock> locks = ctx.getHiveLocks();
     Assert.assertEquals(1, locks.size());
     Assert.assertEquals(1,
-        TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId));
+      TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId));
     txnMgr.commitTxn();
     locks = txnMgr.getLockManager().getLocks(false, false);
     Assert.assertEquals(0, locks.size());
@@ -201,9 +202,8 @@ public class TestDbTxnManager {
   }
   @Test
   public void testExceptions() throws Exception {
-    WriteEntity we = addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
+    addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
     QueryPlan qp = new MockQueryPlan(this);
-    txnMgr.acquireLocks(qp, ctx, "PeterI");
     txnMgr.openTxn("NicholasII");
     runReaper();
     LockException exception = null;
@@ -243,6 +243,32 @@ public class TestDbTxnManager {
   }
 
   @Test
+  public void testLockTimeout() throws Exception {
+    addPartitionInput(newTable(true));
+    QueryPlan qp = new MockQueryPlan(this);
+    //make sure it works with nothing to expire
+    expireLocks(txnMgr, 0);
+    //create a few read locks, all on the same resource
+    for(int i = 0; i < 5; i++) {
+      txnMgr.acquireLocks(qp, ctx, "PeterI" + i);
+    }
+    expireLocks(txnMgr, 5);
+    //create a lot of locks
+    for(int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
+      txnMgr.acquireLocks(qp, ctx, "PeterI" + i);
+    }
+    expireLocks(txnMgr, TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
+  }
+  private void expireLocks(HiveTxnManager txnMgr, int numLocksBefore) throws Exception {
+    DbLockManager lockManager = (DbLockManager)txnMgr.getLockManager();
+    ShowLocksResponse resp = lockManager.getLocks();
+    Assert.assertEquals("Wrong number of locks before expire", numLocksBefore, resp.getLocks().size());
+    runReaper();
+    resp = lockManager.getLocks();
+    Assert.assertEquals("Expected all locks to expire", 0, resp.getLocks().size());
+  }
+
+  @Test
   public void testReadWrite() throws Exception {
     Table t = newTable(true);
     addPartitionInput(t);
@@ -362,6 +388,7 @@ public class TestDbTxnManager {
   public void setUp() throws Exception {
     TxnDbUtil.prepDb();
     txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    txnMgr.getLockManager();//init lock manager
     Assert.assertTrue(txnMgr instanceof DbTxnManager);
     nextInput = 1;
     nextOutput = 1;
@@ -380,15 +407,15 @@ public class TestDbTxnManager {
   }
 
   private static class MockQueryPlan extends QueryPlan {
-    private HashSet<ReadEntity> inputs;
-    private HashSet<WriteEntity> outputs;
+    private final HashSet<ReadEntity> inputs = new HashSet<>();
+    private final HashSet<WriteEntity> outputs = new HashSet<>();
     private final String queryId;
 
     MockQueryPlan(TestDbTxnManager test) {
       HashSet<ReadEntity> r = test.readEntities;
       HashSet<WriteEntity> w = test.writeEntities;
-      inputs = (r == null) ? new HashSet<ReadEntity>() : r;
-      outputs = (w == null) ? new HashSet<WriteEntity>() : w;
+      inputs.addAll(test.readEntities);
+      outputs.addAll(test.writeEntities);
       queryId = makeQueryId();
     }
 


Mime
View raw message