Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AE7B9200B44 for ; Thu, 14 Jul 2016 23:47:14 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id ABDC8160A63; Thu, 14 Jul 2016 21:47:14 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1D66F160A60 for ; Thu, 14 Jul 2016 23:47:12 +0200 (CEST) Received: (qmail 82940 invoked by uid 500); 14 Jul 2016 21:47:12 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 82929 invoked by uid 99); 14 Jul 2016 21:47:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Jul 2016 21:47:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 21BF3E383A; Thu, 14 Jul 2016 21:47:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: thejas@apache.org To: commits@hive.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hive git commit: HIVE-11402 : HS2 - add an option to disallow parallel query execution within a single Session (Sergey Shelukhin, reviewed by Aihua Xu, Thejas Nair) Date: Thu, 14 Jul 2016 21:47:12 +0000 (UTC) archived-at: Thu, 14 Jul 2016 21:47:14 -0000 Repository: hive Updated Branches: refs/heads/branch-2.1 079c721ce -> 48f329701 HIVE-11402 : HS2 - add an option to disallow parallel query execution within a single Session (Sergey Shelukhin, reviewed by Aihua Xu, Thejas Nair) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/48f32970 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/48f32970 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/48f32970 Branch: refs/heads/branch-2.1 Commit: 48f329701ec7d2eab890d365d29d8d33f6b63314 Parents: 079c721 Author: Sergey Shelukhin Authored: Thu Jul 14 14:13:39 2016 -0700 Committer: Thejas Nair Committed: Thu Jul 14 14:33:02 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 2 + .../org/apache/hive/jdbc/TestJdbcDriver2.java | 31 ++++ .../cli/session/TestHiveSessionImpl.java | 4 +- .../operation/ExecuteStatementOperation.java | 2 +- .../hive/service/cli/operation/Operation.java | 20 ++- .../service/cli/operation/SQLOperation.java | 145 +++++++++++-------- .../hive/service/cli/session/HiveSession.java | 3 + .../service/cli/session/HiveSessionImpl.java | 138 ++++++++++++------ .../cli/session/HiveSessionImplwithUGI.java | 3 +- 9 files changed, 228 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/48f32970/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index da6cecb..68f3e33 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2148,6 +2148,8 @@ public class HiveConf extends Configuration { new TimeValidator(TimeUnit.SECONDS), "Number of seconds a request will wait to acquire the compile lock before giving up. " + "Setting it to 0s disables the timeout."), + HIVE_SERVER2_PARALLEL_OPS_IN_SESSION("hive.server2.parallel.ops.in.session", true, + "Whether to allow several parallel operations (such as SQL statements) in one session."), // HiveServer2 WebUI HIVE_SERVER2_WEBUI_BIND_HOST("hive.server2.webui.host", "0.0.0.0", "The host address the HiveServer2 WebUI will listen on"), http://git-wip-us.apache.org/repos/asf/hive/blob/48f32970/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java index 7243648..21028f4 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java @@ -135,6 +135,7 @@ public class TestJdbcDriver2 { System.setProperty(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL.varname, "verbose"); System.setProperty(ConfVars.HIVEMAPREDMODE.varname, "nonstrict"); System.setProperty(ConfVars.HIVE_AUTHORIZATION_MANAGER.varname, "org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider"); + System.setProperty(ConfVars.HIVE_SERVER2_PARALLEL_OPS_IN_SESSION.varname, "false"); Statement stmt1 = con1.createStatement(); assertNotNull("Statement is null", stmt1); @@ -328,6 +329,23 @@ public class TestJdbcDriver2 { } @Test + public void testSerializedExecution() throws Exception { + // Test running parallel queries (with parallel queries disabled). + // Should be serialized in the order of execution. + HiveStatement stmt1 = (HiveStatement) con.createStatement(); + HiveStatement stmt2 = (HiveStatement) con.createStatement(); + stmt1.execute("create temporary function sleepMsUDF as '" + SleepMsUDF.class.getName() + "'"); + stmt1.execute("create table test_ser_1(i int)"); + stmt1.executeAsync("insert into test_ser_1 select sleepMsUDF(under_col, 500) from " + + tableName + " limit 1"); + boolean isResultSet = stmt2.executeAsync("select * from test_ser_1"); + assertTrue(isResultSet); + ResultSet rs = stmt2.getResultSet(); + assertTrue(rs.next()); + assertFalse(rs.next()); + } + + @Test public void testParentReferences() throws Exception { /* Test parent references from Statement */ Statement s = this.con.createStatement(); @@ -2534,6 +2552,19 @@ public void testParseUrlHttpMode() throws SQLException, JdbcUriParseException, } } + + // A udf which sleeps for some number of ms to simulate a long running query + public static class SleepMsUDF extends UDF { + public Integer evaluate(final Integer value, final Integer ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + // No-op + } + return value; + } + } + /** * Loads data from a table containing non-ascii value column * Runs a query and compares the return value http://git-wip-us.apache.org/repos/asf/hive/blob/48f32970/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java index d58a913..c7fa5da 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java @@ -51,11 +51,11 @@ public class TestHiveSessionImpl { HiveSessionImpl session = new HiveSessionImpl(protocol, username, password, serverhiveConf, ipAddress) { @Override - protected synchronized void acquire(boolean userAccess) { + protected synchronized void acquire(boolean userAccess, boolean isOperation) { } @Override - protected synchronized void release(boolean userAccess) { + protected synchronized void release(boolean userAccess, boolean isOperation) { } }; http://git-wip-us.apache.org/repos/asf/hive/blob/48f32970/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java index ff46ed8..2dd90b6 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java @@ -31,7 +31,7 @@ public abstract class ExecuteStatementOperation extends Operation { public ExecuteStatementOperation(HiveSession parentSession, String statement, Map confOverlay, boolean runInBackground) { - super(parentSession, confOverlay, OperationType.EXECUTE_STATEMENT, runInBackground); + super(parentSession, confOverlay, OperationType.EXECUTE_STATEMENT); this.statement = statement; } http://git-wip-us.apache.org/repos/asf/hive/blob/48f32970/service/src/java/org/apache/hive/service/cli/operation/Operation.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index c3be295..023f6b2 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -67,7 +67,6 @@ public abstract class Operation { public static final long DEFAULT_FETCH_MAX_ROWS = 100; protected boolean hasResultSet; protected volatile HiveSQLException operationException; - protected final boolean runAsync; protected volatile Future backgroundHandle; protected OperationLog operationLog; protected boolean isOperationLogEnabled; @@ -85,23 +84,29 @@ public abstract class Operation { protected static final EnumSet DEFAULT_FETCH_ORIENTATION_SET = EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST); + protected Operation(HiveSession parentSession, OperationType opType) { - this(parentSession, null, opType, false); - } + this(parentSession, null, opType); + } + + protected Operation(HiveSession parentSession, Map confOverlay, + OperationType opType) { + this(parentSession, confOverlay, opType, false); + } - protected Operation(HiveSession parentSession, Map confOverlay, OperationType opType, boolean runInBackground) { + protected Operation(HiveSession parentSession, + Map confOverlay, OperationType opType, boolean isAsyncQueryState) { this.parentSession = parentSession; if (confOverlay != null) { this.confOverlay = confOverlay; } - this.runAsync = runInBackground; this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion()); beginTime = System.currentTimeMillis(); lastAccessTime = beginTime; operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(), HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS); setMetrics(state); - queryState = new QueryState(parentSession.getHiveConf(), confOverlay, runAsync); + queryState = new QueryState(parentSession.getHiveConf(), confOverlay, isAsyncQueryState); } public Future getBackgroundHandle() { @@ -113,10 +118,9 @@ public abstract class Operation { } public boolean shouldRunAsync() { - return runAsync; + return false; // Most operations cannot run asynchronously. } - public HiveSession getParentSession() { return parentSession; } http://git-wip-us.apache.org/repos/asf/hive/blob/48f32970/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 57c954a..3df831c 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -89,7 +89,6 @@ import org.codehaus.jackson.map.ObjectMapper; */ @SuppressWarnings("deprecation") public class SQLOperation extends ExecuteStatementOperation { - private Driver driver = null; private CommandProcessorResponse response; private TableSchema resultSchema = null; @@ -101,6 +100,7 @@ public class SQLOperation extends ExecuteStatementOperation { private SQLOperationDisplay sqlOpDisplay; private long queryTimeout; private ScheduledExecutorService timeoutExecutor; + private final boolean runAsync; /** * A map to track query count running by each user @@ -112,6 +112,7 @@ public class SQLOperation extends ExecuteStatementOperation { boolean runInBackground, long queryTimeout) { // TODO: call setRemoteUser in ExecuteStatementOperation or higher. super(parentSession, statement, confOverlay, runInBackground); + this.runAsync = runInBackground; this.queryTimeout = queryTimeout; setupSessionIO(parentSession.getSessionState()); try { @@ -121,6 +122,11 @@ public class SQLOperation extends ExecuteStatementOperation { } } + @Override + public boolean shouldRunAsync() { + return runAsync; + } + private void setupSessionIO(SessionState sessionState) { try { sessionState.in = null; // hive server's session input stream is not used @@ -272,70 +278,16 @@ public class SQLOperation extends ExecuteStatementOperation { if (!runAsync) { runQuery(); } else { - // We'll pass ThreadLocals in the background thread from the foreground (handler) thread - final SessionState parentSessionState = SessionState.get(); - // ThreadLocal Hive object needs to be set in background thread. - // The metastore client in Hive is associated with right user. - final Hive parentHive = parentSession.getSessionHive(); - final PerfLogger parentPerfLogger = SessionState.getPerfLogger(); - // Current UGI will get used by metastore when metsatore is in embedded mode - // So this needs to get passed to the new background thread - final UserGroupInformation currentUGI = getCurrentUGI(); - // Runnable impl to call runInternal asynchronously, - // from a different thread - Runnable backgroundOperation = new Runnable() { - @Override - public void run() { - PrivilegedExceptionAction doAsAction = new PrivilegedExceptionAction() { - @Override - public Object run() throws HiveSQLException { - Hive.set(parentHive); - // TODO: can this result in cross-thread reuse of session state? - SessionState.setCurrentSessionState(parentSessionState); - PerfLogger.setPerfLogger(parentPerfLogger); - // Set current OperationLog in this async thread for keeping on saving query log. - registerCurrentOperationLog(); - registerLoggingContext(); - try { - if (asyncPrepare) { - prepare(queryState); - } - runQuery(); - } catch (HiveSQLException e) { - setOperationException(e); - LOG.error("Error running hive query: ", e); - } finally { - unregisterLoggingContext(); - unregisterOperationLog(); - } - return null; - } - }; + // We'll pass ThreadLocals in the background thread from the foreground (handler) thread. + // 1) ThreadLocal Hive object needs to be set in background thread + // 2) The metastore client in Hive is associated with right user. + // 3) Current UGI will get used by metastore when metastore is in embedded mode + Runnable work = new BackgroundWork(getCurrentUGI(), parentSession.getSessionHive(), + SessionState.getPerfLogger(), SessionState.get(), asyncPrepare); - try { - currentUGI.doAs(doAsAction); - } catch (Exception e) { - setOperationException(new HiveSQLException(e)); - LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e); - } - finally { - /** - * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup - * when this thread is garbage collected later. - * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize() - */ - if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) { - ThreadWithGarbageCleanup currentThread = - (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread(); - currentThread.cacheThreadLocalRawStore(); - } - } - } - }; try { // This submit blocks if no background threads are available to run this operation - Future backgroundHandle = - getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation); + Future backgroundHandle = getParentSession().submitBackgroundOperation(work); setBackgroundHandle(backgroundHandle); } catch (RejectedExecutionException rejected) { setState(OperationState.ERROR); @@ -345,6 +297,74 @@ public class SQLOperation extends ExecuteStatementOperation { } } + + private final class BackgroundWork implements Runnable { + private final UserGroupInformation currentUGI; + private final Hive parentHive; + private final PerfLogger parentPerfLogger; + private final SessionState parentSessionState; + private final boolean asyncPrepare; + + private BackgroundWork(UserGroupInformation currentUGI, + Hive parentHive, PerfLogger parentPerfLogger, + SessionState parentSessionState, boolean asyncPrepare) { + this.currentUGI = currentUGI; + this.parentHive = parentHive; + this.parentPerfLogger = parentPerfLogger; + this.parentSessionState = parentSessionState; + this.asyncPrepare = asyncPrepare; + } + + @Override + public void run() { + PrivilegedExceptionAction doAsAction = new PrivilegedExceptionAction() { + @Override + public Object run() throws HiveSQLException { + Hive.set(parentHive); + // TODO: can this result in cross-thread reuse of session state? + SessionState.setCurrentSessionState(parentSessionState); + PerfLogger.setPerfLogger(parentPerfLogger); + // Set current OperationLog in this async thread for keeping on saving query log. + registerCurrentOperationLog(); + registerLoggingContext(); + try { + if (asyncPrepare) { + prepare(queryState); + } + runQuery(); + } catch (HiveSQLException e) { + setOperationException(e); + LOG.error("Error running hive query: ", e); + } finally { + unregisterLoggingContext(); + unregisterOperationLog(); + } + return null; + } + }; + + try { + currentUGI.doAs(doAsAction); + } catch (Exception e) { + setOperationException(new HiveSQLException(e)); + LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e); + } + finally { + /** + * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup + * when this thread is garbage collected later. + * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize() + */ + if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) { + ThreadWithGarbageCleanup currentThread = + (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread(); + currentThread.cacheThreadLocalRawStore(); + } + } + } + } + + /** * Returns the current UGI on the stack * @param opConfig @@ -663,4 +683,5 @@ public class SQLOperation extends ExecuteStatementOperation { public String getExecutionEngine() { return queryState.getConf().getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE); } + } http://git-wip-us.apache.org/repos/asf/hive/blob/48f32970/service/src/java/org/apache/hive/service/cli/session/HiveSession.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java index 78ff388..e5d865b 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java @@ -20,6 +20,7 @@ package org.apache.hive.service.cli.session; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -211,4 +212,6 @@ public interface HiveSession extends HiveSessionBase { void closeExpiredOperations(); long getNoOperationTime(); + + Future submitBackgroundOperation(Runnable work); } http://git-wip-us.apache.org/repos/asf/hive/blob/48f32970/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 54ee567..01b0034 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -24,11 +24,13 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.Semaphore; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; @@ -38,8 +40,6 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.ql.QueryPlan; -import org.apache.hadoop.hive.ql.exec.ListSinkOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.history.HiveHistory; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -113,16 +113,18 @@ public class HiveSessionImpl implements HiveSession { private volatile long lastAccessTime; private volatile long lastIdleTime; + private final Semaphore operationLock; - - public HiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol, String username, String password, - HiveConf serverhiveConf, String ipAddress) { + public HiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol, + String username, String password, HiveConf serverConf, String ipAddress) { this.username = username; this.password = password; creationTime = System.currentTimeMillis(); this.sessionHandle = sessionHandle != null ? sessionHandle : new SessionHandle(protocol); - this.sessionConf = new HiveConf(serverhiveConf); + this.sessionConf = new HiveConf(serverConf); this.ipAddress = ipAddress; + this.operationLock = serverConf.getBoolVar( + ConfVars.HIVE_SERVER2_PARALLEL_OPS_IN_SESSION) ? null : new Semaphore(1); try { // In non-impersonation mode, map scheduler queue to current user // if fair scheduler is configured. @@ -325,7 +327,27 @@ public class HiveSessionImpl implements HiveSession { this.operationManager = operationManager; } - protected synchronized void acquire(boolean userAccess) { + protected void acquire(boolean userAccess, boolean isOperation) { + if (isOperation && operationLock != null) { + try { + operationLock.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + boolean success = false; + try { + acquireAfterOpLock(userAccess); + success = true; + } finally { + if (!success && isOperation && operationLock != null) { + operationLock.release(); + } + } + } + + private synchronized void acquireAfterOpLock(boolean userAccess) { // Need to make sure that the this HiveServer2's session's SessionState is // stored in the thread local for the handler thread. SessionState.setCurrentSessionState(sessionState); @@ -345,7 +367,17 @@ public class HiveSessionImpl implements HiveSession { * when this thread is garbage collected later. * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize() */ - protected synchronized void release(boolean userAccess) { + protected void release(boolean userAccess, boolean isOperation) { + try { + releaseBeforeOpLock(userAccess); + } finally { + if (isOperation && operationLock != null) { + operationLock.release(); + } + } + } + + private synchronized void releaseBeforeOpLock(boolean userAccess) { if (sessionState != null) { // can be null in-case of junit tests. skip reset. // reset thread name at release time. @@ -401,7 +433,7 @@ public class HiveSessionImpl implements HiveSession { @Override public GetInfoValue getInfo(GetInfoType getInfoType) throws HiveSQLException { - acquire(true); + acquire(true, true); try { switch (getInfoType) { case CLI_SERVER_NAME: @@ -421,7 +453,7 @@ public class HiveSessionImpl implements HiveSession { throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString()); } } finally { - release(true); + release(true, true); } } @@ -449,12 +481,11 @@ public class HiveSessionImpl implements HiveSession { private OperationHandle executeStatementInternal(String statement, Map confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException { - acquire(true); + acquire(true, true); OperationManager operationManager = getOperationManager(); - ExecuteStatementOperation operation = - operationManager.newExecuteStatementOperation(getSession(), statement, confOverlay, - runAsync, queryTimeout); + ExecuteStatementOperation operation = operationManager.newExecuteStatementOperation( + getSession(), statement, confOverlay, runAsync, queryTimeout); OperationHandle opHandle = operation.getHandle(); try { operation.run(); @@ -467,14 +498,29 @@ public class HiveSessionImpl implements HiveSession { operationManager.closeOperation(opHandle); throw e; } finally { - release(true); + if (operation.getBackgroundHandle() == null) { + release(true, true); // Not async, or wasn't submitted for some reason (failure, etc.) + } else { + releaseBeforeOpLock(true); // Release, but keep the lock (if present). + } } } @Override + public Future submitBackgroundOperation(Runnable work) { + return getSessionManager().submitBackgroundOperation( + operationLock == null ? work : new FutureTask(work, null) { + protected void done() { + // We assume this always comes from a user operation that took the lock. + operationLock.release(); + }; + }); + } + + @Override public OperationHandle getTypeInfo() throws HiveSQLException { - acquire(true); + acquire(true, true); OperationManager operationManager = getOperationManager(); GetTypeInfoOperation operation = operationManager.newGetTypeInfoOperation(getSession()); @@ -487,14 +533,14 @@ public class HiveSessionImpl implements HiveSession { operationManager.closeOperation(opHandle); throw e; } finally { - release(true); + release(true, true); } } @Override public OperationHandle getCatalogs() throws HiveSQLException { - acquire(true); + acquire(true, true); OperationManager operationManager = getOperationManager(); GetCatalogsOperation operation = operationManager.newGetCatalogsOperation(getSession()); @@ -507,14 +553,14 @@ public class HiveSessionImpl implements HiveSession { operationManager.closeOperation(opHandle); throw e; } finally { - release(true); + release(true, true); } } @Override public OperationHandle getSchemas(String catalogName, String schemaName) throws HiveSQLException { - acquire(true); + acquire(true, true); OperationManager operationManager = getOperationManager(); GetSchemasOperation operation = @@ -528,7 +574,7 @@ public class HiveSessionImpl implements HiveSession { operationManager.closeOperation(opHandle); throw e; } finally { - release(true); + release(true, true); } } @@ -536,7 +582,7 @@ public class HiveSessionImpl implements HiveSession { public OperationHandle getTables(String catalogName, String schemaName, String tableName, List tableTypes) throws HiveSQLException { - acquire(true); + acquire(true, true); OperationManager operationManager = getOperationManager(); MetadataOperation operation = @@ -550,14 +596,14 @@ public class HiveSessionImpl implements HiveSession { operationManager.closeOperation(opHandle); throw e; } finally { - release(true); + release(true, true); } } @Override public OperationHandle getTableTypes() throws HiveSQLException { - acquire(true); + acquire(true, true); OperationManager operationManager = getOperationManager(); GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession()); @@ -570,14 +616,14 @@ public class HiveSessionImpl implements HiveSession { operationManager.closeOperation(opHandle); throw e; } finally { - release(true); + release(true, true); } } @Override public OperationHandle getColumns(String catalogName, String schemaName, String tableName, String columnName) throws HiveSQLException { - acquire(true); + acquire(true, true); String addedJars = Utilities.getResourceFiles(sessionConf, SessionState.ResourceType.JAR); if (StringUtils.isNotBlank(addedJars)) { IMetaStoreClient metastoreClient = getSession().getMetaStoreClient(); @@ -595,7 +641,7 @@ public class HiveSessionImpl implements HiveSession { operationManager.closeOperation(opHandle); throw e; } finally { - release(true); + release(true, true); } } @@ -608,7 +654,7 @@ public class HiveSessionImpl implements HiveSession { @Override public OperationHandle getFunctions(String catalogName, String schemaName, String functionName) throws HiveSQLException { - acquire(true); + acquire(true, true); OperationManager operationManager = getOperationManager(); GetFunctionsOperation operation = operationManager @@ -622,14 +668,14 @@ public class HiveSessionImpl implements HiveSession { operationManager.closeOperation(opHandle); throw e; } finally { - release(true); + release(true, true); } } @Override public void close() throws HiveSQLException { try { - acquire(true); + acquire(true, false); // Iterate through the opHandles and close their operations List ops = null; synchronized (opHandleSet) { @@ -671,7 +717,7 @@ public class HiveSessionImpl implements HiveSession { } sessionHive = null; } - release(true); + release(true, false); } } @@ -732,7 +778,7 @@ public class HiveSessionImpl implements HiveSession { } private void closeTimedOutOperations(List operations) { - acquire(false); + acquire(false, false); try { for (Operation operation : operations) { synchronized (opHandleSet) { @@ -745,54 +791,54 @@ public class HiveSessionImpl implements HiveSession { } } } finally { - release(false); + release(false, false); } } @Override public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { - acquire(true); + acquire(true, false); try { sessionManager.getOperationManager().cancelOperation(opHandle); } finally { - release(true); + release(true, false); } } @Override public void closeOperation(OperationHandle opHandle) throws HiveSQLException { - acquire(true); + acquire(true, false); try { operationManager.closeOperation(opHandle); synchronized (opHandleSet) { opHandleSet.remove(opHandle); } } finally { - release(true); + release(true, false); } } @Override public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException { - acquire(true); + acquire(true, true); try { return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle); } finally { - release(true); + release(true, true); } } @Override public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows, FetchType fetchType) throws HiveSQLException { - acquire(true); + acquire(true, false); try { if (fetchType == FetchType.QUERY_OUTPUT) { return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows); } return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows, sessionConf); } finally { - release(true); + release(true, false); } } @@ -846,7 +892,7 @@ public class HiveSessionImpl implements HiveSession { @Override public OperationHandle getPrimaryKeys(String catalog, String schema, String table) throws HiveSQLException { - acquire(true); + acquire(true, true); OperationManager operationManager = getOperationManager(); GetPrimaryKeysOperation operation = operationManager @@ -860,7 +906,7 @@ public class HiveSessionImpl implements HiveSession { operationManager.closeOperation(opHandle); throw e; } finally { - release(true); + release(true, true); } } @@ -868,7 +914,7 @@ public class HiveSessionImpl implements HiveSession { public OperationHandle getCrossReference(String primaryCatalog, String primarySchema, String primaryTable, String foreignCatalog, String foreignSchema, String foreignTable) throws HiveSQLException { - acquire(true); + acquire(true, true); OperationManager operationManager = getOperationManager(); GetCrossReferenceOperation operation = operationManager @@ -884,7 +930,7 @@ public class HiveSessionImpl implements HiveSession { operationManager.closeOperation(opHandle); throw e; } finally { - release(true); + release(true, true); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/48f32970/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java index f7b3412..afed9e2 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java @@ -89,9 +89,10 @@ public class HiveSessionImplwithUGI extends HiveSessionImpl { @Override public void close() throws HiveSQLException { try { - acquire(true); + acquire(true, false); cancelDelegationToken(); } finally { + release(true, false); try { super.close(); } finally {