hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject hive git commit: HIVE-13858. LLAP: A preempted task can end up waiting on completeInitialization if some part of the executing code suppressed the interrupt. (Siddharth Seth, reviewed by Prasanth Jayachandran) (cherry picked from commit ea489757103bf6bc6881b0e8c68c67dc79801d70)
Date Wed, 01 Jun 2016 18:09:36 GMT
Repository: hive
Updated Branches:
  refs/heads/branch-2.1 11281a543 -> 705ce7db4


HIVE-13858. LLAP: A preempted task can end up waiting on completeInitialization if some part
of the executing code suppressed the interrupt. (Siddharth Seth, reviewed by Prasanth Jayachandran)
(cherry picked from commit ea489757103bf6bc6881b0e8c68c67dc79801d70)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/705ce7db
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/705ce7db
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/705ce7db

Branch: refs/heads/branch-2.1
Commit: 705ce7db4e0919223bc99acf6dd407589db462c5
Parents: 11281a5
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Jun 1 11:08:25 2016 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Jun 1 11:09:27 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/Operator.java    | 67 ++++++++++++++++----
 .../hive/ql/exec/tez/MapRecordProcessor.java    |  8 +--
 .../ql/exec/tez/MergeFileRecordProcessor.java   |  3 +-
 .../hive/ql/exec/tez/RecordProcessor.java       | 15 ++++-
 .../hive/ql/exec/tez/ReduceRecordProcessor.java |  8 +--
 .../hadoop/hive/ql/exec/tez/TezProcessor.java   |  6 +-
 6 files changed, 79 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/705ce7db/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 636f079..be141c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -28,7 +28,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
@@ -71,7 +74,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
   protected List<Operator<? extends OperatorDesc>> childOperators;
   protected List<Operator<? extends OperatorDesc>> parentOperators;
   protected String operatorId;
-  protected AtomicBoolean abortOp;
+  protected final AtomicBoolean abortOp;
   private transient ExecMapperContext execContext;
   private transient boolean rootInitializeCalled = false;
   protected final transient Collection<Future<?>> asyncInitOperations = new HashSet<>();
@@ -390,24 +393,62 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
     Object[] os = new Object[fs.size()];
     int i = 0;
     Throwable asyncEx = null;
