hive-commits mailing list archives

From ser...@apache.org
Subject [09/46] hive git commit: HIVE-16078 : improve abort checking in Tez/LLAP (Sergey Shelukhin, reviewed by Rajesh Balamohan, Siddharth Seth)
Date Mon, 13 Mar 2017 19:42:10 GMT
HIVE-16078 : improve abort checking in Tez/LLAP (Sergey Shelukhin, reviewed by Rajesh Balamohan, Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0cc1afa5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0cc1afa5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0cc1afa5

Branch: refs/heads/hive-14535
Commit: 0cc1afa56afc882c4b7a756112f749acebc3c58a
Parents: b478a22
Author: Sergey Shelukhin <sershe@apache.org>
Authored: Wed Mar 8 12:22:17 2017 -0800
Committer: Sergey Shelukhin <sershe@apache.org>
Committed: Wed Mar 8 12:22:17 2017 -0800

----------------------------------------------------------------------
 .../hive/ql/exec/CommonMergeJoinOperator.java   | 11 +++
 .../ql/exec/tez/InterruptibleProcessing.java    | 79 ++++++++++++++++++++
 .../hive/ql/exec/tez/MapRecordProcessor.java    | 18 ++---
 .../ql/exec/tez/MergeFileRecordProcessor.java   | 16 ++--
 .../hive/ql/exec/tez/RecordProcessor.java       | 18 +----
 .../hive/ql/exec/tez/ReduceRecordProcessor.java | 27 +++----
 .../VectorMapJoinGenerateResultOperator.java    | 29 +++++++
 7 files changed, 147 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
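
In short, the patch replaces the fixed check-every-1000-rows abort polling in RecordProcessor with a shared InterruptibleProcessing base class that rescales the row interval after each check, aiming for roughly one check every 3 seconds, clamped to [1, 100000] rows. A standalone sketch of that rescaling math follows (an illustration only, not part of the commit; the class name and sample timing are made up):

  // Standalone sketch of the adaptive interval math in InterruptibleProcessing
  // (full class in the diff below). Sample numbers are hypothetical.
  public class IntervalSketch {
    public static void main(String[] args) {
      final double targetNs = 3 * 1000000000.0; // ~3 seconds between abort checks
      int checkAfterRows = 1000;                // current interval, in rows
      long elapsedNs = 1000000000L;             // suppose the last 1000 rows took 1 second

      // Rescale so the next check lands roughly targetNs from now.
      double scale = targetNs / elapsedNs;      // 3.0 in this example
      checkAfterRows = Math.min(100000, Math.max(1, (int) (scale * checkAfterRows)));
      System.out.println(checkAfterRows);       // prints 3000
    }
  }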


http://git-wip-us.apache.org/repos/asf/hive/blob/0cc1afa5/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
index 0b8eae8..8495d73 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.exec.tez.InterruptibleProcessing;
 import org.apache.hadoop.hive.ql.exec.tez.RecordSource;
 import org.apache.hadoop.hive.ql.exec.tez.TezContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -86,6 +87,9 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
       new ArrayList<Operator<? extends OperatorDesc>>();
   transient Set<Integer> fetchInputAtClose;
 
+  // A field because we cannot multi-inherit.
+  transient InterruptibleProcessing interruptChecker;
+
   /** Kryo ctor. */
   protected CommonMergeJoinOperator() {
     super();
@@ -156,6 +160,7 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
     }
 
     sources = ((TezContext) MapredContext.get()).getRecordSources();
+    interruptChecker = new InterruptibleProcessing();
   }
 
   /*
@@ -374,11 +379,17 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
 
     // for tables other than the big table, we need to fetch more data until reach a new group or
     // done.
+    interruptChecker.startAbortChecks(); // Reset the time, we only want to count it in the
loop.
     while (!foundNextKeyGroup[t]) {
       if (fetchDone[t]) {
         break;
       }
       fetchOneRow(t);
+      try {
+        interruptChecker.addRowAndMaybeCheckAbort();
+      } catch (InterruptedException e) {
+        throw new HiveException(e);
+      }
     }
     if (!foundNextKeyGroup[t] && fetchDone[t]) {
       this.nextKeyWritables[t] = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/0cc1afa5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InterruptibleProcessing.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InterruptibleProcessing.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InterruptibleProcessing.java
new file mode 100644
index 0000000..260886b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/InterruptibleProcessing.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+public class InterruptibleProcessing {
+  private final static Logger LOG = LoggerFactory.getLogger(InterruptibleProcessing.class);
+  private static final int CHECK_INTERRUPTION_AFTER_ROWS_DEFAULT = 1000,
+      CHECK_INTERRUPTION_AFTER_ROWS_MAX = 100000, CHECK_INTERRUPTION_AFTER_ROWS_MIN = 1;
+  private static final double TARGET_INTERRUPT_CHECK_TIME_NS = 3 * 1000000000.0;
+
+  private int checkInterruptionAfterRows = CHECK_INTERRUPTION_AFTER_ROWS_DEFAULT;
+  private long lastInterruptCheckNs = 0L;
+  private int nRows;
+  private volatile boolean isAborted;
+
+  // Methods should really be protected, but some places have to use this as a field.
+
+  public final void startAbortChecks() {
+    lastInterruptCheckNs = System.nanoTime();
+    nRows = 0;
+  }
+
+  public final void addRowAndMaybeCheckAbort() throws InterruptedException {
+    if (nRows++ < checkInterruptionAfterRows) return;
+    long time = System.nanoTime();
+    checkAbortCondition();
+    long elapsedNs = (time - lastInterruptCheckNs);
+    if (elapsedNs >= 0) {
+      // Make sure we don't get stuck at 0 time, however unlikely that is.
+      double diff = elapsedNs == 0 ? 10 : TARGET_INTERRUPT_CHECK_TIME_NS / elapsedNs;
+      int newRows = Math.min(CHECK_INTERRUPTION_AFTER_ROWS_MAX,
+          Math.max(CHECK_INTERRUPTION_AFTER_ROWS_MIN, (int) (diff * checkInterruptionAfterRows)));
+      if (checkInterruptionAfterRows != newRows && LOG.isDebugEnabled()) {
+        LOG.debug("Adjusting abort check rows to " + newRows
+            + " from " + checkInterruptionAfterRows);
+      }
+      checkInterruptionAfterRows = newRows;
+    }
+    nRows = 0;
+    lastInterruptCheckNs = time;
+  }
+
+  public final void checkAbortCondition() throws InterruptedException {
+    boolean isInterrupted = Thread.currentThread().isInterrupted();
+    if (!isAborted && !isInterrupted) return;
+    // Not cleaning the interrupt status.
+    throw new InterruptedException("Processing thread aborted. Interrupt state: " + isInterrupted);
+  }
+
+  public final void setAborted(boolean b) {
+    this.isAborted = b;
+  }
+
+  public void abort() {
+    setAborted(true);
+  }
+
+  public final boolean isAborted() {
+    return this.isAborted;
+  }
+}
\ No newline at end of file
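
The intended call pattern, mirrored by the record-processor changes below, is startAbortChecks() once at the top of the record loop, then addRowAndMaybeCheckAbort() per row; operators whose method signatures cannot throw InterruptedException (CommonMergeJoinOperator above) wrap it in HiveException. A minimal, self-contained sketch of the pattern (the iterator is a stand-in for a record source; assumes the class above is on the classpath):

  import java.util.Arrays;
  import java.util.Iterator;
  import org.apache.hadoop.hive.ql.exec.tez.InterruptibleProcessing;

  public class AbortCheckUsage {
    public static void main(String[] args) throws InterruptedException {
      InterruptibleProcessing checker = new InterruptibleProcessing();
      Iterator<Integer> rows = Arrays.asList(1, 2, 3).iterator(); // stand-in source
      checker.startAbortChecks();           // start timing at the loop, not at setup
      while (rows.hasNext()) {
        rows.next();                        // stand-in for pushRecord()/fetchOneRow()
        checker.addRowAndMaybeCheckAbort(); // throws InterruptedException on abort
      }
    }
  }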

http://git-wip-us.apache.org/repos/asf/hive/blob/0cc1afa5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 790c9d8..24d3526 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -92,7 +92,6 @@ public class MapRecordProcessor extends RecordProcessor {
   List<MapWork> mergeWorkList;
   List<String> cacheKeys, dynamicValueCacheKeys;
   ObjectCache cache, dynamicValueCache;
-  private int nRows;
 
   public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
     super(jconf, context);
@@ -106,7 +105,6 @@ public class MapRecordProcessor extends RecordProcessor {
     execContext.setJc(jconf);
     cacheKeys = new ArrayList<String>();
     dynamicValueCacheKeys = new ArrayList<String>();
-    nRows = 0;
   }
 
   private void setLlapOfFragmentId(final ProcessorContext context) {
@@ -343,7 +341,7 @@ public class MapRecordProcessor extends RecordProcessor {
       MapredContext.get().setReporter(reporter);
 
     } catch (Throwable e) {
-      abort = true;
+      setAborted(true);
       if (e instanceof OutOfMemoryError) {
         // will this be true here?
         // Don't create a new object if we are already out of memory
@@ -417,15 +415,12 @@ public class MapRecordProcessor extends RecordProcessor {
 
   @Override
   void run() throws Exception {
+    startAbortChecks();
     while (sources[position].pushRecord()) {
-      if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) {
-        checkAbortCondition();
-        nRows = 0;
-      }
+      addRowAndMaybeCheckAbort();
     }
   }
 
-
   @Override
   public void abort() {
     // this will stop run() from pushing records, along with potentially
@@ -444,8 +439,8 @@ public class MapRecordProcessor extends RecordProcessor {
   @Override
   void close(){
     // check if there are IOExceptions
-    if (!abort) {
-      abort = execContext.getIoCxt().getIOExceptions();
+    if (!isAborted()) {
+      setAborted(execContext.getIoCxt().getIOExceptions());
     }
 
     if (cache != null && cacheKeys != null) {
@@ -465,6 +460,7 @@ public class MapRecordProcessor extends RecordProcessor {
       if (mapOp == null || mapWork == null) {
         return;
       }
+      boolean abort = isAborted();
       mapOp.close(abort);
       if (mergeMapOpList.isEmpty() == false) {
         for (AbstractMapOperator mergeMapOp : mergeMapOpList) {
@@ -486,7 +482,7 @@ public class MapRecordProcessor extends RecordProcessor {
       mapOp.preorderMap(rps);
       return;
     } catch (Exception e) {
-      if (!abort) {
+      if (!isAborted()) {
         // signal new failure to map-reduce
         l4j.error("Hit error while closing operators - failing tree");
         throw new RuntimeException("Hive Runtime Error while closing operators", e);

http://git-wip-us.apache.org/repos/asf/hive/blob/0cc1afa5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index b7f1011..69de3a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -153,18 +153,13 @@ public class MergeFileRecordProcessor extends RecordProcessor {
     while (reader.next()) {
       boolean needMore = processRow(reader.getCurrentKey(),
           reader.getCurrentValue());
-      if (!needMore || abort) {
+      if (!needMore || isAborted()) {
         break;
       }
     }
   }
 
   @Override
-  void abort() {
-    super.abort();
-  }
-
-  @Override
   void close() {
 
     if (cache != null && cacheKey != null) {
@@ -172,8 +167,8 @@ public class MergeFileRecordProcessor extends RecordProcessor {
     }
 
     // check if there are IOExceptions
-    if (!abort) {
-      abort = execContext.getIoCxt().getIOExceptions();
+    if (!isAborted()) {
+      setAborted(execContext.getIoCxt().getIOExceptions());
     }
 
     // detecting failed executions by exceptions thrown by the operator tree
@@ -181,12 +176,13 @@ public class MergeFileRecordProcessor extends RecordProcessor {
       if (mergeOp == null || mfWork == null) {
         return;
       }
+      boolean abort = isAborted();
       mergeOp.close(abort);
 
       ExecMapper.ReportStats rps = new ExecMapper.ReportStats(reporter, jconf);
       mergeOp.preorderMap(rps);
     } catch (Exception e) {
-      if (!abort) {
+      if (!isAborted()) {
         // signal new failure to map-reduce
         l4j.error("Hit error while closing operators - failing tree");
         throw new RuntimeException("Hive Runtime Error while closing operators",
@@ -216,7 +212,7 @@ public class MergeFileRecordProcessor extends RecordProcessor {
         mergeOp.process(row, 0);
       }
     } catch (Throwable e) {
-      abort = true;
+      setAborted(true);
       if (e instanceof OutOfMemoryError) {
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;

http://git-wip-us.apache.org/repos/asf/hive/blob/0cc1afa5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index 77c7fa3..106a534 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -47,9 +47,7 @@ import com.google.common.collect.Maps;
  * Process input from tez LogicalInput and write output
  * It has different subclasses for map and reduce processing
  */
-public abstract class RecordProcessor  {
-  protected static final int CHECK_INTERRUPTION_AFTER_ROWS = 1000;
-
+public abstract class RecordProcessor extends InterruptibleProcessing {
   protected final JobConf jconf;
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
@@ -58,8 +56,6 @@ public abstract class RecordProcessor  {
 
   public static final Logger l4j = LoggerFactory.getLogger(RecordProcessor.class);
 
-  protected volatile boolean abort = false;
-
   // used to log memory usage periodically
   protected boolean isLogInfoEnabled = false;
   protected boolean isLogTraceEnabled = false;
@@ -149,16 +145,4 @@ public abstract class RecordProcessor  {
       return null;
     }
   }
-
-  void abort() {
-    this.abort = true;
-  }
-
-  protected void checkAbortCondition() throws InterruptedException {
-    if (abort || Thread.currentThread().isInterrupted()) {
-      // Not cleaning the interrupt status.
-      boolean interruptState = Thread.currentThread().isInterrupted();
-      throw new InterruptedException("Processing thread aborted. Interrupt state: " + interruptState);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0cc1afa5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 3fb9fb1..f6f2dd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -62,7 +62,7 @@ import com.google.common.collect.Lists;
  * Process input from tez LogicalInput and write output - for a map plan
  * Just pump the records through the query plan.
  */
-public class ReduceRecordProcessor  extends RecordProcessor{
+public class ReduceRecordProcessor extends RecordProcessor {
 
   private static final String REDUCE_PLAN_KEY = "__REDUCE_PLAN__";
 
@@ -85,9 +85,6 @@ public class ReduceRecordProcessor  extends RecordProcessor{
 
   private byte bigTablePosition = 0;
 
-
-  private int nRows = 0;
-
   public ReduceRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
     super(jconf, context);
 
@@ -256,7 +253,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
       MapredContext.get().setReporter(reporter);
 
     } catch (Throwable e) {
-      abort = true;
+      super.setAborted(true);
       if (e instanceof OutOfMemoryError) {
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
@@ -309,18 +306,16 @@ public class ReduceRecordProcessor  extends RecordProcessor{
 
     for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
       l4j.info("Starting Output: " + outputEntry.getKey());
-      if (!abort) {
+      if (!isAborted()) {
         outputEntry.getValue().start();
         ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
       }
     }
 
     // run the operator pipeline
+    startAbortChecks();
     while (sources[bigTablePosition].pushRecord()) {
-      if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) {
-        checkAbortCondition();
-        nRows = 0;
-      }
+      addRowAndMaybeCheckAbort();
     }
   }
 
@@ -374,10 +369,16 @@ public class ReduceRecordProcessor  extends RecordProcessor{
     }
 
     try {
-      for (ReduceRecordSource rs: sources) {
-        abort = abort && rs.close();
+      if (isAborted()) {
+        for (ReduceRecordSource rs: sources) {
+          if (!rs.close()) {
+            setAborted(false); // Preserving the old logic. Hmm...
+            break;
+          }
+        }
       }
 
+      boolean abort = isAborted();
       reducer.close(abort);
       if (mergeWorkList != null) {
         for (BaseWork redWork : mergeWorkList) {
@@ -398,7 +399,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
       reducer.preorderMap(rps);
 
     } catch (Exception e) {
-      if (!abort) {
+      if (!isAborted()) {
         // signal new failure to map-reduce
         l4j.error("Hit error while closing operators - failing tree");
         throw new RuntimeException(
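
A note on the close() rewrite in the last hunk: the old form, abort = abort && rs.close(), short-circuits, so it both stops calling close() after the first failure and leaves abort true only if every source closed cleanly; the new explicit loop keeps the same behavior (hence the author's "Preserving the old logic" comment). A tiny self-contained check of the value equivalence (illustrative only):

  public class CloseRewriteCheck {
    public static void main(String[] args) {
      boolean[] closeResults = {true, false, true}; // pretend per-source close() results

      boolean oldAbort = true;                      // old: abort = abort && rs.close()
      for (boolean c : closeResults) {
        oldAbort = oldAbort && c;
      }

      boolean newAbort = true;                      // new: explicit break on first failure
      for (boolean c : closeResults) {
        if (!c) { newAbort = false; break; }
      }

      System.out.println(oldAbort + " " + newAbort); // prints: false false
    }
  }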

http://git-wip-us.apache.org/repos/asf/hive/blob/0cc1afa5/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index 3e8d3e8..cb30413 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -25,7 +25,9 @@ import java.util.List;
 import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer.HashPartition;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
@@ -76,6 +78,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinGenerateResultOperator.class.getName());
   private static final String CLASS_NAME = VectorMapJoinGenerateResultOperator.class.getName();
+  private static final int CHECK_INTERRUPT_PER_OVERFLOW_BATCHES = 10;
 
   //------------------------------------------------------------------------------------------------
 
@@ -85,6 +88,9 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
 
   private transient VectorDeserializeRow bigTableVectorDeserializeRow;
 
+  private transient Thread ownThread;
+  private transient int interruptCheckCounter = CHECK_INTERRUPT_PER_OVERFLOW_BATCHES;
+
   // Debug display.
   protected transient long batchCounter;
 
@@ -102,6 +108,20 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
     super(ctx, vContext, conf);
   }
 
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+    setUpInterruptChecking();
+  }
+
+  private void setUpInterruptChecking() {
+    for (Operator<? extends OperatorDesc> child : childOperatorsArray) {
+      // We will only do interrupt checking in the lowest-level operator for multiple joins.
+      if (child instanceof VectorMapJoinGenerateResultOperator) return;
+    }
+    ownThread = Thread.currentThread();
+  }
+
   protected void commonSetup(VectorizedRowBatch batch) throws HiveException {
     super.commonSetup(batch);
 
@@ -627,6 +647,15 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
   protected void forwardOverflow() throws HiveException {
     forward(overflowBatch, null);
     overflowBatch.reset();
+    maybeCheckInterrupt();
+  }
+
+  private void maybeCheckInterrupt() throws HiveException {
+    if (ownThread == null || --interruptCheckCounter > 0) return;
+    if (ownThread.isInterrupted()) {
+      throw new HiveException("Thread interrupted");
+    }
+    interruptCheckCounter = CHECK_INTERRUPT_PER_OVERFLOW_BATCHES;
   }
 
   /**
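
The vectorized map-join operator takes a lighter-weight approach than InterruptibleProcessing: it caches its own thread at initialization (only in the bottom-most VectorMapJoinGenerateResultOperator of a join chain, per setUpInterruptChecking), then polls the interrupt flag on a fixed countdown of every 10 forwarded overflow batches. A minimal sketch of that countdown pattern (names and the exception type are stand-ins):

  public class OverflowInterruptSketch {
    private static final int CHECK_EVERY_N_BATCHES = 10;
    private final Thread ownThread = Thread.currentThread(); // cached at init time
    private int counter = CHECK_EVERY_N_BATCHES;

    void onOverflowBatchForwarded() {
      if (--counter > 0) return;          // cheap path for 9 out of 10 batches
      if (ownThread.isInterrupted()) {    // peeks at the flag; does not clear it
        throw new RuntimeException("Thread interrupted"); // HiveException in the patch
      }
      counter = CHECK_EVERY_N_BATCHES;
    }

    public static void main(String[] args) {
      OverflowInterruptSketch s = new OverflowInterruptSketch();
      for (int i = 0; i < 25; i++) {
        s.onOverflowBatchForwarded();     // checks on the 10th and 20th calls
      }
    }
  }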

