hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ekoif...@apache.org
Subject hive git commit: HIVE-15950 Make DbTxnManager use Metastore client consistently with callers (Eugene Koifman, reviewed by Vaibhav Gumashta)
Date Fri, 17 Feb 2017 17:32:20 GMT
Repository: hive
Updated Branches:
  refs/heads/master 3485d02cb -> bb4d8db50


HIVE-15950 Make DbTxnManager use Metastore client consistently with callers (Eugene Koifman,
reviewed by Vaibhav Gumashta)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bb4d8db5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bb4d8db5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bb4d8db5

Branch: refs/heads/master
Commit: bb4d8db5093984b94d3bb996e286e10a0dc2bef3
Parents: 3485d02
Author: Eugene Koifman <ekoifman@hortonworks.com>
Authored: Fri Feb 17 09:32:15 2017 -0800
Committer: Eugene Koifman <ekoifman@hortonworks.com>
Committed: Fri Feb 17 09:32:15 2017 -0800

----------------------------------------------------------------------
 .../hive/metastore/RetryingMetaStoreClient.java |   4 +-
 .../hadoop/hive/ql/lockmgr/DbLockManager.java   |  28 ++--
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    | 144 +++++++------------
 3 files changed, 68 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bb4d8db5/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
index a6545a9..d3e5f7e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
@@ -215,8 +215,8 @@ public class RetryingMetaStoreClient implements InvocationHandler {
         throw caughtException;
       }
       retriesMade++;
-      LOG.warn("MetaStoreClient lost connection. Attempting to reconnect.",
-          caughtException);
+      LOG.warn("MetaStoreClient lost connection. Attempting to reconnect (" + retriesMade + " of " +
+          retryLimit + ") after " + retryDelaySeconds + "s. " + method.getName(), caughtException);
       Thread.sleep(retryDelaySeconds * 1000);
     }
     return ret;

http://git-wip-us.apache.org/repos/asf/hive/blob/bb4d8db5/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index 529e64c..c3725ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.lockmgr;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
 import org.apache.hadoop.hive.ql.exec.DDLTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,10 +42,10 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * An implementation of HiveLockManager for use with {@link org.apache.hadoop.hive.ql.lockmgr.DbTxnManager}.
