Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8E1DF200C10 for ; Fri, 20 Jan 2017 03:26:46 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 8CA9E160B57; Fri, 20 Jan 2017 02:26:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 39554160B54 for ; Fri, 20 Jan 2017 03:26:45 +0100 (CET) Received: (qmail 99721 invoked by uid 500); 20 Jan 2017 02:26:44 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 99710 invoked by uid 99); 20 Jan 2017 02:26:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Jan 2017 02:26:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0FC71DFC70; Fri, 20 Jan 2017 02:26:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ychena@apache.org To: commits@hive.apache.org Message-Id: <7a13aeb5f10745ab86e5adb2d10dfdcf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hive git commit: HIVE-15572: Improve the response time for query canceling when it happens during acquiring locks (Yongzhi Chen, reviewed by Chaoyu Tang) Date: Fri, 20 Jan 2017 02:26:44 +0000 (UTC) archived-at: Fri, 20 Jan 2017 02:26:46 -0000 Repository: hive Updated Branches: refs/heads/master 92090823d -> dbc2dffcd HIVE-15572: Improve the response time for query canceling when it happens during acquiring locks (Yongzhi Chen, reviewed by Chaoyu Tang) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/dbc2dffc Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/dbc2dffc Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/dbc2dffc Branch: refs/heads/master Commit: dbc2dffcd212a023ed43a932a716a177e5b466ef Parents: 9209082 Author: Yongzhi Chen Authored: Tue Jan 10 21:48:34 2017 -0500 Committer: Yongzhi Chen Committed: Thu Jan 19 21:25:52 2017 -0500 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hive/ql/Driver.java | 113 ++++++++++--------- .../org/apache/hadoop/hive/ql/ErrorMsg.java | 1 + .../hadoop/hive/ql/lockmgr/DbLockManager.java | 3 +- .../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 9 +- .../hive/ql/lockmgr/EmbeddedLockManager.java | 3 +- .../hadoop/hive/ql/lockmgr/HiveLockManager.java | 3 +- .../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 15 +++ .../hive/ql/lockmgr/HiveTxnManagerImpl.java | 8 ++ .../zookeeper/ZooKeeperHiveLockManager.java | 29 +++-- .../hive/ql/lockmgr/TestDummyTxnManager.java | 24 +++- 10 files changed, 138 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index fd6020b..efa2bdc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -172,6 +172,7 @@ public class Driver implements CommandProcessor { // For WebUI. Kept alive after queryPlan is freed. private final QueryDisplay queryDisplay = new QueryDisplay(); + private LockedDriverState lDrvState = new LockedDriverState(); // Query specific info private QueryState queryState; @@ -179,12 +180,7 @@ public class Driver implements CommandProcessor { // Query hooks that execute before compilation and after execution List queryHooks; - // a lock is used for synchronizing the state transition and its associated - // resource releases - private final ReentrantLock stateLock = new ReentrantLock(); - private DriverState driverState = DriverState.INITIALIZED; - - private enum DriverState { + public enum DriverState { INITIALIZED, COMPILING, COMPILED, @@ -201,6 +197,13 @@ public class Driver implements CommandProcessor { ERROR } + public static class LockedDriverState { + // a lock is used for synchronizing the state transition and its associated + // resource releases + public final ReentrantLock stateLock = new ReentrantLock(); + public DriverState driverState = DriverState.INITIALIZED; + } + private boolean checkConcurrency() { boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); if (!supportConcurrency) { @@ -381,11 +384,11 @@ public class Driver implements CommandProcessor { PerfLogger perfLogger = SessionState.getPerfLogger(true); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE); - stateLock.lock(); + lDrvState.stateLock.lock(); try { - driverState = DriverState.COMPILING; + lDrvState.driverState = DriverState.COMPILING; } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } command = new VariableSubstitution(new HiveVariableSource() { @@ -623,15 +626,15 @@ public class Driver implements CommandProcessor { if (isInterrupted && !deferClose) { closeInProcess(true); } - stateLock.lock(); + lDrvState.stateLock.lock(); try { if (isInterrupted) { - driverState = deferClose ? DriverState.EXECUTING : DriverState.ERROR; + lDrvState.driverState = deferClose ? DriverState.EXECUTING : DriverState.ERROR; } else { - driverState = compileError ? DriverState.ERROR : DriverState.COMPILED; + lDrvState.driverState = compileError ? DriverState.ERROR : DriverState.COMPILED; } } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } if (isInterrupted) { @@ -650,16 +653,16 @@ public class Driver implements CommandProcessor { } private boolean isInterrupted() { - stateLock.lock(); + lDrvState.stateLock.lock(); try { - if (driverState == DriverState.INTERRUPT) { + if (lDrvState.driverState == DriverState.INTERRUPT) { Thread.currentThread().interrupt(); return true; } else { return false; } } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } } @@ -1123,7 +1126,7 @@ public class Driver implements CommandProcessor { see the changes made by 1st one. This takes care of autoCommit=true case. For multi-stmt txns this is not sufficient and will be managed via WriteSet tracking in the lock manager.*/ - txnMgr.acquireLocks(plan, ctx, userFromUGI); + txnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState); if(initiatingTransaction || (readOnlyQueryInAutoCommit && acidInQuery)) { //For multi-stmt txns we should record the snapshot when txn starts but // don't update it after that until txn completes. Thus the check for {@code initiatingTransaction} @@ -1394,21 +1397,21 @@ public class Driver implements CommandProcessor { errorMessage = null; SQLState = null; downstreamError = null; - stateLock.lock(); + lDrvState.stateLock.lock(); try { if (alreadyCompiled) { - if (driverState == DriverState.COMPILED) { - driverState = DriverState.EXECUTING; + if (lDrvState.driverState == DriverState.COMPILED) { + lDrvState.driverState = DriverState.EXECUTING; } else { errorMessage = "FAILED: Precompiled query has been cancelled or closed."; console.printError(errorMessage); return createProcessorResponse(12); } } else { - driverState = DriverState.COMPILING; + lDrvState.driverState = DriverState.COMPILING; } } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } // a flag that helps to set the correct driver state in finally block by tracking if @@ -1555,15 +1558,15 @@ public class Driver implements CommandProcessor { // only release the related resources ctx, driverContext as normal releaseResources(); } - stateLock.lock(); + lDrvState.stateLock.lock(); try { - if (driverState == DriverState.INTERRUPT) { - driverState = DriverState.ERROR; + if (lDrvState.driverState == DriverState.INTERRUPT) { + lDrvState.driverState = DriverState.ERROR; } else { - driverState = isFinishedWithError ? DriverState.ERROR : DriverState.EXECUTED; + lDrvState.driverState = isFinishedWithError ? DriverState.ERROR : DriverState.EXECUTED; } } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } } } @@ -1690,22 +1693,22 @@ public class Driver implements CommandProcessor { // hide sensitive information during query redaction. String queryStr = conf.getQueryString(); - stateLock.lock(); + lDrvState.stateLock.lock(); try { // if query is not in compiled state, or executing state which is carried over from // a combined compile/execute in runInternal, throws the error - if (driverState != DriverState.COMPILED && - driverState != DriverState.EXECUTING) { + if (lDrvState.driverState != DriverState.COMPILED && + lDrvState.driverState != DriverState.EXECUTING) { SQLState = "HY008"; errorMessage = "FAILED: query " + queryStr + " has " + - (driverState == DriverState.INTERRUPT ? "been cancelled" : "not been compiled."); + (lDrvState.driverState == DriverState.INTERRUPT ? "been cancelled" : "not been compiled."); console.printError(errorMessage); return 1000; } else { - driverState = DriverState.EXECUTING; + lDrvState.driverState = DriverState.EXECUTING; } } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER); @@ -2017,17 +2020,17 @@ public class Driver implements CommandProcessor { if (isInterrupted && !deferClose) { closeInProcess(true); } - stateLock.lock(); + lDrvState.stateLock.lock(); try { if (isInterrupted) { if (!deferClose) { - driverState = DriverState.ERROR; + lDrvState.driverState = DriverState.ERROR; } } else { - driverState = executionError ? DriverState.ERROR : DriverState.EXECUTED; + lDrvState.driverState = executionError ? DriverState.ERROR : DriverState.EXECUTED; } } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } if (isInterrupted) { LOG.info("Executing command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds"); @@ -2045,7 +2048,7 @@ public class Driver implements CommandProcessor { private void releasePlan(QueryPlan plan) { // Plan maybe null if Driver.close is called in another thread for the same Driver object - stateLock.lock(); + lDrvState.stateLock.lock(); try { if (plan != null) { plan.setDone(); @@ -2059,7 +2062,7 @@ public class Driver implements CommandProcessor { } } } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } } @@ -2176,7 +2179,7 @@ public class Driver implements CommandProcessor { @SuppressWarnings("unchecked") public boolean getResults(List res) throws IOException, CommandNeedRetryException { - if (driverState == DriverState.DESTROYED || driverState == DriverState.CLOSED) { + if (lDrvState.driverState == DriverState.DESTROYED || lDrvState.driverState == DriverState.CLOSED) { throw new IOException("FAILED: query has been cancelled, closed, or destroyed."); } @@ -2240,7 +2243,7 @@ public class Driver implements CommandProcessor { } public void resetFetch() throws IOException { - if (driverState == DriverState.DESTROYED || driverState == DriverState.CLOSED) { + if (lDrvState.driverState == DriverState.DESTROYED || lDrvState.driverState == DriverState.CLOSED) { throw new IOException("FAILED: driver has been cancelled, closed or destroyed."); } if (isFetchingTable()) { @@ -2268,7 +2271,7 @@ public class Driver implements CommandProcessor { // DriverContext could be released in the query and close processes at same // time, which needs to be thread protected. private void releaseDriverContext() { - stateLock.lock(); + lDrvState.stateLock.lock(); try { if (driverCxt != null) { driverCxt.shutdown(); @@ -2277,7 +2280,7 @@ public class Driver implements CommandProcessor { } catch (Exception e) { LOG.debug("Exception while shutting down the task runner", e); } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } } @@ -2360,22 +2363,22 @@ public class Driver implements CommandProcessor { // is called to stop the query if it is running, clean query results, and release resources. public int close() { - stateLock.lock(); + lDrvState.stateLock.lock(); try { releaseDriverContext(); - if (driverState == DriverState.COMPILING || - driverState == DriverState.EXECUTING || - driverState == DriverState.INTERRUPT) { - driverState = DriverState.INTERRUPT; + if (lDrvState.driverState == DriverState.COMPILING || + lDrvState.driverState == DriverState.EXECUTING || + lDrvState.driverState == DriverState.INTERRUPT) { + lDrvState.driverState = DriverState.INTERRUPT; return 0; } releasePlan(); releaseFetchTask(); releaseResStream(); releaseContext(); - driverState = DriverState.CLOSED; + lDrvState.driverState = DriverState.CLOSED; } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } if (SessionState.get() != null) { SessionState.get().getLineageState().clear(); @@ -2386,18 +2389,18 @@ public class Driver implements CommandProcessor { // is usually called after close() to commit or rollback a query and end the driver life cycle. // do not understand why it is needed and wonder if it could be combined with close. public void destroy() { - stateLock.lock(); + lDrvState.stateLock.lock(); try { // in the cancel case where the driver state is INTERRUPTED, destroy will be deferred to // the query process - if (driverState == DriverState.DESTROYED || - driverState == DriverState.INTERRUPT) { + if (lDrvState.driverState == DriverState.DESTROYED || + lDrvState.driverState == DriverState.INTERRUPT) { return; } else { - driverState = DriverState.DESTROYED; + lDrvState.driverState = DriverState.DESTROYED; } } finally { - stateLock.unlock(); + lDrvState.stateLock.unlock(); } if (!hiveLocks.isEmpty()) { try { http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 721974d..6399dbe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -451,6 +451,7 @@ public enum ErrorMsg { "Oldest available base: {2}", true), INVALID_COLUMN_NAME(10328, "Invalid column name"), UNSUPPORTED_SET_OPERATOR(10329, "Unsupported set operator"), + LOCK_ACQUIRE_CANCELLED(10330, "Query was cancelled while acquiring locks on the underlying objects. "), REPLACE_VIEW_WITH_MATERIALIZED(10400, "Attempt to replace view {0} with materialized view", true), REPLACE_MATERIALIZED_WITH_VIEW(10401, "Attempt to replace materialized view {0} with view", true), UPDATE_DELETE_VIEW(10402, "You cannot update or delete records in a view"), http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 45ead16..529e64c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.thrift.TException; @@ -74,7 +75,7 @@ public class DbLockManager implements HiveLockManager{ } @Override - public List lock(List objs, boolean keepAlive) throws + public List lock(List objs, boolean keepAlive, LockedDriverState lDrvState) throws LockException { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index 24fbd9a..53ee9c8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.Driver.DriverState; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.hooks.ReadEntity; @@ -110,6 +112,11 @@ class DummyTxnManager extends HiveTxnManagerImpl { @Override public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException { + acquireLocks(plan,ctx,username,null); + } + + @Override + public void acquireLocks(QueryPlan plan, Context ctx, String username, LockedDriverState lDrvState) throws LockException { // Make sure we've built the lock manager getLockManager(); @@ -171,7 +178,7 @@ class DummyTxnManager extends HiveTxnManagerImpl { } dedupLockObjects(lockObjects); - List hiveLocks = lockMgr.lock(lockObjects, false); + List hiveLocks = lockMgr.lock(lockObjects, false, lDrvState); if (hiveLocks == null) { throw new LockException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg()); http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java index 20e1147..c15035d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.lockmgr; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; import org.apache.hadoop.hive.ql.metadata.*; @@ -59,7 +60,7 @@ public class EmbeddedLockManager implements HiveLockManager { } @Override - public List lock(List objs, boolean keepAlive) throws LockException { + public List lock(List objs, boolean keepAlive, LockedDriverState lDrvState) throws LockException { return lock(objs, numRetriesForLock, sleepTime); } http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java index b2eb997..2f22d74 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr; import java.util.List; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; /** * Manager for locks in Hive. Users should not instantiate a lock manager @@ -37,7 +38,7 @@ public interface HiveLockManager { public HiveLock lock(HiveLockObject key, HiveLockMode mode, boolean keepAlive) throws LockException; public List lock(List objs, - boolean keepAlive) throws LockException; + boolean keepAlive, LockedDriverState lDrvState) throws LockException; public void unlock(HiveLock hiveLock) throws LockException; public void releaseLocks(List hiveLocks); http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index ce220a2..187a658 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -70,6 +71,20 @@ public interface HiveTxnManager { void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException; /** + * Acquire all of the locks needed by a query. If used with a query that + * requires transactions, this should be called after {@link #openTxn(String)}. + * A list of acquired locks will be stored in the + * {@link org.apache.hadoop.hive.ql.Context} object and can be retrieved + * via {@link org.apache.hadoop.hive.ql.Context#getHiveLocks}. + * @param plan query plan + * @param ctx Context for this query + * @param username name of the user for this query + * @param lDrvState the state to inform if the query cancelled or not + * @throws LockException if there is an error getting the locks + */ + void acquireLocks(QueryPlan plan, Context ctx, String username, LockedDriverState lDrvState) throws LockException; + + /** * Release specified locks. * Transaction aware TxnManagers, which has {@code supportsAcid() == true}, * will track locks internally and ignore this parameter http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java index ed022d9..a371a5a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java @@ -22,6 +22,9 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; +import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; @@ -56,6 +59,11 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager { } @Override + public void acquireLocks(QueryPlan plan, Context ctx, String username, LockedDriverState lDrvState) throws LockException { + acquireLocks(plan, ctx, username); + } + + @Override protected void finalize() throws Throwable { destruct(); } http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java index 14d0ef4..6ca05ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java @@ -24,6 +24,8 @@ import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver.DriverState; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.lockmgr.*; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; @@ -146,7 +148,7 @@ public class ZooKeeperHiveLockManager implements HiveLockManager { **/ @Override public List lock(List lockObjects, - boolean keepAlive) throws LockException + boolean keepAlive, LockedDriverState lDrvState) throws LockException { // Sort the objects first. You are guaranteed that if a partition is being locked, // the table has already been locked @@ -184,16 +186,29 @@ public class ZooKeeperHiveLockManager implements HiveLockManager { } HiveLock lock = null; - try { - lock = lock(lockObject.getObj(), lockObject.getMode(), keepAlive, true); - } catch (LockException e) { - console.printError("Error in acquireLocks..." ); - LOG.error("Error in acquireLocks...", e); - lock = null; + boolean isInterrupted = false; + if (lDrvState != null) { + lDrvState.stateLock.lock(); + if (lDrvState.driverState == DriverState.INTERRUPT) { + isInterrupted = true; + } + lDrvState.stateLock.unlock(); + } + if (!isInterrupted) { + try { + lock = lock(lockObject.getObj(), lockObject.getMode(), keepAlive, true); + } catch (LockException e) { + console.printError("Error in acquireLocks..." ); + LOG.error("Error in acquireLocks...", e); + lock = null; + } } if (lock == null) { releaseLocks(hiveLocks); + if (isInterrupted) { + throw new LockException(ErrorMsg.LOCK_ACQUIRE_CANCELLED.getMsg()); + } return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java index e189d38..de3b8ad 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java @@ -26,6 +26,9 @@ import org.junit.Before; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.Driver.DriverState; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; @@ -95,8 +98,12 @@ public class TestDummyTxnManager { List expectedLocks = new ArrayList(); expectedLocks.add(new ZooKeeperHiveLock("default", new HiveLockObject(), HiveLockMode.SHARED)); expectedLocks.add(new ZooKeeperHiveLock("default.table1", new HiveLockObject(), HiveLockMode.SHARED)); - - when(mockLockManager.lock(anyListOf(HiveLockObj.class), eq(false))).thenReturn(expectedLocks); + LockedDriverState lDrvState = new LockedDriverState(); + LockedDriverState lDrvInp = new LockedDriverState(); + lDrvInp.driverState = DriverState.INTERRUPT; + LockException lEx = new LockException(ErrorMsg.LOCK_ACQUIRE_CANCELLED.getMsg()); + when(mockLockManager.lock(anyListOf(HiveLockObj.class), eq(false), eq(lDrvState))).thenReturn(expectedLocks); + when(mockLockManager.lock(anyListOf(HiveLockObj.class), eq(false), eq(lDrvInp))).thenThrow(lEx); doNothing().when(mockLockManager).setContext(any(HiveLockManagerCtx.class)); doNothing().when(mockLockManager).close(); ArgumentCaptor lockObjsCaptor = ArgumentCaptor.forClass(List.class); @@ -105,7 +112,7 @@ public class TestDummyTxnManager { when(mockQueryPlan.getOutputs()).thenReturn(new HashSet()); // Execute - txnMgr.acquireLocks(mockQueryPlan, ctx, "fred"); + txnMgr.acquireLocks(mockQueryPlan, ctx, "fred", lDrvState); // Verify Assert.assertEquals("db1", SessionState.get().getCurrentDatabase()); @@ -116,13 +123,22 @@ public class TestDummyTxnManager { Assert.assertEquals(expectedLocks.get(1).getHiveLockMode(), resultLocks.get(1).getHiveLockMode()); Assert.assertEquals(expectedLocks.get(0).getHiveLockObject().getName(), resultLocks.get(0).getHiveLockObject().getName()); - verify(mockLockManager).lock(lockObjsCaptor.capture(), eq(false)); + verify(mockLockManager).lock(lockObjsCaptor.capture(), eq(false), eq(lDrvState)); List lockObjs = lockObjsCaptor.getValue(); Assert.assertEquals(2, lockObjs.size()); Assert.assertEquals("default", lockObjs.get(0).getName()); Assert.assertEquals(HiveLockMode.SHARED, lockObjs.get(0).mode); Assert.assertEquals("default/table1", lockObjs.get(1).getName()); Assert.assertEquals(HiveLockMode.SHARED, lockObjs.get(1).mode); + + // Execute + try { + txnMgr.acquireLocks(mockQueryPlan, ctx, "fred", lDrvInp); + Assert.fail(); + } catch(LockException le) { + Assert.assertEquals(le.getMessage(), ErrorMsg.LOCK_ACQUIRE_CANCELLED.getMsg()); + } + } @Test