hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jxi...@apache.org
Subject hive git commit: HIVE-10956: HS2 leaks HMS connections (Jimmy, reviewed by Xuefu)
Date Thu, 11 Jun 2015 15:51:28 GMT
Repository: hive
Updated Branches:
  refs/heads/master 9f263fcdc -> ac49574f2


HIVE-10956: HS2 leaks HMS connections (Jimmy, reviewed by Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ac49574f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ac49574f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ac49574f

Branch: refs/heads/master
Commit: ac49574f278d3b23b44c9fde74f89cc885d780cb
Parents: 9f263fc
Author: Jimmy Xiang <jxiang@cloudera.com>
Authored: Thu Jun 4 14:52:40 2015 -0700
Committer: Jimmy Xiang <jxiang@cloudera.com>
Committed: Thu Jun 11 08:45:58 2015 -0700

----------------------------------------------------------------------
 .../hive/metastore/HiveMetaStoreClient.java     | 12 +++++----
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  8 +++++-
 .../service/cli/operation/SQLOperation.java     | 16 +----------
 .../hive/service/cli/session/HiveSession.java   | 12 ++++++++-
 .../service/cli/session/HiveSessionImpl.java    | 28 +++++++++++++++++---
 .../cli/session/HiveSessionImplwithUGI.java     | 21 ---------------
 .../service/cli/thrift/ThriftCLIService.java    |  4 +++
 7 files changed, 54 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ac49574f/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 4891d42..a5f5053 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.security.auth.login.LoginException;
 
@@ -170,6 +171,8 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
 
   private Map<String, String> currentMetaVars;
 
+  private static final AtomicInteger connCount = new AtomicInteger(0);
+
   // for thrift connects
   private int retries = 5;
   private long retryDelaySeconds = 0;
@@ -419,6 +422,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
           client = new ThriftHiveMetastore.Client(protocol);
           try {
             transport.open();
+            LOG.info("Opened a connection to metastore, current connections: " + connCount.incrementAndGet());
             isConnected = true;
           } catch (TTransportException e) {
             tte = e;
@@ -499,6 +503,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
     // just in case, we make this call.
     if ((transport != null) && transport.isOpen()) {
       transport.close();
+      LOG.info("Closed a connection to metastore, current connections: " + connCount.decrementAndGet());
     }
   }
 
@@ -1974,19 +1979,16 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
 
   private static class SynchronizedHandler implements InvocationHandler {
     private final IMetaStoreClient client;
-    private static final Object lock = SynchronizedHandler.class;
 
     SynchronizedHandler(IMetaStoreClient client) {
       this.client = client;
     }
 
     @Override
-    public Object invoke(Object proxy, Method method, Object [] args)
+    public synchronized Object invoke(Object proxy, Method method, Object [] args)
         throws Throwable {
       try {
-        synchronized (lock) {
-          return method.invoke(client, args);
-        }
+        return method.invoke(client, args);
       } catch (InvocationTargetException e) {
         throw e.getTargetException();
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/ac49574f/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 8c948a9..0e990f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaException;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.PartitionDropOptions;
@@ -3009,7 +3010,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    */
   @LimitedPrivate(value = {"Hive"})
   @Unstable
-  public IMetaStoreClient getMSC() throws MetaException {
+  public synchronized IMetaStoreClient getMSC() throws MetaException {
     if (metaStoreClient == null) {
       try {
         owner = UserGroupInformation.getCurrentUser();
@@ -3019,6 +3020,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
         throw new MetaException(msg + "\n" + StringUtils.stringifyException(e));
       }
       metaStoreClient = createMetaStoreClient();
+      String metaStoreUris = conf.getVar(HiveConf.ConfVars.METASTOREURIS);
+      if (!org.apache.commons.lang3.StringUtils.isEmpty(metaStoreUris)) {
+        // get a synchronized wrapper if the meta store is remote.
+        metaStoreClient = HiveMetaStoreClient.newSynchronizedClient(metaStoreClient);
+      }
     }
     return metaStoreClient;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/ac49574f/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 33ee16b..cc9df76 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.OperationLog;
@@ -186,7 +185,7 @@ public class SQLOperation extends ExecuteStatementOperation {
       final SessionState parentSessionState = SessionState.get();
       // ThreadLocal Hive object needs to be set in background thread.
       // The metastore client in Hive is associated with right user.
-      final Hive parentHive = getSessionHive();
+      final Hive parentHive = parentSession.getSessionHive();
       // Current UGI will get used by metastore when metsatore is in embedded mode
       // So this needs to get passed to the new background thread
       final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
@@ -261,19 +260,6 @@ public class SQLOperation extends ExecuteStatementOperation {
     }
   }
 
-  /**
-   * Returns the ThreadLocal Hive for the current thread
-   * @return Hive
-   * @throws HiveSQLException
-   */
-  private Hive getSessionHive() throws HiveSQLException {
-    try {
-      return Hive.get();
-    } catch (HiveException e) {
-      throw new HiveSQLException("Failed to get ThreadLocal Hive object", e);
-    }
-  }
-
   private void registerCurrentOperationLog() {
     if (isOperationLogEnabled) {
       if (operationLog == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/ac49574f/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
index 65f9b29..4f4e92d 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
@@ -22,8 +22,16 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hive.service.auth.HiveAuthFactory;
-import org.apache.hive.service.cli.*;
+import org.apache.hive.service.cli.FetchOrientation;
+import org.apache.hive.service.cli.FetchType;
+import org.apache.hive.service.cli.GetInfoType;
+import org.apache.hive.service.cli.GetInfoValue;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.TableSchema;
 
 public interface HiveSession extends HiveSessionBase {
 
@@ -31,6 +39,8 @@ public interface HiveSession extends HiveSessionBase {
 
   IMetaStoreClient getMetaStoreClient() throws HiveSQLException;
 
+  Hive getSessionHive()  throws HiveSQLException;
+
   /**
    * getInfo operation handler
    * @param getInfoType

http://git-wip-us.apache.org/repos/asf/hive/blob/ac49574f/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 343c68e..a2fae69 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -90,6 +90,9 @@ public class HiveSessionImpl implements HiveSession {
   private final Set<OperationHandle> opHandleSet = new HashSet<OperationHandle>();
   private boolean isOperationLogEnabled;
   private File sessionLogDir;
+
+  private Hive sessionHive;
+
   private volatile long lastAccessTime;
   private volatile long lastIdleTime;
 
@@ -142,6 +145,11 @@ public class HiveSessionImpl implements HiveSession {
       LOG.error(msg, e);
       throw new HiveSQLException(msg, e);
     }
+    try {
+      sessionHive = Hive.get(getHiveConf());
+    } catch (HiveException e) {
+      throw new HiveSQLException("Failed to get metastore connection", e);
+    }
     // Process global init file: .hiverc
     processGlobalInitFile();
     if (sessionConfMap != null) {
@@ -278,6 +286,7 @@ public class HiveSessionImpl implements HiveSession {
     if (userAccess) {
       lastAccessTime = System.currentTimeMillis();
     }
+    Hive.set(sessionHive);
   }
 
   /**
@@ -326,13 +335,16 @@ public class HiveSessionImpl implements HiveSession {
   }
 
   @Override
+  public Hive getSessionHive() {
+    return sessionHive;
+  }
+
+  @Override
   public IMetaStoreClient getMetaStoreClient() throws HiveSQLException {
     try {
-      return Hive.get(getHiveConf()).getMSC();
-    } catch (HiveException e) {
-      throw new HiveSQLException("Failed to get metastore connection", e);
+      return getSessionHive().getMSC();
     } catch (MetaException e) {
-      throw new HiveSQLException("Failed to get metastore connection", e);
+      throw new HiveSQLException("Error acquiring metastore connection", e);
     }
   }
 
@@ -579,6 +591,14 @@ public class HiveSessionImpl implements HiveSession {
         }
         sessionState = null;
       }
+      if (sessionHive != null) {
+        try {
+          Hive.closeCurrent();
+        } catch (Throwable t) {
+          LOG.warn("Error closing sessionHive", t);
+        }
+        sessionHive = null;
+      }
       release(true);
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/ac49574f/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
index a29e5d1..56af643 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.service.auth.HiveAuthFactory;
@@ -43,7 +42,6 @@ public class HiveSessionImplwithUGI extends HiveSessionImpl {
 
   private UserGroupInformation sessionUgi = null;
   private String delegationTokenStr = null;
-  private Hive sessionHive = null;
   private HiveSession proxySession = null;
   static final Log LOG = LogFactory.getLog(HiveSessionImplwithUGI.class);
 
@@ -52,14 +50,6 @@ public class HiveSessionImplwithUGI extends HiveSessionImpl {
     super(protocol, username, password, hiveConf, ipAddress);
     setSessionUGI(username);
     setDelegationToken(delegationToken);
-
-    // create a new metastore connection for this particular user session
-    Hive.set(null);
-    try {
-      sessionHive = Hive.get(getHiveConf());
-    } catch (HiveException e) {
-      throw new HiveSQLException("Failed to setup metastore connection", e);
-    }
   }
 
   // setup appropriate UGI for the session
@@ -87,15 +77,6 @@ public class HiveSessionImplwithUGI extends HiveSessionImpl {
     return this.delegationTokenStr;
   }
 
-  @Override
-  protected synchronized void acquire(boolean userAccess) {
-    super.acquire(userAccess);
-    // if we have a metastore connection with impersonation, then set it first
-    if (sessionHive != null) {
-      Hive.set(sessionHive);
-    }
-  }
-
   /**
    * Close the file systems for the session and remove it from the FileSystem cache.
    * Cancel the session's delegation token and close the metastore connection
@@ -146,8 +127,6 @@ public class HiveSessionImplwithUGI extends HiveSessionImpl {
       } catch (HiveException e) {
         throw new HiveSQLException("Couldn't cancel delegation token", e);
       }
-      // close the metastore connection created with this delegation token
-      Hive.closeCurrent();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ac49574f/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 5a0f1c8..dfb7faa 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -24,6 +24,7 @@ import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.security.auth.login.LoginException;
 
@@ -67,6 +68,7 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
   protected CLIService cliService;
   private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS);
   protected static HiveAuthFactory hiveAuthFactory;
+  private static final AtomicInteger sessionCount = new AtomicInteger();
 
   protected int portNum;
   protected InetAddress serverIPAddress;
@@ -304,6 +306,7 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
       if (context != null) {
         context.setSessionHandle(sessionHandle);
       }
+      LOG.info("Opened a session, current sessions: " + sessionCount.incrementAndGet());
     } catch (Exception e) {
       LOG.warn("Error opening session: ", e);
       resp.setStatus(HiveSQLException.toTStatus(e));
@@ -446,6 +449,7 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
     try {
       SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
       cliService.closeSession(sessionHandle);
+      LOG.info("Closed a session, current sessions: " + sessionCount.decrementAndGet());
       resp.setStatus(OK_STATUS);
       ThriftCLIServerContext context =
         (ThriftCLIServerContext)currentServerContext.get();


Mime
View raw message