- * Note, this lock manager is not meant to stand alone.  It cannot be used
- * without the DbTxnManager.
+ * Note, this lock manager is not meant to be stand alone.  It cannot be used without the DbTxnManager.
+ * See {@link DbTxnManager#getMS()} for important concurrency/metastore access notes.
  */
-public class DbLockManager implements HiveLockManager{
+public final class DbLockManager implements HiveLockManager{
 
   static final private String CLASS_NAME = DbLockManager.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
@@ -54,14 +53,14 @@ public class DbLockManager implements HiveLockManager{
   private long MAX_SLEEP;
   //longer term we should always have a txn id and then we won't need to track locks here at all
   private Set<DbHiveLock> locks;
-  private SynchronizedMetaStoreClient client;
   private long nextSleep = 50;
   private final HiveConf conf;
+  private final DbTxnManager txnManager;
 
-  DbLockManager(SynchronizedMetaStoreClient client, HiveConf conf) {
+  DbLockManager(HiveConf conf, DbTxnManager txnManager) {
     locks = new HashSet<>();
-    this.client = client;
     this.conf = conf;
+    this.txnManager = txnManager;
   }
 
   @Override
@@ -100,7 +99,7 @@ public class DbLockManager implements HiveLockManager{
     int maxNumWaits = Math.max(0, conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES));
     try {
       LOG.info("Requesting: queryId=" + queryId + " " + lock);
-      LockResponse res = client.lock(lock);
+      LockResponse res = txnManager.getMS().lock(lock);
       //link lockId to queryId
       LOG.info("Response to queryId=" + queryId + " " + res);
       if(!isBlocking) {
@@ -112,8 +111,7 @@ public class DbLockManager implements HiveLockManager{
       long startRetry = System.currentTimeMillis();
       while (res.getState() == LockState.WAITING && numRetries++ < maxNumWaits) {
         backoff();
-        res = client.checkLock(res.getLockid());
-
+        res = txnManager.getMS().checkLock(res.getLockid());
       }
       long retryDuration = System.currentTimeMillis() - startRetry;
       DbHiveLock hl = new DbHiveLock(res.getLockid(), queryId, lock.getTxnid());
@@ -203,7 +201,7 @@ public class DbLockManager implements HiveLockManager{
    */
   LockState checkLock(long extLockId) throws LockException {
     try {
-      return client.checkLock(extLockId).getState();
+      return txnManager.getMS().checkLock(extLockId).getState();
     } catch (TException e) {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
         e);
@@ -216,7 +214,7 @@ public class DbLockManager implements HiveLockManager{
     boolean removed = false;
     try {
       LOG.debug("Unlocking " + hiveLock);
-      client.unlock(lockId);
+      txnManager.getMS().unlock(lockId);
       //important to remove after unlock() in case it fails
       removed = locks.remove(hiveLock);
       Metrics metrics = MetricsFactory.getInstance();
@@ -283,7 +281,7 @@ public class DbLockManager implements HiveLockManager{
 
   public ShowLocksResponse getLocks(ShowLocksRequest showLocksRequest) throws LockException {
     try {
-      return client.showLocks(showLocksRequest);
+      return txnManager.getMS().showLocks(showLocksRequest);
     } catch (TException e) {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
     }
@@ -354,8 +352,8 @@ public class DbLockManager implements HiveLockManager{
   /**
    * Clear the memory of the locks in this object.  This won't clear the locks from the database.
    * It is for use with
-   * {@link #DbLockManager(org.apache.hadoop.hive.metastore.IMetaStoreClient, org.apache.hadoop.hive.conf.HiveConf)} .commitTxn} and
-   * {@link #DbLockManager(org.apache.hadoop.hive.metastore.IMetaStoreClient, org.apache.hadoop.hive.conf.HiveConf)} .rollbackTxn}.
+   * {@link #DbLockManager(HiveConf, DbTxnManager)} .commitTxn} and
+   * {@link #DbLockManager(HiveConf, DbTxnManager)} .rollbackTxn}.
    */
   void clearLocalLockRecords() {
     locks.clear();

http://git-wip-us.apache.org/repos/asf/hive/blob/bb4d8db5/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index a985eb1..62f7c5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
@@ -52,21 +52,23 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * An implementation of HiveTxnManager that stores the transactions in the
- * metastore database.
+ * An implementation of HiveTxnManager that stores the transactions in the metastore database.
+ * There should be 1 instance o {@link DbTxnManager} per {@link org.apache.hadoop.hive.ql.session.SessionState}
+ * with a single thread accessing it at a time, with the exception of {@link #heartbeat()} method.
+ * The later may (usually will) be called from a timer thread.
+ * See {@link #getMS()} for more important concurrency/metastore access notes.
  */
-public class DbTxnManager extends HiveTxnManagerImpl {
+public final class DbTxnManager extends HiveTxnManagerImpl {
 
   static final private String CLASS_NAME = DbTxnManager.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
-  private DbLockManager lockMgr = null;
-  private SynchronizedMetaStoreClient client = null;
+  private volatile DbLockManager lockMgr = null;
   /**
    * The Metastore NEXT_TXN_ID.NTXN_NEXT is initialized to 1; it contains the next available
    * transaction id.  Thus is 1 is first transaction id.
    */
-  private long txnId = 0;
+  private volatile long txnId = 0;
   /**
    * assigns a unique monotonically increasing ID to each statement
    * which is part of an open transaction.  This is used by storage
@@ -84,33 +86,31 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   private ScheduledFuture<?> heartbeatTask = null;
   private Runnable shutdownRunner = null;
   private static final int SHUTDOWN_HOOK_PRIORITY = 0;
-
-  // SynchronizedMetaStoreClient object per heartbeater thread.
-  private static ThreadLocal<SynchronizedMetaStoreClient> threadLocalMSClient =
-      new ThreadLocal<SynchronizedMetaStoreClient>() {
-
-        @Override
-        protected SynchronizedMetaStoreClient initialValue() {
-          return null;
-        }
-
-        @Override
-        public synchronized void remove() {
-          SynchronizedMetaStoreClient client = this.get();
-          if (client != null) {
-            client.close();
-          }
-          super.remove();
-        }
-      };
-
-  private static AtomicInteger heartbeaterMSClientCount = new AtomicInteger(0);
-  private static int heartbeaterThreadPoolSize = 0;
-
-  private static SynchronizedMetaStoreClient getThreadLocalMSClient() {
-    return threadLocalMSClient.get();
+  /**
+   * We do this on every call to make sure TM uses same MS connection as is used by the caller (Driver,
+   * SemanticAnalyzer, etc).  {@code Hive} instances are cached using ThreadLocal and
+   * {@link IMetaStoreClient} is cached within {@code Hive} with additional logic.  Futhermore, this
+   * ensures that multiple threads are not sharing the same Thrift client (which could happen
+   * if we had cached {@link IMetaStoreClient} here.
+   *
+   * ThreadLocal gets cleaned up automatically when its thread goes away
+   * https://docs.oracle.com/javase/7/docs/api/java/lang/ThreadLocal.html.  This is especially
+   * important for threads created by {@link #heartbeatExecutorService} threads.
+   *
+   * Embedded {@link DbLockManager} follows the same logic.
+   * @return IMetaStoreClient
+   * @throws LockException on any errors
+   */
+  IMetaStoreClient getMS() throws LockException {
+    try {
+      return Hive.get(conf).getMSC();
+    }
+    catch(HiveException|MetaException e) {
+      String msg = "Unable to reach Hive Metastore: " + e.getMessage();
+      LOG.error(msg, e);
+      throw new LockException(e);
+    }
   }
-
   DbTxnManager() {
     shutdownRunner = new Runnable() {
       @Override
@@ -148,7 +148,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
       throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId));
     }
     try {
-      txnId = client.openTxn(user);
+      txnId = getMS().openTxn(user);
       statementId = 0;
       LOG.debug("Opened " + JavaUtils.txnIdToString(txnId));
       ctx.setHeartbeater(startHeartbeat(delay));
@@ -158,11 +158,15 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     }
   }
 
+  /**
+   * we don't expect multiple thread to call this method concurrently but {@link #lockMgr} will
+   * be read by a different threads that one writing it, thus it's {@code volatile}
+   */
   @Override
   public HiveLockManager getLockManager() throws LockException {
     init();
     if (lockMgr == null) {
-      lockMgr = new DbLockManager(client, conf);
+      lockMgr = new DbLockManager(conf, this);
     }
     return lockMgr;
   }
@@ -388,7 +392,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
       lockMgr.clearLocalLockRecords();
       stopHeartbeat();
       LOG.debug("Committing txn " + JavaUtils.txnIdToString(txnId));
-      client.commitTxn(txnId);
+      getMS().commitTxn(txnId);
     } catch (NoSuchTxnException e) {
       LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId));
       throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
@@ -414,7 +418,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
       lockMgr.clearLocalLockRecords();
       stopHeartbeat();
       LOG.debug("Rolling back " + JavaUtils.txnIdToString(txnId));
-      client.rollbackTxn(txnId);
+      getMS().rollbackTxn(txnId);
     } catch (NoSuchTxnException e) {
       LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId));
       throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
@@ -460,29 +464,11 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     for (HiveLock lock : locks) {
       long lockId = ((DbLockManager.DbHiveLock)lock).lockId;
       try {
-        // Get the threadlocal metastore client for the heartbeat calls.
-        SynchronizedMetaStoreClient heartbeaterClient = getThreadLocalMSClient();
-        if (heartbeaterClient == null) {
-          Hive db;
-          try {
-            db = Hive.get(conf);
-            // Create a new threadlocal synchronized metastore client for use in heartbeater threads.
-            // This makes the concurrent use of heartbeat thread safe, and won't cause transaction
-            // abort due to a long metastore client call blocking the heartbeat call.
-            heartbeaterClient = new SynchronizedMetaStoreClient(db.getMSC());
-            threadLocalMSClient.set(heartbeaterClient);
-          } catch (HiveException e) {
-            LOG.error("Unable to create new metastore client for heartbeating", e);
-            throw new LockException(e);
-          }
-          // Increment the threadlocal metastore client count
-          if (heartbeaterMSClientCount.incrementAndGet() >= heartbeaterThreadPoolSize) {
-            LOG.warn("The number of heartbeater metastore clients - + "
-                + heartbeaterMSClientCount.get() + ", has exceeded the max limit - "
-                + heartbeaterThreadPoolSize);
-          }
-        }
-        heartbeaterClient.heartbeat(txnId, lockId);
+        /**
+         * This relies on the ThreadLocal caching, which implies that the same {@link IMetaStoreClient},
+         * in particular the Thrift connection it uses is never shared between threads
+         */
+        getMS().heartbeat(txnId, lockId);
       } catch (NoSuchLockException e) {
         LOG.error("Unable to find lock " + JavaUtils.lockIdToString(lockId));
         throw new LockException(e, ErrorMsg.LOCK_NO_SUCH_LOCK, JavaUtils.lockIdToString(lockId));
@@ -554,7 +540,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   public ValidTxnList getValidTxns() throws LockException {
     init();
     try {
-      return client.getValidTxns(txnId);
+      return getMS().getValidTxns(txnId);
     } catch (TException e) {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
           e);
@@ -598,21 +584,10 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   }
 
   private void init() throws LockException {
-    if (client == null) {
-      if (conf == null) {
-        throw new RuntimeException("Must call setHiveConf before any other " +
-            "methods.");
-      }
-      try {
-        Hive db = Hive.get(conf);
-        client = new SynchronizedMetaStoreClient(db.getMSC());
-        initHeartbeatExecutorService();
-      } catch (MetaException e) {
-        throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
-      } catch (HiveException e) {
-        throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
-      }
+    if (conf == null) {
+      throw new RuntimeException("Must call setHiveConf before any other methods.");
     }
+    initHeartbeatExecutorService();
   }
 
   private synchronized void initHeartbeatExecutorService() {
@@ -620,10 +595,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
         && !heartbeatExecutorService.isTerminated()) {
       return;
     }
-    heartbeaterThreadPoolSize =
-        conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE);
     heartbeatExecutorService =
-        Executors.newScheduledThreadPool(heartbeaterThreadPoolSize, new ThreadFactory() {
+        Executors.newScheduledThreadPool(
+          conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE), new ThreadFactory() {
           private final AtomicInteger threadCounter = new AtomicInteger();
 
           @Override
@@ -635,22 +609,10 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   }
 
   public static class HeartbeaterThread extends Thread {
-    public HeartbeaterThread(Runnable target, String name) {
+    HeartbeaterThread(Runnable target, String name) {
       super(target, name);
       setDaemon(true);
     }
-
-    @Override
-    /**
-     * We're overriding finalize so that we can do an orderly cleanup of resources held by
-     * the threadlocal metastore client.
-     */
-    protected void finalize() throws Throwable {
-      threadLocalMSClient.remove();
-      // Adjust the metastore client count
-      DbTxnManager.heartbeaterMSClientCount.decrementAndGet();
-      super.finalize();
-    }
   }
 
   @Override


Mime
View raw message