hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ych...@apache.org
Subject hive git commit: HIVE-15572: Improve the response time for query canceling when it happens during acquiring locks (Yongzhi Chen, reviewed by Chaoyu Tang)
Date Fri, 20 Jan 2017 02:26:44 GMT
Repository: hive
Updated Branches:
  refs/heads/master 92090823d -> dbc2dffcd


HIVE-15572: Improve the response time for query canceling when it happens during acquiring locks (Yongzhi Chen, reviewed by Chaoyu Tang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/dbc2dffc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/dbc2dffc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/dbc2dffc

Branch: refs/heads/master
Commit: dbc2dffcd212a023ed43a932a716a177e5b466ef
Parents: 9209082
Author: Yongzhi Chen <ychena@apache.org>
Authored: Tue Jan 10 21:48:34 2017 -0500
Committer: Yongzhi Chen <ychena@apache.org>
Committed: Thu Jan 19 21:25:52 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/Driver.java  | 113 ++++++++++---------
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |   1 +
 .../hadoop/hive/ql/lockmgr/DbLockManager.java   |   3 +-
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java |   9 +-
 .../hive/ql/lockmgr/EmbeddedLockManager.java    |   3 +-
 .../hadoop/hive/ql/lockmgr/HiveLockManager.java |   3 +-
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java  |  15 +++
 .../hive/ql/lockmgr/HiveTxnManagerImpl.java     |   8 ++
 .../zookeeper/ZooKeeperHiveLockManager.java     |  29 +++--
 .../hive/ql/lockmgr/TestDummyTxnManager.java    |  24 +++-
 10 files changed, 138 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index fd6020b..efa2bdc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -172,6 +172,7 @@ public class Driver implements CommandProcessor {
 
   // For WebUI.  Kept alive after queryPlan is freed.
   private final QueryDisplay queryDisplay = new QueryDisplay();
+  private LockedDriverState lDrvState = new LockedDriverState();
 
   // Query specific info
   private QueryState queryState;
@@ -179,12 +180,7 @@ public class Driver implements CommandProcessor {
   // Query hooks that execute before compilation and after execution
   List<QueryLifeTimeHook> queryHooks;
 
-  // a lock is used for synchronizing the state transition and its associated
-  // resource releases
-  private final ReentrantLock stateLock = new ReentrantLock();
-  private DriverState driverState = DriverState.INITIALIZED;
-
-  private enum DriverState {
+  public enum DriverState {
     INITIALIZED,
     COMPILING,
     COMPILED,
@@ -201,6 +197,13 @@ public class Driver implements CommandProcessor {
     ERROR
   }
 
+  public static class LockedDriverState {
+    // a lock is used for synchronizing the state transition and its associated
+    // resource releases
+    public final ReentrantLock stateLock = new ReentrantLock();
+    public DriverState driverState = DriverState.INITIALIZED;
+  }
+
   private boolean checkConcurrency() {
     boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
     if (!supportConcurrency) {
@@ -381,11 +384,11 @@ public class Driver implements CommandProcessor {
     PerfLogger perfLogger = SessionState.getPerfLogger(true);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
-    stateLock.lock();
+    lDrvState.stateLock.lock();
     try {
-      driverState = DriverState.COMPILING;
+      lDrvState.driverState = DriverState.COMPILING;
     } finally {
-      stateLock.unlock();
+      lDrvState.stateLock.unlock();
     }
 
     command = new VariableSubstitution(new HiveVariableSource() {
@@ -623,15 +626,15 @@ public class Driver implements CommandProcessor {
       if (isInterrupted && !deferClose) {
         closeInProcess(true);
       }
-      stateLock.lock();
+      lDrvState.stateLock.lock();
       try {
         if (isInterrupted) {
-          driverState = deferClose ? DriverState.EXECUTING : DriverState.ERROR;
+          lDrvState.driverState = deferClose ? DriverState.EXECUTING : DriverState.ERROR;
         } else {
-          driverState = compileError ? DriverState.ERROR : DriverState.COMPILED;
+          lDrvState.driverState = compileError ? DriverState.ERROR : DriverState.COMPILED;
         }
       } finally {
-        stateLock.unlock();
+        lDrvState.stateLock.unlock();
       }
 
       if (isInterrupted) {
@@ -650,16 +653,16 @@ public class Driver implements CommandProcessor {
   }
 
   private boolean isInterrupted() {
-    stateLock.lock();
+    lDrvState.stateLock.lock();
     try {
-      if (driverState == DriverState.INTERRUPT) {
+      if (lDrvState.driverState == DriverState.INTERRUPT) {
         Thread.currentThread().interrupt();
         return true;
       } else {
         return false;
       }
     } finally {
-      stateLock.unlock();
+      lDrvState.stateLock.unlock();
     }
   }
 
@@ -1123,7 +1126,7 @@ public class Driver implements CommandProcessor {
       see the changes made by 1st one.  This takes care of autoCommit=true case.
       For multi-stmt txns this is not sufficient and will be managed via WriteSet tracking
       in the lock manager.*/
-      txnMgr.acquireLocks(plan, ctx, userFromUGI);
+      txnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
       if(initiatingTransaction || (readOnlyQueryInAutoCommit && acidInQuery)) {
         //For multi-stmt txns we should record the snapshot when txn starts but
         // don't update it after that until txn completes.  Thus the check for {@code initiatingTransaction}
@@ -1394,21 +1397,21 @@ public class Driver implements CommandProcessor {
     errorMessage = null;
     SQLState = null;
     downstreamError = null;
-    stateLock.lock();
+    lDrvState.stateLock.lock();
     try {
       if (alreadyCompiled) {
-        if (driverState == DriverState.COMPILED) {
-          driverState = DriverState.EXECUTING;
+        if (lDrvState.driverState == DriverState.COMPILED) {
+          lDrvState.driverState = DriverState.EXECUTING;
         } else {
           errorMessage = "FAILED: Precompiled query has been cancelled or closed.";
           console.printError(errorMessage);
           return createProcessorResponse(12);
         }
       } else {
-        driverState = DriverState.COMPILING;
+        lDrvState.driverState = DriverState.COMPILING;
       }
     } finally {
-      stateLock.unlock();
+      lDrvState.stateLock.unlock();
     }
 
     // a flag that helps to set the correct driver state in finally block by tracking if
@@ -1555,15 +1558,15 @@ public class Driver implements CommandProcessor {
         // only release the related resources ctx, driverContext as normal
         releaseResources();
       }
-      stateLock.lock();
+      lDrvState.stateLock.lock();
       try {
-        if (driverState == DriverState.INTERRUPT) {
-          driverState = DriverState.ERROR;
+        if (lDrvState.driverState == DriverState.INTERRUPT) {
+          lDrvState.driverState = DriverState.ERROR;
         } else {
-          driverState = isFinishedWithError ? DriverState.ERROR : DriverState.EXECUTED;
+          lDrvState.driverState = isFinishedWithError ? DriverState.ERROR : DriverState.EXECUTED;
         }
       } finally {
-        stateLock.unlock();
+        lDrvState.stateLock.unlock();
       }
     }
   }
@@ -1690,22 +1693,22 @@ public class Driver implements CommandProcessor {
     // hide sensitive information during query redaction.
     String queryStr = conf.getQueryString();
 
-    stateLock.lock();
+    lDrvState.stateLock.lock();
     try {
       // if query is not in compiled state, or executing state which is carried over from
       // a combined compile/execute in runInternal, throws the error
-      if (driverState != DriverState.COMPILED &&
-          driverState != DriverState.EXECUTING) {
+      if (lDrvState.driverState != DriverState.COMPILED &&
+          lDrvState.driverState != DriverState.EXECUTING) {
         SQLState = "HY008";
         errorMessage = "FAILED: query " + queryStr + " has " +
-            (driverState == DriverState.INTERRUPT ? "been cancelled" : "not been compiled.");
+            (lDrvState.driverState == DriverState.INTERRUPT ? "been cancelled" : "not been compiled.");
         console.printError(errorMessage);
         return 1000;
       } else {
-        driverState = DriverState.EXECUTING;
+        lDrvState.driverState = DriverState.EXECUTING;
       }
     } finally {
-      stateLock.unlock();
+      lDrvState.stateLock.unlock();
     }
 
     maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
@@ -2017,17 +2020,17 @@ public class Driver implements CommandProcessor {
       if (isInterrupted && !deferClose) {
         closeInProcess(true);
       }
-      stateLock.lock();
+      lDrvState.stateLock.lock();
       try {
         if (isInterrupted) {
           if (!deferClose) {
-            driverState = DriverState.ERROR;
+            lDrvState.driverState = DriverState.ERROR;
           }
         } else {
-          driverState = executionError ? DriverState.ERROR : DriverState.EXECUTED;
+          lDrvState.driverState = executionError ? DriverState.ERROR : DriverState.EXECUTED;
         }
       } finally {
-        stateLock.unlock();
+        lDrvState.stateLock.unlock();
       }
       if (isInterrupted) {
         LOG.info("Executing command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds");
@@ -2045,7 +2048,7 @@ public class Driver implements CommandProcessor {
 
   private void releasePlan(QueryPlan plan) {
     // Plan maybe null if Driver.close is called in another thread for the same Driver object
-    stateLock.lock();
+    lDrvState.stateLock.lock();
     try {
       if (plan != null) {
         plan.setDone();
@@ -2059,7 +2062,7 @@ public class Driver implements CommandProcessor {
         }
       }
     } finally {
-      stateLock.unlock();
+      lDrvState.stateLock.unlock();
     }
   }
 
@@ -2176,7 +2179,7 @@ public class Driver implements CommandProcessor {
 
   @SuppressWarnings("unchecked")
   public boolean getResults(List res) throws IOException, CommandNeedRetryException {
-    if (driverState == DriverState.DESTROYED || driverState == DriverState.CLOSED) {
+    if (lDrvState.driverState == DriverState.DESTROYED || lDrvState.driverState == DriverState.CLOSED) {
       throw new IOException("FAILED: query has been cancelled, closed, or destroyed.");
     }
 
@@ -2240,7 +2243,7 @@ public class Driver implements CommandProcessor {
   }
 
   public void resetFetch() throws IOException {
-    if (driverState == DriverState.DESTROYED || driverState == DriverState.CLOSED) {
+    if (lDrvState.driverState == DriverState.DESTROYED || lDrvState.driverState == DriverState.CLOSED) {
       throw new IOException("FAILED: driver has been cancelled, closed or destroyed.");
     }
     if (isFetchingTable()) {
@@ -2268,7 +2271,7 @@ public class Driver implements CommandProcessor {
   // DriverContext could be released in the query and close processes at same
   // time, which needs to be thread protected.
   private void releaseDriverContext() {
-    stateLock.lock();
+    lDrvState.stateLock.lock();
     try {
       if (driverCxt != null) {
         driverCxt.shutdown();
@@ -2277,7 +2280,7 @@ public class Driver implements CommandProcessor {
     } catch (Exception e) {
       LOG.debug("Exception while shutting down the task runner", e);
     } finally {
-      stateLock.unlock();
+      lDrvState.stateLock.unlock();
     }
   }
 
@@ -2360,22 +2363,22 @@ public class Driver implements CommandProcessor {
 
   // is called to stop the query if it is running, clean query results, and release resources.
   public int close() {
-    stateLock.lock();
+    lDrvState.stateLock.lock();
     try {
       releaseDriverContext();
-      if (driverState == DriverState.COMPILING ||
-          driverState == DriverState.EXECUTING ||
-          driverState == DriverState.INTERRUPT) {
-        driverState = DriverState.INTERRUPT;
+      if (lDrvState.driverState == DriverState.COMPILING ||
+          lDrvState.driverState == DriverState.EXECUTING ||
+          lDrvState.driverState == DriverState.INTERRUPT) {
+        lDrvState.driverState = DriverState.INTERRUPT;
         return 0;
       }
       releasePlan();
       releaseFetchTask();
       releaseResStream();
       releaseContext();
-      driverState = DriverState.CLOSED;
+      lDrvState.driverState = DriverState.CLOSED;
     } finally {
-      stateLock.unlock();
+      lDrvState.stateLock.unlock();
     }
     if (SessionState.get() != null) {
       SessionState.get().getLineageState().clear();
@@ -2386,18 +2389,18 @@ public class Driver implements CommandProcessor {
   // is usually called after close() to commit or rollback a query and end the driver life cycle.
   // do not understand why it is needed and wonder if it could be combined with close.
   public void destroy() {
-    stateLock.lock();
+    lDrvState.stateLock.lock();
     try {
       // in the cancel case where the driver state is INTERRUPTED, destroy will be deferred to
       // the query process
-      if (driverState == DriverState.DESTROYED ||
-          driverState == DriverState.INTERRUPT) {
+      if (lDrvState.driverState == DriverState.DESTROYED ||
+          lDrvState.driverState == DriverState.INTERRUPT) {
         return;
       } else {
-        driverState = DriverState.DESTROYED;
+        lDrvState.driverState = DriverState.DESTROYED;
       }
     } finally {
-      stateLock.unlock();
+      lDrvState.stateLock.unlock();
     }
     if (!hiveLocks.isEmpty()) {
       try {

http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 721974d..6399dbe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -451,6 +451,7 @@ public enum ErrorMsg {
     "Oldest available base: {2}", true),
   INVALID_COLUMN_NAME(10328, "Invalid column name"),
   UNSUPPORTED_SET_OPERATOR(10329, "Unsupported set operator"),
+  LOCK_ACQUIRE_CANCELLED(10330, "Query was cancelled while acquiring locks on the underlying objects. "),
   REPLACE_VIEW_WITH_MATERIALIZED(10400, "Attempt to replace view {0} with materialized view", true),
   REPLACE_MATERIALIZED_WITH_VIEW(10401, "Attempt to replace materialized view {0} with view", true),
   UPDATE_DELETE_VIEW(10402, "You cannot update or delete records in a view"),

http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index 45ead16..529e64c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.thrift.TException;
 
@@ -74,7 +75,7 @@ public class DbLockManager implements HiveLockManager{
   }
 
   @Override
-  public List<HiveLock> lock(List<HiveLockObj> objs, boolean keepAlive) throws
+  public List<HiveLock> lock(List<HiveLockObj> objs, boolean keepAlive, LockedDriverState lDrvState) throws
       LockException {
     throw new UnsupportedOperationException();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 24fbd9a..53ee9c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver.DriverState;
+import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -110,6 +112,11 @@ class DummyTxnManager extends HiveTxnManagerImpl {
 
   @Override
   public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
+    acquireLocks(plan,ctx,username,null);
+  }
+
+  @Override
+  public void acquireLocks(QueryPlan plan, Context ctx, String username, LockedDriverState lDrvState) throws LockException {
     // Make sure we've built the lock manager
     getLockManager();
 
@@ -171,7 +178,7 @@ class DummyTxnManager extends HiveTxnManagerImpl {
     }
 
     dedupLockObjects(lockObjects);
-    List<HiveLock> hiveLocks = lockMgr.lock(lockObjects, false);
+    List<HiveLock> hiveLocks = lockMgr.lock(lockObjects, false, lDrvState);
 
     if (hiveLocks == null) {
       throw new LockException(ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg());

http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java
index 20e1147..c15035d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/EmbeddedLockManager.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.lockmgr;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
 import org.apache.hadoop.hive.ql.metadata.*;
 
@@ -59,7 +60,7 @@ public class EmbeddedLockManager implements HiveLockManager {
   }
 
   @Override
-  public List<HiveLock> lock(List<HiveLockObj> objs, boolean keepAlive) throws LockException {
+  public List<HiveLock> lock(List<HiveLockObj> objs, boolean keepAlive, LockedDriverState lDrvState) throws LockException {
     return lock(objs, numRetriesForLock, sleepTime);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java
index b2eb997..2f22d74 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockManager.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.lockmgr;
 
 import java.util.List;
+import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 
 /**
  * Manager for locks in Hive.  Users should not instantiate a lock manager
@@ -37,7 +38,7 @@ public interface HiveLockManager {
   public HiveLock lock(HiveLockObject key, HiveLockMode mode,
       boolean keepAlive) throws LockException;
   public List<HiveLock> lock(List<HiveLockObj> objs,
-      boolean keepAlive) throws LockException;
+      boolean keepAlive, LockedDriverState lDrvState) throws LockException;
   public void unlock(HiveLock hiveLock) throws LockException;
   public void releaseLocks(List<HiveLock> hiveLocks);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index ce220a2..187a658 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr;
 
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -70,6 +71,20 @@ public interface HiveTxnManager {
   void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException;
 
   /**
+   * Acquire all of the locks needed by a query.  If used with a query that
+   * requires transactions, this should be called after {@link #openTxn(String)}.
+   * A list of acquired locks will be stored in the
+   * {@link org.apache.hadoop.hive.ql.Context} object and can be retrieved
+   * via {@link org.apache.hadoop.hive.ql.Context#getHiveLocks}.
+   * @param plan query plan
+   * @param ctx Context for this query
+   * @param username name of the user for this query
+   * @param lDrvState the state to inform if the query cancelled or not
+   * @throws LockException if there is an error getting the locks
+   */
+   void acquireLocks(QueryPlan plan, Context ctx, String username, LockedDriverState lDrvState) throws LockException;
+
+  /**
    * Release specified locks.
    * Transaction aware TxnManagers, which has {@code supportsAcid() == true},
    * will track locks internally and ignore this parameter

http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
index ed022d9..a371a5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
@@ -22,6 +22,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
+import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
@@ -56,6 +59,11 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
   }
 
   @Override
+  public void acquireLocks(QueryPlan plan, Context ctx, String username, LockedDriverState lDrvState) throws LockException {
+    acquireLocks(plan, ctx, username);
+  }
+
+  @Override
   protected void finalize() throws Throwable {
     destruct();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
index 14d0ef4..6ca05ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver.DriverState;
+import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.lockmgr.*;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
@@ -146,7 +148,7 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
    **/
   @Override
   public List<HiveLock> lock(List<HiveLockObj> lockObjects,
-      boolean keepAlive) throws LockException
+      boolean keepAlive, LockedDriverState lDrvState) throws LockException
   {
     // Sort the objects first. You are guaranteed that if a partition is being locked,
     // the table has already been locked
@@ -184,16 +186,29 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
       }
 
       HiveLock lock = null;
-      try {
-        lock = lock(lockObject.getObj(), lockObject.getMode(), keepAlive, true);
-      } catch (LockException e) {
-        console.printError("Error in acquireLocks..." );
-        LOG.error("Error in acquireLocks...", e);
-        lock = null;
+      boolean isInterrupted = false;
+      if (lDrvState != null) {
+        lDrvState.stateLock.lock();
+        if (lDrvState.driverState == DriverState.INTERRUPT) {
+          isInterrupted = true;
+        }
+        lDrvState.stateLock.unlock();
+      }
+      if (!isInterrupted) {
+        try {
+          lock = lock(lockObject.getObj(), lockObject.getMode(), keepAlive, true);
+        } catch (LockException e) {
+          console.printError("Error in acquireLocks..." );
+          LOG.error("Error in acquireLocks...", e);
+          lock = null;
+        }
       }
 
       if (lock == null) {
         releaseLocks(hiveLocks);
+        if (isInterrupted) {
+          throw new LockException(ErrorMsg.LOCK_ACQUIRE_CANCELLED.getMsg());
+        }
         return null;
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/dbc2dffc/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java
index e189d38..de3b8ad 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDummyTxnManager.java
@@ -26,6 +26,9 @@ import org.junit.Before;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.Driver.DriverState;
+import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -95,8 +98,12 @@ public class TestDummyTxnManager {
     List<HiveLock> expectedLocks = new ArrayList<HiveLock>();
     expectedLocks.add(new ZooKeeperHiveLock("default", new HiveLockObject(), HiveLockMode.SHARED));
     expectedLocks.add(new ZooKeeperHiveLock("default.table1", new HiveLockObject(), HiveLockMode.SHARED));
-
-    when(mockLockManager.lock(anyListOf(HiveLockObj.class), eq(false))).thenReturn(expectedLocks);
+    LockedDriverState lDrvState = new LockedDriverState();
+    LockedDriverState lDrvInp = new LockedDriverState();
+    lDrvInp.driverState = DriverState.INTERRUPT;
+    LockException lEx = new LockException(ErrorMsg.LOCK_ACQUIRE_CANCELLED.getMsg());
+    when(mockLockManager.lock(anyListOf(HiveLockObj.class), eq(false), eq(lDrvState))).thenReturn(expectedLocks);
+    when(mockLockManager.lock(anyListOf(HiveLockObj.class), eq(false), eq(lDrvInp))).thenThrow(lEx);
     doNothing().when(mockLockManager).setContext(any(HiveLockManagerCtx.class));
     doNothing().when(mockLockManager).close();
     ArgumentCaptor<List> lockObjsCaptor = ArgumentCaptor.forClass(List.class);
@@ -105,7 +112,7 @@ public class TestDummyTxnManager {
     when(mockQueryPlan.getOutputs()).thenReturn(new HashSet<WriteEntity>());
 
     // Execute
-    txnMgr.acquireLocks(mockQueryPlan, ctx, "fred");
+    txnMgr.acquireLocks(mockQueryPlan, ctx, "fred", lDrvState);
 
     // Verify
     Assert.assertEquals("db1", SessionState.get().getCurrentDatabase());
@@ -116,13 +123,22 @@ public class TestDummyTxnManager {
     Assert.assertEquals(expectedLocks.get(1).getHiveLockMode(), resultLocks.get(1).getHiveLockMode());
     Assert.assertEquals(expectedLocks.get(0).getHiveLockObject().getName(), resultLocks.get(0).getHiveLockObject().getName());
 
-    verify(mockLockManager).lock(lockObjsCaptor.capture(), eq(false));
+    verify(mockLockManager).lock(lockObjsCaptor.capture(), eq(false), eq(lDrvState));
     List<HiveLockObj> lockObjs = lockObjsCaptor.getValue();
     Assert.assertEquals(2, lockObjs.size());
     Assert.assertEquals("default", lockObjs.get(0).getName());
     Assert.assertEquals(HiveLockMode.SHARED, lockObjs.get(0).mode);
     Assert.assertEquals("default/table1", lockObjs.get(1).getName());
     Assert.assertEquals(HiveLockMode.SHARED, lockObjs.get(1).mode);
+
+    // Execute
+    try {
+      txnMgr.acquireLocks(mockQueryPlan, ctx, "fred", lDrvInp);
+      Assert.fail();
+    } catch(LockException le) {
+      Assert.assertEquals(le.getMessage(), ErrorMsg.LOCK_ACQUIRE_CANCELLED.getMsg());
+    }
+
   }
 
   @Test


Mime
View raw message