Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 90FD5181DE for ; Thu, 11 Jun 2015 15:56:46 +0000 (UTC) Received: (qmail 84123 invoked by uid 500); 11 Jun 2015 15:56:46 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 84076 invoked by uid 500); 11 Jun 2015 15:56:46 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 84064 invoked by uid 99); 11 Jun 2015 15:56:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Jun 2015 15:56:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2DA69DFFD5; Thu, 11 Jun 2015 15:56:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jxiang@apache.org To: commits@hive.apache.org Message-Id: <9fab10e629094966ad116a73c2cbdbd5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hive git commit: HIVE-10956: HS2 leaks HMS connections (Jimmy, reviewed by Xuefu) Date: Thu, 11 Jun 2015 15:56:46 +0000 (UTC) Repository: hive Updated Branches: refs/heads/branch-1 d32dc3cc7 -> 3bf83292a HIVE-10956: HS2 leaks HMS connections (Jimmy, reviewed by Xuefu) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3bf83292 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3bf83292 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3bf83292 Branch: refs/heads/branch-1 Commit: 3bf83292a16aea3fcc5b7c9c9c2d4d4ed06d8fe9 Parents: d32dc3c Author: Jimmy Xiang Authored: Thu Jun 4 14:52:40 2015 -0700 Committer: Jimmy Xiang Committed: Thu Jun 11 08:52:06 2015 -0700 ---------------------------------------------------------------------- .../hive/metastore/HiveMetaStoreClient.java | 12 +++++---- .../apache/hadoop/hive/ql/metadata/Hive.java | 8 +++++- .../service/cli/operation/SQLOperation.java | 16 +---------- .../hive/service/cli/session/HiveSession.java | 12 ++++++++- .../service/cli/session/HiveSessionImpl.java | 28 +++++++++++++++++--- .../cli/session/HiveSessionImplwithUGI.java | 21 --------------- .../service/cli/thrift/ThriftCLIService.java | 4 +++ 7 files changed, 54 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/3bf83292/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 4891d42..a5f5053 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.security.auth.login.LoginException; @@ -170,6 +171,8 @@ public class HiveMetaStoreClient implements IMetaStoreClient { private Map currentMetaVars; + private static final AtomicInteger connCount = new AtomicInteger(0); + // for thrift connects private int retries = 5; private long retryDelaySeconds = 0; @@ -419,6 +422,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient { client = new ThriftHiveMetastore.Client(protocol); try { transport.open(); + LOG.info("Opened a connection to metastore, current connections: " + connCount.incrementAndGet()); isConnected = true; } catch (TTransportException e) { tte = e; @@ -499,6 +503,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient { // just in case, we make this call. if ((transport != null) && transport.isOpen()) { transport.close(); + LOG.info("Closed a connection to metastore, current connections: " + connCount.decrementAndGet()); } } @@ -1974,19 +1979,16 @@ public class HiveMetaStoreClient implements IMetaStoreClient { private static class SynchronizedHandler implements InvocationHandler { private final IMetaStoreClient client; - private static final Object lock = SynchronizedHandler.class; SynchronizedHandler(IMetaStoreClient client) { this.client = client; } @Override - public Object invoke(Object proxy, Method method, Object [] args) + public synchronized Object invoke(Object proxy, Method method, Object [] args) throws Throwable { try { - synchronized (lock) { - return method.invoke(client, args); - } + return method.invoke(client, args); } catch (InvocationTargetException e) { throw e.getTargetException(); } http://git-wip-us.apache.org/repos/asf/hive/blob/3bf83292/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 8c948a9..0e990f6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaException; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.metastore.HiveMetaHookLoader; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.PartitionDropOptions; @@ -3009,7 +3010,7 @@ private void constructOneLBLocationMap(FileStatus fSta, */ @LimitedPrivate(value = {"Hive"}) @Unstable - public IMetaStoreClient getMSC() throws MetaException { + public synchronized IMetaStoreClient getMSC() throws MetaException { if (metaStoreClient == null) { try { owner = UserGroupInformation.getCurrentUser(); @@ -3019,6 +3020,11 @@ private void constructOneLBLocationMap(FileStatus fSta, throw new MetaException(msg + "\n" + StringUtils.stringifyException(e)); } metaStoreClient = createMetaStoreClient(); + String metaStoreUris = conf.getVar(HiveConf.ConfVars.METASTOREURIS); + if (!org.apache.commons.lang3.StringUtils.isEmpty(metaStoreUris)) { + // get a synchronized wrapper if the meta store is remote. + metaStoreClient = HiveMetaStoreClient.newSynchronizedClient(metaStoreClient); + } } return metaStoreClient; } http://git-wip-us.apache.org/repos/asf/hive/blob/3bf83292/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 33ee16b..cc9df76 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.exec.ExplainTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.VariableSubstitution; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.OperationLog; @@ -186,7 +185,7 @@ public class SQLOperation extends ExecuteStatementOperation { final SessionState parentSessionState = SessionState.get(); // ThreadLocal Hive object needs to be set in background thread. // The metastore client in Hive is associated with right user. - final Hive parentHive = getSessionHive(); + final Hive parentHive = parentSession.getSessionHive(); // Current UGI will get used by metastore when metsatore is in embedded mode // So this needs to get passed to the new background thread final UserGroupInformation currentUGI = getCurrentUGI(opConfig); @@ -261,19 +260,6 @@ public class SQLOperation extends ExecuteStatementOperation { } } - /** - * Returns the ThreadLocal Hive for the current thread - * @return Hive - * @throws HiveSQLException - */ - private Hive getSessionHive() throws HiveSQLException { - try { - return Hive.get(); - } catch (HiveException e) { - throw new HiveSQLException("Failed to get ThreadLocal Hive object", e); - } - } - private void registerCurrentOperationLog() { if (isOperationLogEnabled) { if (operationLog == null) { http://git-wip-us.apache.org/repos/asf/hive/blob/3bf83292/service/src/java/org/apache/hive/service/cli/session/HiveSession.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java index 65f9b29..4f4e92d 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java @@ -22,8 +22,16 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hive.service.auth.HiveAuthFactory; -import org.apache.hive.service.cli.*; +import org.apache.hive.service.cli.FetchOrientation; +import org.apache.hive.service.cli.FetchType; +import org.apache.hive.service.cli.GetInfoType; +import org.apache.hive.service.cli.GetInfoValue; +import org.apache.hive.service.cli.HiveSQLException; +import org.apache.hive.service.cli.OperationHandle; +import org.apache.hive.service.cli.RowSet; +import org.apache.hive.service.cli.TableSchema; public interface HiveSession extends HiveSessionBase { @@ -31,6 +39,8 @@ public interface HiveSession extends HiveSessionBase { IMetaStoreClient getMetaStoreClient() throws HiveSQLException; + Hive getSessionHive() throws HiveSQLException; + /** * getInfo operation handler * @param getInfoType http://git-wip-us.apache.org/repos/asf/hive/blob/3bf83292/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 343c68e..a2fae69 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -90,6 +90,9 @@ public class HiveSessionImpl implements HiveSession { private final Set opHandleSet = new HashSet(); private boolean isOperationLogEnabled; private File sessionLogDir; + + private Hive sessionHive; + private volatile long lastAccessTime; private volatile long lastIdleTime; @@ -142,6 +145,11 @@ public class HiveSessionImpl implements HiveSession { LOG.error(msg, e); throw new HiveSQLException(msg, e); } + try { + sessionHive = Hive.get(getHiveConf()); + } catch (HiveException e) { + throw new HiveSQLException("Failed to get metastore connection", e); + } // Process global init file: .hiverc processGlobalInitFile(); if (sessionConfMap != null) { @@ -278,6 +286,7 @@ public class HiveSessionImpl implements HiveSession { if (userAccess) { lastAccessTime = System.currentTimeMillis(); } + Hive.set(sessionHive); } /** @@ -326,13 +335,16 @@ public class HiveSessionImpl implements HiveSession { } @Override + public Hive getSessionHive() { + return sessionHive; + } + + @Override public IMetaStoreClient getMetaStoreClient() throws HiveSQLException { try { - return Hive.get(getHiveConf()).getMSC(); - } catch (HiveException e) { - throw new HiveSQLException("Failed to get metastore connection", e); + return getSessionHive().getMSC(); } catch (MetaException e) { - throw new HiveSQLException("Failed to get metastore connection", e); + throw new HiveSQLException("Error acquiring metastore connection", e); } } @@ -579,6 +591,14 @@ public class HiveSessionImpl implements HiveSession { } sessionState = null; } + if (sessionHive != null) { + try { + Hive.closeCurrent(); + } catch (Throwable t) { + LOG.warn("Error closing sessionHive", t); + } + sessionHive = null; + } release(true); } } http://git-wip-us.apache.org/repos/asf/hive/blob/3bf83292/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java index a29e5d1..56af643 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.service.auth.HiveAuthFactory; @@ -43,7 +42,6 @@ public class HiveSessionImplwithUGI extends HiveSessionImpl { private UserGroupInformation sessionUgi = null; private String delegationTokenStr = null; - private Hive sessionHive = null; private HiveSession proxySession = null; static final Log LOG = LogFactory.getLog(HiveSessionImplwithUGI.class); @@ -52,14 +50,6 @@ public class HiveSessionImplwithUGI extends HiveSessionImpl { super(protocol, username, password, hiveConf, ipAddress); setSessionUGI(username); setDelegationToken(delegationToken); - - // create a new metastore connection for this particular user session - Hive.set(null); - try { - sessionHive = Hive.get(getHiveConf()); - } catch (HiveException e) { - throw new HiveSQLException("Failed to setup metastore connection", e); - } } // setup appropriate UGI for the session @@ -87,15 +77,6 @@ public class HiveSessionImplwithUGI extends HiveSessionImpl { return this.delegationTokenStr; } - @Override - protected synchronized void acquire(boolean userAccess) { - super.acquire(userAccess); - // if we have a metastore connection with impersonation, then set it first - if (sessionHive != null) { - Hive.set(sessionHive); - } - } - /** * Close the file systems for the session and remove it from the FileSystem cache. * Cancel the session's delegation token and close the metastore connection @@ -146,8 +127,6 @@ public class HiveSessionImplwithUGI extends HiveSessionImpl { } catch (HiveException e) { throw new HiveSQLException("Couldn't cancel delegation token", e); } - // close the metastore connection created with this delegation token - Hive.closeCurrent(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/3bf83292/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 5a0f1c8..dfb7faa 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -24,6 +24,7 @@ import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.security.auth.login.LoginException; @@ -67,6 +68,7 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe protected CLIService cliService; private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS); protected static HiveAuthFactory hiveAuthFactory; + private static final AtomicInteger sessionCount = new AtomicInteger(); protected int portNum; protected InetAddress serverIPAddress; @@ -304,6 +306,7 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe if (context != null) { context.setSessionHandle(sessionHandle); } + LOG.info("Opened a session, current sessions: " + sessionCount.incrementAndGet()); } catch (Exception e) { LOG.warn("Error opening session: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); @@ -446,6 +449,7 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe try { SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle()); cliService.closeSession(sessionHandle); + LOG.info("Closed a session, current sessions: " + sessionCount.decrementAndGet()); resp.setStatus(OK_STATUS); ThriftCLIServerContext context = (ThriftCLIServerContext)currentServerContext.get();