hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ych...@apache.org
Subject hive git commit: HIVE-15997: Resource leaks when query is cancelled (Yongzhi Chen, reviewed by Chaoyu Tang)
Date Wed, 08 Mar 2017 17:50:15 GMT
Repository: hive
Updated Branches:
  refs/heads/master f72ea223a -> 35d707950


HIVE-15997: Resource leaks when query is cancelled (Yongzhi Chen, reviewed by Chaoyu Tang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/35d70795
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/35d70795
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/35d70795

Branch: refs/heads/master
Commit: 35d707950ddd210c37533be3da51cea730bac881
Parents: f72ea22
Author: Yongzhi Chen <ychena@apache.org>
Authored: Wed Mar 8 12:46:43 2017 -0500
Committer: Yongzhi Chen <ychena@apache.org>
Committed: Wed Mar 8 12:49:20 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  1 -
 .../hadoop/hive/ql/exec/mr/ExecDriver.java      | 20 ++++++++++++++++++++
 .../ql/exec/spark/LocalHiveSparkClient.java     |  5 +++++
 .../ql/exec/spark/RemoteHiveSparkClient.java    |  5 +++++
 .../zookeeper/ZooKeeperHiveLockManager.java     | 20 +++++++++++++++++---
 5 files changed, 47 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/35d70795/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index cdf24d4..d981119 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -655,7 +655,6 @@ public class Driver implements CommandProcessor {
     lDrvState.stateLock.lock();
     try {
       if (lDrvState.driverState == DriverState.INTERRUPT) {
-        Thread.currentThread().interrupt();
         return true;
       } else {
         return false;

http://git-wip-us.apache.org/repos/asf/hive/blob/35d70795/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 34b683c..1945163 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -225,6 +225,11 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
     Path emptyScratchDir;
     JobClient jc = null;
 
+    if (driverContext.isShutdown()) {
+      LOG.warn("Task was cancelled");
+      return 5;
+    }
+
     MapWork mWork = work.getMapWork();
     ReduceWork rWork = work.getReduceWork();
 
@@ -398,7 +403,22 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
 
       HiveConfUtil.updateJobCredentialProviders(job);
       // Finally SUBMIT the JOB!
+      if (driverContext.isShutdown()) {
+        LOG.warn("Task was cancelled");
+        return 5;
+      }
+
       rj = jc.submitJob(job);
+
+      if (driverContext.isShutdown()) {
+        LOG.warn("Task was cancelled");
+        if (rj != null) {
+          rj.killJob();
+          rj = null;
+        }
+        return 5;
+      }
+
       this.jobID = rj.getJobID();
       updateStatusInQueryDisplay();
       returnVal = jobExecHelper.progress(rj, jc, ctx);

http://git-wip-us.apache.org/repos/asf/hive/blob/35d70795/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index f5d9c4c..beeafd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -135,6 +136,10 @@ public class LocalHiveSparkClient implements HiveSparkClient {
       new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir, sparkReporter);
     SparkPlan plan = gen.generate(sparkWork);
 
+    if (driverContext.isShutdown()) {
+      throw new HiveException("Operation is cancelled.");
+    }
+
     // Execute generated plan.
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
     // We use Spark RDD async action to submit job as it's the only way to get jobId now.

http://git-wip-us.apache.org/repos/asf/hive/blob/35d70795/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 6caf2b7..4c69899 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -207,6 +208,10 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
     byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork);
 
     JobStatusJob job = new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes);
+    if (driverContext.isShutdown()) {
+      throw new HiveException("Operation is cancelled.");
+    }
+
     JobHandle<Serializable> jobHandle = remoteClient.submit(job);
     RemoteSparkJobStatus sparkJobStatus = new RemoteSparkJobStatus(remoteClient, jobHandle, sparkClientTimtout);
     return new RemoteSparkJobRef(hiveConf, jobHandle, sparkJobStatus);

http://git-wip-us.apache.org/repos/asf/hive/blob/35d70795/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
index 6ca05ed..c2a4806 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
@@ -486,12 +486,26 @@ public class ZooKeeperHiveLockManager implements HiveLockManager {
     HiveLockObject obj = zLock.getHiveLockObject();
     String name  = getLastObjectName(parent, obj);
     try {
-      curatorFramework.delete().forPath(zLock.getPath());
+      //catch InterruptedException to make sure locks can be released when the query is cancelled.
+      try {
+        curatorFramework.delete().forPath(zLock.getPath());
+      } catch (InterruptedException ie) {
+        curatorFramework.delete().forPath(zLock.getPath());
+      }
 
       // Delete the parent node if all the children have been deleted
-      List<String> children = curatorFramework.getChildren().forPath(name);
+      List<String> children = null;
+      try {
+        children = curatorFramework.getChildren().forPath(name);
+      } catch (InterruptedException ie) {
+        children = curatorFramework.getChildren().forPath(name);
+      }
       if (children == null || children.isEmpty()) {
-        curatorFramework.delete().forPath(name);
+        try {
+          curatorFramework.delete().forPath(name);
+        } catch (InterruptedException ie) {
+          curatorFramework.delete().forPath(name);
+        }
       }
       Metrics metrics = MetricsFactory.getInstance();
       if (metrics != null) {


Mime
View raw message