+
+    // Wait for all futures to complete. Check for an abort while waiting for each future. If any of the futures is cancelled / aborted - cancel all subsequent futures.
+
+    boolean cancelAll = false;
     for (Future<?> f : fs) {
-      if (abortOp.get() || asyncEx != null) {
-        // We were aborted, interrupted or one of the operations failed; terminate all.
-        f.cancel(true);
+      // If aborted - break out of the loop, and cancel all subsequent futures.
+      if (cancelAll) {
+        break;
+      }
+      if (abortOp.get()) {
+        cancelAll = true;
+        break;
       } else {
-        try {
-          os[i++] = f.get();
-        } catch (CancellationException ex) {
-          asyncEx = new InterruptedException("Future was canceled");
-        } catch (Throwable t) {
-          f.cancel(true);
-          asyncEx = t;
+        // Wait for the current future.
+        while (true) {
+          if (abortOp.get()) {
+            cancelAll = true;
+            break;
+          } else {
+            try {
+              // Await future result with a timeout to check the abort field occasionally.
+              // It's possible that the interrupt which comes in along with an abort, is suppressed
+              // by some other operator.
+              Object futureResult = f.get(200l, TimeUnit.MILLISECONDS);
+              os[i++] = futureResult;
+              break;
+            } catch (TimeoutException e) {
+              // Expected if the operation takes time. Continue the loop, and wait for op completion.
+            } catch (InterruptedException | CancellationException e) {
+              asyncEx = e;
+              cancelAll = true;
+              break;
+            } catch (ExecutionException e) {
+              if (e.getCause() == null) {
+                asyncEx = e;
+              } else {
+                asyncEx = e.getCause();
+              }
+              cancelAll = true;
+              break;
+            }
+          }
         }
+
       }
     }
-    if (asyncEx != null) {
-      throw new HiveException("Async initialization failed", asyncEx);
+
+    if (cancelAll || asyncEx != null) {
+      for (Future<?> f : fs) {
+        // It's ok to send a cancel to an already completed future. Is a no-op
+        f.cancel(true);
+      }
+      throw new HiveException("Async Initialization failed. abortRequested=" + abortOp.get(), asyncEx);
     }
+
+
     completeInitializationOp(os);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/705ce7db/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index dc63d7b..f4a9cac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -85,7 +85,6 @@ public class MapRecordProcessor extends RecordProcessor {
   MRInputLegacy legacyMRInput;
   MultiMRInput mainWorkMultiMRInput;
   private final ExecMapperContext execContext;
-  private boolean abort;
   private MapWork mapWork;
   List<MapWork> mergeWorkList;
   List<String> cacheKeys;
@@ -360,18 +359,17 @@ public class MapRecordProcessor extends RecordProcessor {
   void run() throws Exception {
     while (sources[position].pushRecord()) {
       if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) {
-        if (abort && Thread.interrupted()) {
-          throw new HiveException("Processing thread interrupted");
-        }
+        checkAbortCondition();
         nRows = 0;
       }
     }
   }
 
+
   @Override
   public void abort() {
     // this will stop run() from pushing records
-    abort = true;
+    super.abort();
 
     // this will abort initializeOp()
     if (mapOp != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/705ce7db/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index bb56e1c..6fad405 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -62,7 +62,6 @@ public class MergeFileRecordProcessor extends RecordProcessor {
   private String cacheKey;
   private MergeFileWork mfWork;
   MRInputLegacy mrInput = null;
-  private boolean abort = false;
   private final Object[] row = new Object[2];
   ObjectCache cache;
 
@@ -158,7 +157,7 @@ public class MergeFileRecordProcessor extends RecordProcessor {
 
   @Override
   void abort() {
-    abort = true;
+    super.abort();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/705ce7db/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index 2f08529..a373ad6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -58,6 +58,7 @@ public abstract class RecordProcessor  {
 
   public static final Logger l4j = LoggerFactory.getLogger(RecordProcessor.class);
 
+  protected volatile boolean abort = false;
 
   // used to log memory usage periodically
   protected boolean isLogInfoEnabled = false;
@@ -108,8 +109,6 @@ public abstract class RecordProcessor  {
    */
   abstract void run() throws Exception;
 
-  abstract void abort();
-
   abstract void close();
 
   protected void createOutputMap() {
@@ -148,4 +147,16 @@ public abstract class RecordProcessor  {
       return null;
     }
   }
+
+  void abort() {
+    this.abort = true;
+  }
+
+  protected void checkAbortCondition() throws InterruptedException {
+    if (abort || Thread.currentThread().isInterrupted()) {
+      // Not cleaning the interrupt status.
+      boolean interruptState = Thread.currentThread().isInterrupted();
+      throw new InterruptedException("Processing thread aborted. Interrupt state: " + interruptState);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/705ce7db/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 0579dbc..415df92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -83,7 +83,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
 
   private byte bigTablePosition = 0;
 
-  private boolean abort;
+
   private int nRows = 0;
 
   public ReduceRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
@@ -262,9 +262,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
     // run the operator pipeline
     while (sources[bigTablePosition].pushRecord()) {
       if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) {
-        if (abort && Thread.interrupted()) {
-          throw new HiveException("Processing thread interrupted");
-        }
+        checkAbortCondition();
         nRows = 0;
       }
     }
@@ -273,7 +271,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
   @Override
   public void abort() {
     // this will stop run() from pushing records
-    abort = true;
+    super.abort();
 
     // this will abort initializeOp()
     if (reducer != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/705ce7db/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index c560f37..e7b7e43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -192,7 +192,11 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
       }
       if (originalThrowable != null) {
         LOG.error(StringUtils.stringifyException(originalThrowable));
-        throw new RuntimeException(originalThrowable);
+        if (originalThrowable instanceof InterruptedException) {
+          throw (InterruptedException) originalThrowable;
+        } else {
+          throw new RuntimeException(originalThrowable);
+        }
       }
     }
   }


Mime
View raw message