hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject hive git commit: HIVE-13858. LLAP: A preempted task can end up waiting on completeInitialization if some part of the executing code suppressed the interrupt. (Siddharth Seth, reviewed by Prasanth Jayachandran)
Date Wed, 01 Jun 2016 18:09:09 GMT
Repository: hive
Updated Branches:
  refs/heads/master cd464e9fd -> ea4897571


HIVE-13858. LLAP: A preempted task can end up waiting on completeInitialization if some part
of the executing code suppressed the interrupt. (Siddharth Seth, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ea489757
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ea489757
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ea489757

Branch: refs/heads/master
Commit: ea489757103bf6bc6881b0e8c68c67dc79801d70
Parents: cd464e9
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Jun 1 11:08:25 2016 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Jun 1 11:08:25 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/Operator.java    | 67 ++++++++++++++++----
 .../hive/ql/exec/tez/MapRecordProcessor.java    |  8 +--
 .../ql/exec/tez/MergeFileRecordProcessor.java   |  3 +-
 .../hive/ql/exec/tez/RecordProcessor.java       | 15 ++++-
 .../hive/ql/exec/tez/ReduceRecordProcessor.java |  8 +--
 .../hadoop/hive/ql/exec/tez/TezProcessor.java   |  6 +-
 6 files changed, 79 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ea489757/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 636f079..be141c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -28,7 +28,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
@@ -71,7 +74,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
   protected List<Operator<? extends OperatorDesc>> childOperators;
   protected List<Operator<? extends OperatorDesc>> parentOperators;
   protected String operatorId;
-  protected AtomicBoolean abortOp;
+  protected final AtomicBoolean abortOp;
   private transient ExecMapperContext execContext;
   private transient boolean rootInitializeCalled = false;
   protected final transient Collection<Future<?>> asyncInitOperations = new HashSet<>();
@@ -390,24 +393,62 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
     Object[] os = new Object[fs.size()];
     int i = 0;
     Throwable asyncEx = null;
+
+    // Wait for all futures to complete. Check for an abort while waiting for each future. If any of the futures is cancelled / aborted - cancel all subsequent futures.
+
+    boolean cancelAll = false;
     for (Future<?> f : fs) {
-      if (abortOp.get() || asyncEx != null) {
-        // We were aborted, interrupted or one of the operations failed; terminate all.
-        f.cancel(true);
+      // If aborted - break out of the loop, and cancel all subsequent futures.
+      if (cancelAll) {
+        break;
+      }
+      if (abortOp.get()) {
+        cancelAll = true;
+        break;
       } else {
-        try {
-          os[i++] = f.get();
-        } catch (CancellationException ex) {
-          asyncEx = new InterruptedException("Future was canceled");
-        } catch (Throwable t) {
-          f.cancel(true);
-          asyncEx = t;
+        // Wait for the current future.
+        while (true) {
+          if (abortOp.get()) {
+            cancelAll = true;
+            break;
+          } else {
+            try {
+              // Await future result with a timeout to check the abort field occasionally.
+              // It's possible that the interrupt which comes in along with an abort, is suppressed
+              // by some other operator.
+              Object futureResult = f.get(200l, TimeUnit.MILLISECONDS);
+              os[i++] = futureResult;
+              break;
+            } catch (TimeoutException e) {
+              // Expected if the operation takes time. Continue the loop, and wait for op completion.
+            } catch (InterruptedException | CancellationException e) {
+              asyncEx = e;
+              cancelAll = true;
+              break;
+            } catch (ExecutionException e) {
+              if (e.getCause() == null) {
+                asyncEx = e;
+              } else {
+                asyncEx = e.getCause();
+              }
+              cancelAll = true;
+              break;
+            }
+          }
         }
+
       }
     }
-    if (asyncEx != null) {
-      throw new HiveException("Async initialization failed", asyncEx);
+
+    if (cancelAll || asyncEx != null) {
+      for (Future<?> f : fs) {
+        // It's ok to send a cancel to an already completed future. Is a no-op
+        f.cancel(true);
+      }
+      throw new HiveException("Async Initialization failed. abortRequested=" + abortOp.get(), asyncEx);
     }
+
+
     completeInitializationOp(os);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ea489757/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index dc63d7b..f4a9cac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -85,7 +85,6 @@ public class MapRecordProcessor extends RecordProcessor {
   MRInputLegacy legacyMRInput;
   MultiMRInput mainWorkMultiMRInput;
   private final ExecMapperContext execContext;
-  private boolean abort;
   private MapWork mapWork;
   List<MapWork> mergeWorkList;
   List<String> cacheKeys;
@@ -360,18 +359,17 @@ public class MapRecordProcessor extends RecordProcessor {
   void run() throws Exception {
     while (sources[position].pushRecord()) {
       if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) {
-        if (abort && Thread.interrupted()) {
-          throw new HiveException("Processing thread interrupted");
-        }
+        checkAbortCondition();
         nRows = 0;
       }
     }
   }
 
+
   @Override
   public void abort() {
     // this will stop run() from pushing records
-    abort = true;
+    super.abort();
 
     // this will abort initializeOp()
     if (mapOp != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/ea489757/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index bb56e1c..6fad405 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -62,7 +62,6 @@ public class MergeFileRecordProcessor extends RecordProcessor {
   private String cacheKey;
   private MergeFileWork mfWork;
   MRInputLegacy mrInput = null;
-  private boolean abort = false;
   private final Object[] row = new Object[2];
   ObjectCache cache;
 
@@ -158,7 +157,7 @@ public class MergeFileRecordProcessor extends RecordProcessor {
 
   @Override
   void abort() {
-    abort = true;
+    super.abort();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/ea489757/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index 2f08529..a373ad6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -58,6 +58,7 @@ public abstract class RecordProcessor  {
 
   public static final Logger l4j = LoggerFactory.getLogger(RecordProcessor.class);
 
+  protected volatile boolean abort = false;
 
   // used to log memory usage periodically
   protected boolean isLogInfoEnabled = false;
@@ -108,8 +109,6 @@ public abstract class RecordProcessor  {
    */
   abstract void run() throws Exception;
 
-  abstract void abort();
-
   abstract void close();
 
   protected void createOutputMap() {
@@ -148,4 +147,16 @@ public abstract class RecordProcessor  {
       return null;
     }
   }
+
+  void abort() {
+    this.abort = true;
+  }
+
+  protected void checkAbortCondition() throws InterruptedException {
+    if (abort || Thread.currentThread().isInterrupted()) {
+      // Not cleaning the interrupt status.
+      boolean interruptState = Thread.currentThread().isInterrupted();
+      throw new InterruptedException("Processing thread aborted. Interrupt state: " + interruptState);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ea489757/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 0579dbc..415df92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -83,7 +83,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
 
   private byte bigTablePosition = 0;
 
-  private boolean abort;
+
   private int nRows = 0;
 
   public ReduceRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
@@ -262,9 +262,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
     // run the operator pipeline
     while (sources[bigTablePosition].pushRecord()) {
       if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) {
-        if (abort && Thread.interrupted()) {
-          throw new HiveException("Processing thread interrupted");
-        }
+        checkAbortCondition();
         nRows = 0;
       }
     }
@@ -273,7 +271,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
   @Override
   public void abort() {
     // this will stop run() from pushing records
-    abort = true;
+    super.abort();
 
     // this will abort initializeOp()
     if (reducer != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/ea489757/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index c560f37..e7b7e43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -192,7 +192,11 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
       }
       if (originalThrowable != null) {
         LOG.error(StringUtils.stringifyException(originalThrowable));
-        throw new RuntimeException(originalThrowable);
+        if (originalThrowable instanceof InterruptedException) {
+          throw (InterruptedException) originalThrowable;
+        } else {
+          throw new RuntimeException(originalThrowable);
+        }
       }
     }
   }


Mime
View raw message