Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3AB7D185BB for ; Tue, 15 Dec 2015 21:29:36 +0000 (UTC) Received: (qmail 20353 invoked by uid 500); 15 Dec 2015 21:29:36 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 20306 invoked by uid 500); 15 Dec 2015 21:29:36 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 20295 invoked by uid 99); 15 Dec 2015 21:29:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Dec 2015 21:29:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EC6DEDFF87; Tue, 15 Dec 2015 21:29:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: szehon@apache.org To: commits@hive.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hive git commit: HIVE-12431 : Support timeout for compile lock (Mohit Sabharwal via Szehon) Date: Tue, 15 Dec 2015 21:29:35 +0000 (UTC) Repository: hive Updated Branches: refs/heads/master 25c207c93 -> e091bc271 HIVE-12431 : Support timeout for compile lock (Mohit Sabharwal via Szehon) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e091bc27 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e091bc27 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e091bc27 Branch: refs/heads/master Commit: e091bc27183dc0e24a554e599f1584249650306a Parents: 25c207c Author: Szehon Ho Authored: Tue Dec 15 13:27:22 2015 -0800 Committer: Szehon Ho Committed: Tue Dec 15 13:27:22 2015 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 5 +- .../java/org/apache/hadoop/hive/ql/Driver.java | 51 +++++++- .../org/apache/hadoop/hive/ql/ErrorMsg.java | 1 + .../apache/hive/service/cli/CLIServiceTest.java | 127 +++++++++++++++++-- 4 files changed, 172 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/e091bc27/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 31f0634..243f281 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1849,7 +1849,10 @@ public class HiveConf extends Configuration { "Bind host on which to run the HiveServer2 Thrift service."), HIVE_SERVER2_PARALLEL_COMPILATION("hive.driver.parallel.compilation", false, "Whether to\n" + "enable parallel compilation between sessions on HiveServer2. The default is false."), - + HIVE_SERVER2_COMPILE_LOCK_TIMEOUT("hive.server2.compile.lock.timeout", "0s", + new TimeValidator(TimeUnit.SECONDS), + "Number of seconds a request will wait to acquire the compile lock before giving up. " + + "Setting it to 0s disables the timeout."), // HiveServer2 WebUI HIVE_SERVER2_WEBUI_BIND_HOST("hive.server2.webui.host", "0.0.0.0", "The host address the HiveServer2 WebUI will listen on"), HIVE_SERVER2_WEBUI_PORT("hive.server2.webui.port", 10002, "The port the HiveServer2 WebUI will listen on"), http://git-wip-us.apache.org/repos/asf/hive/blob/e091bc27/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index f6af6ca..3d5f3b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang.StringUtils; @@ -1128,9 +1129,12 @@ public class Driver implements CommandProcessor { private int compileInternal(String command) { boolean isParallelEnabled = SessionState.get().isHiveServerQuery() && this.isParallelEnabled; int ret; - final ReentrantLock compileLock = isParallelEnabled - ? SessionState.get().getCompileLock() : globalCompileLock; - compileLock.lock(); + final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled, + command); + if (compileLock == null) { + return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode(); + } + try { if (isParallelEnabled && LOG.isDebugEnabled()) { LOG.debug("Entering compile: " + command); @@ -1142,6 +1146,7 @@ public class Driver implements CommandProcessor { } finally { compileLock.unlock(); } + if (ret != 0) { try { releaseLocksAndCommitOrRollback(false, null); @@ -1153,6 +1158,46 @@ public class Driver implements CommandProcessor { return ret; } + /** + * Acquires the compile lock. If the compile lock wait timeout is configured, + * it will acquire the lock if it is not held by another thread within the given + * waiting time. + * @return the ReentrantLock object if the lock was successfully acquired, + * or {@code null} if compile lock wait timeout is configured and + * either the waiting time elapsed before the lock could be acquired + * or if the current thread is interrupted. + */ + private ReentrantLock tryAcquireCompileLock(boolean isParallelEnabled, + String command) { + final ReentrantLock compileLock = isParallelEnabled ? + SessionState.get().getCompileLock() : globalCompileLock; + long maxCompileLockWaitTime = HiveConf.getTimeVar( + this.conf, ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT, + TimeUnit.SECONDS); + if (maxCompileLockWaitTime > 0) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting to acquire compile lock: " + command); + } + if(!compileLock.tryLock(maxCompileLockWaitTime, TimeUnit.SECONDS)) { + errorMessage = ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCodedMsg(); + LOG.error(errorMessage + ": " + command); + return null; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted Exception ignored", e); + } + return null; + } + } else { + compileLock.lock(); + } + + return compileLock; + } + private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled) throws CommandNeedRetryException { errorMessage = null; http://git-wip-us.apache.org/repos/asf/hive/blob/e091bc27/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 9d9dd53..d759739 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -426,6 +426,7 @@ public enum ErrorMsg { TBL_SORTED_NOT_BUCKETED(10306, "Destination table {0} found to be sorted but not bucketed.", true), //{2} should be lockid LOCK_ACQUIRE_TIMEDOUT(10307, "Lock acquisition for {0} timed out after {1}ms. {2}", true), + COMPILE_LOCK_TIMED_OUT(10308, "Attempt to acquire compile lock timed out.", true), //========================== 20000 range starts here ========================// SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."), SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. " http://git-wip-us.apache.org/repos/asf/hive/blob/e091bc27/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java ---------------------------------------------------------------------- diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java index 7bfbdb9..e78181a 100644 --- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -23,8 +23,10 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.Serializable; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -33,11 +35,16 @@ import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook; +import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hive.service.server.HiveServer2; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -303,15 +310,15 @@ public abstract class CLIServiceTest { // Create callables with different queries. String query = "SELECT ID + %1$d FROM " + tableName; cs[0] = createQueryCallable( - query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut); + query, confOverlay, longPollingTimeout, QUERY_COUNT, OperationState.FINISHED, true, cdlIn, cdlOut); query = "SELECT t1.ID, SUM(t2.ID) + %1$d FROM " + tableName + " t1 CROSS JOIN " + tableName + " t2 GROUP BY t1.ID HAVING t1.ID > 1"; cs[1] = createQueryCallable( - query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut); + query, confOverlay, longPollingTimeout, QUERY_COUNT, OperationState.FINISHED, true, cdlIn, cdlOut); query = "SELECT b.a FROM (SELECT (t1.ID + %1$d) as a , t2.* FROM " + tableName + " t1 INNER JOIN " + tableName + " t2 ON t1.ID = t2.ID WHERE t2.ID > 2) b"; cs[2] = createQueryCallable( - query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut); + query, confOverlay, longPollingTimeout, QUERY_COUNT, OperationState.FINISHED, true, cdlIn, cdlOut); @SuppressWarnings("unchecked") FutureTask[] tasks = new FutureTask[THREAD_COUNT]; @@ -334,13 +341,118 @@ public abstract class CLIServiceTest { client.closeSession(sessionHandle); } + public static class CompileLockTestSleepHook implements HiveSemanticAnalyzerHook { + @Override + public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, + ASTNode ast) throws SemanticException { + try { + Thread.sleep(20 * 1000); + } catch (Throwable t) { + // do nothing + } + return ast; + } + + @Override + public void postAnalyze(HiveSemanticAnalyzerHookContext context, + List> rootTasks) throws SemanticException { + } + } + + @Test + public void testGlobalCompileLockTimeout() throws Exception { + String tableName = "TEST_COMPILE_LOCK_TIMEOUT"; + String columnDefinitions = "(ID STRING)"; + + // Open a session and set up the test data + SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, + new HashMap()); + assertNotNull(sessionHandle); + + int THREAD_COUNT = 3; + @SuppressWarnings("unchecked") + FutureTask[] tasks = (FutureTask[])new FutureTask[THREAD_COUNT]; + long longPollingTimeoutMs = 10 * 60 * 1000; // Larger than max compile duration used in test + + // 1st query acquires the lock and takes 20 secs to compile + Map confOverlay = getConfOverlay(0, longPollingTimeoutMs); + confOverlay.put(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, + CompileLockTestSleepHook.class.getName()); + String query = "SELECT 0 FROM " + tableName; + tasks[0] = new FutureTask( + createQueryCallable(query, confOverlay, longPollingTimeoutMs, 1, + OperationState.FINISHED, false, null, null)); + new Thread(tasks[0]).start(); + Thread.sleep(5 * 1000); + + // 2nd query's session has compile lock timeout of 1 sec, so it should + // not be able to acquire the lock within that time period + confOverlay = getConfOverlay(1, longPollingTimeoutMs); + query = "SELECT 1 FROM " + tableName; + tasks[1] = new FutureTask( + createQueryCallable(query, confOverlay, longPollingTimeoutMs, 1, + OperationState.ERROR, false, null, null)); + new Thread(tasks[1]).start(); + + // 3rd query's session has compile lock timeout of 100 secs, so it should + // be able to acquire the lock and finish successfully + confOverlay = getConfOverlay(100, longPollingTimeoutMs); + query = "SELECT 2 FROM " + tableName; + tasks[2] = new FutureTask( + createQueryCallable(query, confOverlay, longPollingTimeoutMs, 1, + OperationState.FINISHED, false, null, null)); + new Thread(tasks[2]).start(); + + boolean foundExpectedException = false; + for (int i = 0; i < THREAD_COUNT; ++i) { + try { + tasks[i].get(); + } catch (Throwable t) { + if (i == 1) { + assertTrue(t.getMessage().contains( + ErrorMsg.COMPILE_LOCK_TIMED_OUT.getMsg())); + foundExpectedException = true; + } else { + throw new RuntimeException(t); + } + } + } + assertTrue(foundExpectedException); + + // Cleanup + client.executeStatement(sessionHandle, "DROP TABLE " + tableName, + getConfOverlay(0, longPollingTimeoutMs)); + client.closeSession(sessionHandle); + } + + private Map getConfOverlay(long compileLockTimeoutSecs, + long longPollingTimeoutMs) { + Map confOverlay = new HashMap(); + confOverlay.put( + HiveConf.ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION.varname, "false"); + confOverlay.put( + HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, + longPollingTimeoutMs + "ms"); + if (compileLockTimeoutSecs > 0) { + confOverlay.put( + HiveConf.ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT.varname, + compileLockTimeoutSecs + "s"); + } + return confOverlay; + } + private Callable createQueryCallable(final String queryStringFormat, final Map confOverlay, final long longPollingTimeout, - final int queryCount, final CountDownLatch cdlIn, final CountDownLatch cdlOut) { + final int queryCount, final OperationState expectedOperationState, + final boolean syncThreadStart, final CountDownLatch cdlIn, + final CountDownLatch cdlOut) { return new Callable() { @Override public Void call() throws Exception { - syncThreadStart(cdlIn, cdlOut); + if (syncThreadStart) { + syncThreadStart(cdlIn, cdlOut); + } + SessionHandle sessionHandle = openSession(confOverlay); OperationHandle[] hs = new OperationHandle[queryCount]; for (int i = 0; i < hs.length; ++i) { @@ -349,7 +461,7 @@ public abstract class CLIServiceTest { hs[i] = client.executeStatementAsync(sessionHandle, queryString, confOverlay); } for (int i = hs.length - 1; i >= 0; --i) { - waitForAsyncQuery(hs[i], OperationState.FINISHED, longPollingTimeout); + waitForAsyncQuery(hs[i], expectedOperationState, longPollingTimeout); } return null; } @@ -405,7 +517,6 @@ public abstract class CLIServiceTest { return waitForAsyncQuery(h, expectedState, longPollingTimeout); } - private OperationStatus waitForAsyncQuery(OperationHandle opHandle, OperationState expectedState, long longPollingTimeout) throws HiveSQLException { long testIterationTimeout = System.currentTimeMillis() + 100000;