hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ekoif...@apache.org
Subject [1/3] hive git commit: HIVE-11914 When transactions gets a heartbeat, it doesn't update the lock heartbeat. (Eugene Koifman, reviewed by Alan Gates)
Date Sat, 10 Oct 2015 18:31:28 GMT
Repository: hive
Updated Branches:
  refs/heads/branch-1 485b8adec -> 44bfbd81d


HIVE-11914 When transactions gets a heartbeat, it doesn't update the lock heartbeat. (Eugene
Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7bf681de
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7bf681de
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7bf681de

Branch: refs/heads/branch-1
Commit: 7bf681de1ec6a8415e524cbefea0d90de1f96672
Parents: 485b8ad
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Sat Oct 10 11:05:47 2015 -0700
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Sat Oct 10 11:05:47 2015 -0700

----------------------------------------------------------------------
 .../hive/hcatalog/streaming/TestStreaming.java  | 27 ++++++++++++++++
 .../hadoop/hive/metastore/txn/TxnHandler.java   |  4 +++
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    | 34 +++++++++++++-------
 .../hive/ql/txn/compactor/CompactorMR.java      |  8 ++---
 .../hive/ql/lockmgr/TestDbTxnManager.java       |  2 +-
 5 files changed, 59 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7bf681de/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 2f6baec..340ab6c 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
@@ -553,6 +555,31 @@ public class TestStreaming {
     txnBatch.close();
     connection.close();
   }
+
+  @Test
+  public void testHearbeat() throws Exception {
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
+    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt);
+    StreamingConnection connection = endPt.newConnection(false, null);
+
+    TransactionBatch txnBatch =  connection.fetchTransactionBatch(5, writer);
+    txnBatch.beginNextTransaction();
+    //todo: this should ideally check Transaction heartbeat as well, but heartbeat
+    //timestamp is not reported yet
+    //GetOpenTxnsInfoResponse txnresp = msClient.showTxns();
+    ShowLocksResponse response = msClient.showLocks();
+    Assert.assertEquals("Wrong nubmer of locks: " + response, 1, response.getLocks().size());
+    ShowLocksResponseElement lock = response.getLocks().get(0);
+    long acquiredAt = lock.getAcquiredat();
+    long heartbeatAt = lock.getAcquiredat();
+    txnBatch.heartbeat();
+    response = msClient.showLocks();
+    Assert.assertEquals("Wrong number of locks2: " + response, 1, response.getLocks().size());
+    lock = response.getLocks().get(0);
+    Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, lock.getAcquiredat());
+    Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() +
+      ") > old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() > heartbeatAt);
+  }
   @Test
   public void testTransactionBatchEmptyAbort() throws Exception {
     // 1) to partitioned table

http://git-wip-us.apache.org/repos/asf/hive/blob/7bf681de/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 82f0dc0..b685f3d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1757,6 +1757,10 @@ public class TxnHandler {
         " where txn_id = " + txnid;
       LOG.debug("Going to execute update <" + s + ">");
       stmt.executeUpdate(s);
+      //update locks for this txn to the same heartbeat
+      s = "update HIVE_LOCKS set hl_last_heartbeat = " + now + " where hl_txnid = " + txnid;
+      LOG.debug("Going to execute update <" + s + ">");
+      stmt.executeUpdate(s);
       LOG.debug("Going to commit");
       dbConn.commit();
     } finally {

http://git-wip-us.apache.org/repos/asf/hive/blob/7bf681de/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 39b44e8..219a54a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -293,17 +293,28 @@ public class DbTxnManager extends HiveTxnManagerImpl {
 
   @Override
   public void heartbeat() throws LockException {
-    LOG.debug("Heartbeating lock and transaction " + JavaUtils.txnIdToString(txnId));
-    List<HiveLock> locks = lockMgr.getLocks(false, false);
-    if (locks.size() == 0) {
-      if (!isTxnOpen()) {
-        // No locks, no txn, we outta here.
-        return;
-      } else {
-        // Create one dummy lock so we can go through the loop below
-        DbLockManager.DbHiveLock dummyLock = new DbLockManager.DbHiveLock(0L);
-        locks.add(dummyLock);
+    List<HiveLock> locks;
+    if(isTxnOpen()) {
+      // Create one dummy lock so we can go through the loop below, though we only
+      //really need txnId
+      DbLockManager.DbHiveLock dummyLock = new DbLockManager.DbHiveLock(0L);
+      locks = new ArrayList<>(1);
+      locks.add(dummyLock);
+    }
+    else {
+      locks = lockMgr.getLocks(false, false);
+    }
+    if(LOG.isInfoEnabled()) {
+      StringBuilder sb = new StringBuilder("Sending heartbeat for ")
+        .append(JavaUtils.txnIdToString(txnId)).append(" and");
+      for(HiveLock lock : locks) {
+        sb.append(" ").append(lock.toString());
       }
+      LOG.info(sb.toString());
+    }
+    if(!isTxnOpen() && locks.isEmpty()) {
+      // No locks, no txn, we outta here.
+      return;
     }
     for (HiveLock lock : locks) {
       long lockId = ((DbLockManager.DbHiveLock)lock).lockId;
@@ -320,7 +331,8 @@ public class DbTxnManager extends HiveTxnManagerImpl {
         throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId));
       } catch (TException e) {
         throw new LockException(
-            ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+            ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg() + "(" + JavaUtils.txnIdToString(txnId)
+              + "," + lock.toString() + ")", e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/7bf681de/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 7bc01d9..a45536e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -399,10 +399,10 @@ public class CompactorMR {
             dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) {
           boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
           FileStatus[] files = fs.listStatus(dir, AcidUtils.bucketFileFilter);
-          for (int j = 0; j < files.length; j++) {
+          for(FileStatus f : files) {
             // For each file, figure out which bucket it is.
-            Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(files[j].getPath().getName());
-            addFileToMap(matcher, files[j].getPath(), sawBase, splitToBucketMap);
+            Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName());
+            addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap);
           }
         } else {
           // Legacy file, see if it's a bucket file
@@ -431,7 +431,7 @@ public class CompactorMR {
                               Map<Integer, BucketTracker> splitToBucketMap) {
       if (!matcher.find()) {
         LOG.warn("Found a non-bucket file that we thought matched the bucket pattern! " +
-            file.toString());
+            file.toString() + " Matcher=" + matcher.toString());
       }
       int bucketNum = Integer.valueOf(matcher.group());
       BucketTracker bt = splitToBucketMap.get(bucketNum);

http://git-wip-us.apache.org/repos/asf/hive/blob/7bf681de/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index db119e1..8a53ec5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -236,7 +236,7 @@ public class TestDbTxnManager {
       exception = ex;
     }
     Assert.assertNotNull("Expected exception3", exception);
-    Assert.assertEquals("Wrong Exception3", ErrorMsg.LOCK_NO_SUCH_LOCK, exception.getCanonicalErrorMsg());
+    Assert.assertEquals("Wrong Exception3", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
   }
 
   @Test


Mime
View raw message