hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject hive git commit: HIVE-16820 : TezTask may not shut down correctly before submit (Sergey Shelukhin, reviewed by Siddharth Seth)
Date Tue, 13 Jun 2017 01:37:53 GMT
Repository: hive
Updated Branches:
  refs/heads/master 7acd64243 -> 49ae8b694


HIVE-16820 : TezTask may not shut down correctly before submit (Sergey Shelukhin, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/49ae8b69
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/49ae8b69
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/49ae8b69

Branch: refs/heads/master
Commit: 49ae8b69400d699a794f0d925b4efa55ebbd9d5e
Parents: 7acd642
Author: sergey <sershe@apache.org>
Authored: Mon Jun 12 13:24:57 2017 -0700
Committer: sergey <sershe@apache.org>
Committed: Mon Jun 12 13:24:57 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 75 +++++++++++++++-----
 .../hadoop/hive/ql/exec/tez/TestTezTask.java    |  2 +-
 2 files changed, 57 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/49ae8b69/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 7ee6186..4857ddc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.io.Serializable;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 
 import java.io.IOException;
@@ -88,6 +86,8 @@ import org.apache.tez.dag.api.client.VertexStatus;
 import org.json.JSONObject;
 import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  *
  * TezTask handles the execution of TezWork. Currently it executes a graph of map and reduce work
@@ -105,6 +105,8 @@ public class TezTask extends Task<TezWork> {
 
   private final DagUtils utils;
 
+  private final Object dagClientLock = new Object();
+  private volatile boolean isShutdown = false;
   private DAGClient dagClient = null;
 
   Map<BaseWork, Vertex> workToVertex = new HashMap<BaseWork, Vertex>();
@@ -189,9 +191,24 @@ public class TezTask extends Task<TezWork> {
         // Add the extra resources to the dag
         addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources);
 
-        // submit will send the job to the cluster and start executing
-        dagClient = submit(jobConf, dag, scratchDir, appJarLr, session,
+        // Check isShutdown opportunistically; it's never unset.
+        if (this.isShutdown) {
+          throw new HiveException("Operation cancelled");
+        }
+        DAGClient dagClient = submit(jobConf, dag, scratchDir, appJarLr, session,
             additionalLr, inputOutputJars, inputOutputLocalResources);
+        boolean wasShutdown = false;
+        synchronized (dagClientLock) {
+          assert this.dagClient == null;
+          wasShutdown = this.isShutdown;
+          if (!wasShutdown) {
+            this.dagClient = dagClient;
+          }
+        }
+        if (wasShutdown) {
+          closeDagClientOnCancellation(dagClient);
+          throw new HiveException("Operation cancelled");
+        }
 
         // finally monitor will print progress until the job is done
         TezJobMonitor monitor = new TezJobMonitor(work.getAllWork(),dagClient, conf, dag, ctx);
@@ -254,14 +271,33 @@ public class TezTask extends Task<TezWork> {
         }
       }
       // need to either move tmp files or remove them
+      DAGClient dagClient = null;
+      synchronized (dagClientLock) {
+        dagClient = this.dagClient;
+        this.dagClient = null;
+      }
+      // TODO: not clear why we don't do the rest of the cleanup if dagClient is not created.
+      //       E.g. jobClose will be called if we fail after dagClient creation but no before...
+      //       DagClient as such should have no bearing on jobClose.
       if (dagClient != null) {
         // rc will only be overwritten if close errors out
-        rc = close(work, rc);
+        rc = close(work, rc, dagClient);
       }
     }
     return rc;
   }
 
+  private void closeDagClientOnCancellation(DAGClient dagClient) {
+    try {
+      dagClient.tryKillDAG();
+      LOG.info("Waiting for Tez task to shut down: " + this);
+      dagClient.waitForCompletion();
+    } catch (Exception ex) {
+      LOG.warn("Failed to shut down TezTask" + this, ex);
+    }
+    closeDagClientWithoutEx(dagClient);
+  }
+
   private void logResources(List<LocalResource> additionalLr) {
     // log which resources we're adding (apart from the hive exec)
     if (!LOG.isDebugEnabled()) return;
@@ -544,7 +580,8 @@ public class TezTask extends Task<TezWork> {
    * close will move the temp files into the right place for the fetch
    * task. If the job has failed it will clean up the files.
    */
-  int close(TezWork work, int rc) {
+  @VisibleForTesting
+  int close(TezWork work, int rc, DAGClient dagClient) {
     try {
       List<BaseWork> ws = work.getAllWork();
       for (BaseWork w: ws) {
@@ -564,7 +601,9 @@ public class TezTask extends Task<TezWork> {
         console.printError(mesg, "\n" + StringUtils.stringifyException(e));
       }
     }
-    closeDagClientWithoutEx();
+    if (dagClient != null) { // null in tests
+      closeDagClientWithoutEx(dagClient);
+    }
     return rc;
   }
 
@@ -572,10 +611,9 @@ public class TezTask extends Task<TezWork> {
    * Close DagClient, log warning if it throws any exception.
    * We don't want to fail query if that function fails.
    */
-  private void closeDagClientWithoutEx(){
+  private static void closeDagClientWithoutEx(DAGClient dagClient) {
     try {
       dagClient.close();
-      dagClient = null;
     } catch (Exception e) {
       LOG.warn("Failed to close DagClient", e);
     }
@@ -642,17 +680,16 @@ public class TezTask extends Task<TezWork> {
   @Override
   public void shutdown() {
     super.shutdown();
-    if (dagClient != null) {
-      LOG.info("Shutting down Tez task " + this);
-      try {
-        dagClient.tryKillDAG();
-        LOG.info("Waiting for Tez task to shut down: " + this);
-        dagClient.waitForCompletion();
-      } catch (Exception ex) {
-        LOG.warn("Failed to shut down TezTask" + this, ex);
-      }
-      closeDagClientWithoutEx();
+    DAGClient dagClient = null;
+    synchronized (dagClientLock) {
+      isShutdown = true;
+      dagClient = this.dagClient;
+      // Don't set dagClient to null here - execute will only clean up operators if it's set.
     }
+    LOG.info("Shutting down Tez task " + this + " "
+        + ((dagClient == null) ? " before submit" : ""));
+    if (dagClient == null) return;
+    closeDagClientOnCancellation(dagClient);
   }
 
   /** DAG client that does dumb global sync on all the method calls;

http://git-wip-us.apache.org/repos/asf/hive/blob/49ae8b69/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 70fedb7..5c80654 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -233,7 +233,7 @@ public class TestTezTask {
 
   @Test
   public void testClose() throws HiveException {
-    task.close(work, 0);
+    task.close(work, 0, null);
     verify(op, times(4)).jobClose(any(Configuration.class), eq(true));
   }
 


Mime
View raw message