hive-commits mailing list archives

From w...@apache.org
Subject [08/51] [partial] hive git commit: Revert "HIVE-14671 : merge master into hive-14535 (Wei Zheng)"
Date Mon, 08 May 2017 20:42:58 GMT
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index afe1484..af1fa66 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
-import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkCommonOperator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
 import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
@@ -144,17 +143,13 @@ public final class OperatorFactory {
 
   public static <T extends OperatorDesc> Operator<T> getVectorOperator(
     Class<? extends Operator<?>> opClass, CompilationOpContext cContext, T conf,
-        VectorizationContext vContext, Operator<? extends OperatorDesc> originalOp) throws HiveException {
+        VectorizationContext vContext) throws HiveException {
     try {
       VectorDesc vectorDesc = ((AbstractOperatorDesc) conf).getVectorDesc();
       vectorDesc.setVectorOp(opClass);
-      Operator<T> op = (Operator<T>) opClass.getDeclaredConstructor(CompilationOpContext.class,
-          VectorizationContext.class, OperatorDesc.class).newInstance(cContext, vContext, conf);
-      op.setOperatorId(originalOp.getOperatorId());
-      if (op instanceof VectorReduceSinkOperator || op instanceof VectorReduceSinkCommonOperator) {
-        ((ReduceSinkDesc) op.getConf()).setOutputOperators(((ReduceSinkDesc) originalOp.getConf())
-            .getOutputOperators());
-      }
+      Operator<T> op = (Operator<T>) opClass.getDeclaredConstructor(
+          CompilationOpContext.class, VectorizationContext.class, OperatorDesc.class)
+          .newInstance(cContext, vContext, conf);
       return op;
     } catch (Exception e) {
       e.printStackTrace();
@@ -163,12 +158,11 @@ public final class OperatorFactory {
   }
 
   public static <T extends OperatorDesc> Operator<T> getVectorOperator(
-      CompilationOpContext cContext, T conf, VectorizationContext vContext,
-      Operator<? extends OperatorDesc> originalOp) throws HiveException {
+      CompilationOpContext cContext, T conf, VectorizationContext vContext) throws HiveException {
     Class<T> descClass = (Class<T>) conf.getClass();
-    Class<? extends Operator<? extends OperatorDesc>> opClass = vectorOpvec.get(descClass);
+    Class<?> opClass = vectorOpvec.get(descClass);
     if (opClass != null) {
-      return getVectorOperator(opClass, cContext, conf, vContext, originalOp);
+      return getVectorOperator(vectorOpvec.get(descClass), cContext, conf, vContext);
     }
     throw new HiveException("No vector operator for descriptor class " + descClass.getName());
   }
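
For reference, the getVectorOperator variant restored above builds the vectorized operator reflectively through a (CompilationOpContext, VectorizationContext, OperatorDesc) constructor. A self-contained sketch of that reflective pattern, using stand-in classes instead of Hive's operator types:

    import java.lang.reflect.Constructor;

    public class ReflectiveFactoryDemo {
      // Stand-ins for CompilationOpContext, VectorizationContext and OperatorDesc.
      static class Context {}
      static class VectorContext {}
      static class Desc {}

      // A concrete "operator" exposing the three-argument constructor the factory expects.
      static class VectorSelect {
        final Context ctx; final VectorContext vCtx; final Desc desc;
        public VectorSelect(Context ctx, VectorContext vCtx, Desc desc) {
          this.ctx = ctx; this.vCtx = vCtx; this.desc = desc;
        }
      }

      // Mirrors the factory's getDeclaredConstructor(...).newInstance(...) call.
      static <T> T create(Class<T> opClass, Context ctx, VectorContext vCtx, Desc desc) throws Exception {
        Constructor<T> c = opClass.getDeclaredConstructor(Context.class, VectorContext.class, Desc.class);
        return c.newInstance(ctx, vCtx, desc);
      }

      public static void main(String[] args) throws Exception {
        VectorSelect op = create(VectorSelect.class, new Context(), new VectorContext(), new Desc());
        System.out.println("Constructed " + op.getClass().getSimpleName());
      }
    }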

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 5412ef1..3b10bfd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -107,7 +107,8 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
       srcFiles.addAll(Arrays.asList(srcs));
       LOG.debug("ReplCopyTask numFiles:" + (srcFiles == null ? "null" : srcFiles.size()));
 
-      if (!FileUtils.mkdir(dstFs, toPath, conf)) {
+      boolean inheritPerms = conf.getBoolVar(HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+      if (!FileUtils.mkdir(dstFs, toPath, inheritPerms, conf)) {
         console.printError("Cannot make target directory: " + toPath.toString());
         return 2;
       }
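
The restored call site reads HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS and passes it to FileUtils.mkdir. As a rough illustration (not Hive's FileUtils implementation), a mkdir helper honouring such a flag on the plain Hadoop FileSystem API could look like:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public final class MkdirSketch {
      /**
       * Creates the directory and, when inheritPerms is set, copies the parent
       * directory's permissions onto the newly created directory.
       */
      public static boolean mkdir(FileSystem fs, Path dir, boolean inheritPerms) throws IOException {
        boolean created = fs.mkdirs(dir);
        if (created && inheritPerms && dir.getParent() != null) {
          FsPermission parentPerm = fs.getFileStatus(dir.getParent()).getPermission();
          fs.setPermission(dir, parentPerm);
        }
        return created;
      }
    }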

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index 01a652d..247d589 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -33,12 +33,10 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.CopyOnFirstWriteProperties;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
@@ -225,7 +223,6 @@ public class SerializationUtilities {
       kryo.register(java.sql.Timestamp.class, new TimestampSerializer());
       kryo.register(Path.class, new PathSerializer());
       kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
-      kryo.register(CopyOnFirstWriteProperties.class, new CopyOnFirstWritePropertiesSerializer());
 
       ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
           .setFallbackInstantiatorStrategy(
@@ -425,33 +422,6 @@ public class SerializationUtilities {
   }
 
   /**
-   * CopyOnFirstWriteProperties needs a special serializer, since it extends Properties,
-   * which implements Map, so MapSerializer would be used for it by default. Yet it has
-   * the additional 'interned' field that the standard MapSerializer doesn't handle
-   * properly. But FieldSerializer doesn't work for it as well, because the Hashtable
-   * superclass declares most of its fields transient.
-   */
-  private static class CopyOnFirstWritePropertiesSerializer extends
-      com.esotericsoftware.kryo.serializers.MapSerializer {
-
-    @Override
-    public void write(Kryo kryo, Output output, Map map) {
-      super.write(kryo, output, map);
-      CopyOnFirstWriteProperties p = (CopyOnFirstWriteProperties) map;
-      Properties ip = p.getInterned();
-      kryo.writeObjectOrNull(output, ip, Properties.class);
-    }
-
-    @Override
-    public Map read(Kryo kryo, Input input, Class<Map> type) {
-      Map map = super.read(kryo, input, type);
-      Properties ip = kryo.readObjectOrNull(input, Properties.class);
-      ((CopyOnFirstWriteProperties) map).setInterned(ip);
-      return map;
-    }
-  }
-
-  /**
    * Serializes the plan.
    *
    * @param plan The plan, such as QueryPlan, MapredWork, etc.

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
index 65363ed..65227e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
@@ -347,15 +347,14 @@ public class StatsNoJobTask extends Task<StatsNoJobWork> implements Serializable
     try {
 
       // Wait a while for existing tasks to terminate
-      while (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
-        LOG.debug("Waiting for all stats tasks to finish...");
-      }
-      // Cancel currently executing tasks
-      threadPool.shutdownNow();
-
-      // Wait a while for tasks to respond to being cancelled
       if (!threadPool.awaitTermination(100, TimeUnit.SECONDS)) {
-        LOG.debug("Stats collection thread pool did not terminate");
+        // Cancel currently executing tasks
+        threadPool.shutdownNow();
+
+        // Wait a while for tasks to respond to being cancelled
+        if (!threadPool.awaitTermination(100, TimeUnit.SECONDS)) {
+          LOG.debug("Stats collection thread pool did not terminate");
+        }
       }
     } catch (InterruptedException ie) {
 

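The restored shutdown logic above is the standard two-phase ExecutorService teardown: wait for running stats tasks, then shutdownNow(), then wait again for the cancelled tasks to react. The same idiom in isolation, modelled on the pattern documented for java.util.concurrent.ExecutorService:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class PoolShutdownDemo {
      static void shutdownAndAwait(ExecutorService pool) {
        pool.shutdown();  // stop accepting new tasks
        try {
          // Phase 1: give running tasks a chance to finish.
          if (!pool.awaitTermination(100, TimeUnit.SECONDS)) {
            pool.shutdownNow();  // Phase 2: interrupt whatever is still running.
            // Give the cancelled tasks time to respond to the interrupt.
            if (!pool.awaitTermination(100, TimeUnit.SECONDS)) {
              System.err.println("Thread pool did not terminate");
            }
          }
        } catch (InterruptedException ie) {
          pool.shutdownNow();
          Thread.currentThread().interrupt();
        }
      }

      public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> System.out.println("stats task running"));
        shutdownAndAwait(pool);
      }
    }
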
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
index eddc31e..a596e92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.session.OperationLog;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +35,7 @@ public class TaskRunner extends Thread {
   protected Task<? extends Serializable> tsk;
   protected TaskResult result;
   protected SessionState ss;
+  private OperationLog operationLog;
   private static AtomicLong taskCounter = new AtomicLong(0);
   private static ThreadLocal<Long> taskRunnerID = new ThreadLocal<Long>() {
     @Override
@@ -72,6 +74,7 @@ public class TaskRunner extends Thread {
   public void run() {
     runner = Thread.currentThread();
     try {
+      OperationLog.setCurrentOperationLog(operationLog);
       SessionState.start(ss);
       runSequential();
     } finally {
@@ -110,4 +113,8 @@ public class TaskRunner extends Thread {
   public static long getTaskRunnerID () {
     return taskRunnerID.get();
   }
+
+  public void setOperationLog(OperationLog operationLog) {
+    this.operationLog = operationLog;
+  }
 }
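
The restored TaskRunner plumbing captures the session's OperationLog on the submitting thread and re-installs it as the current one at the start of run(). The generic shape of that hand-off, with a stand-in for OperationLog:

    public class ThreadLocalHandOffDemo {
      // Stand-in for OperationLog: a per-operation context kept in a thread-local.
      static class OperationContext {
        private static final ThreadLocal<OperationContext> CURRENT = new ThreadLocal<>();
        final String id;
        OperationContext(String id) { this.id = id; }
        static void setCurrent(OperationContext ctx) { CURRENT.set(ctx); }
        static OperationContext getCurrent() { return CURRENT.get(); }
      }

      static class Runner extends Thread {
        private OperationContext ctx;

        void setContext(OperationContext ctx) { this.ctx = ctx; }  // called on the submitting thread

        @Override
        public void run() {
          // Re-install the captured context so code running on this thread can look it up.
          OperationContext.setCurrent(ctx);
          System.out.println("running under operation " + OperationContext.getCurrent().id);
        }
      }

      public static void main(String[] args) throws InterruptedException {
        Runner r = new Runner();
        r.setContext(new OperationContext("query-1"));
        r.start();
        r.join();
      }
    }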

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
index 48ae02f..f3c7c77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
@@ -27,7 +27,6 @@ import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.LlapDaemonInfo;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -106,8 +105,8 @@ public class TopNHash {
     }
 
     final boolean isTez = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
-    final boolean isLlap = LlapDaemonInfo.INSTANCE.isLlap();
-    final int numExecutors = isLlap ? LlapDaemonInfo.INSTANCE.getNumExecutors() : 1;
+    final boolean isLlap = isTez && HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap");
+    final int numExecutors = isLlap ? HiveConf.getIntVar(hconf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS) : 1;
 
     // Used Memory = totalMemory() - freeMemory();
     // Total Free Memory = maxMemory() - Used Memory;

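The comments kept in the hunk above describe the heap arithmetic TopNHash bases its memory budget on. Written out against java.lang.Runtime:

    public class HeapHeadroomDemo {
      public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        long used = rt.totalMemory() - rt.freeMemory();   // Used Memory = totalMemory() - freeMemory()
        long totalFree = rt.maxMemory() - used;           // Total Free Memory = maxMemory() - Used Memory
        System.out.printf("used=%d bytes, headroom=%d bytes%n", used, totalFree);
      }
    }
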
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 777c119..5b5ddc3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -76,7 +76,6 @@ import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.WordUtils;
@@ -110,8 +109,6 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.Driver.DriverState;
-import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
@@ -206,6 +203,7 @@ import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Shell;
 import org.apache.hive.common.util.ACLConfigurationParser;
 import org.apache.hive.common.util.ReflectionUtil;
 import org.slf4j.Logger;
@@ -282,11 +280,11 @@ public final class Utilities {
    * The object in the reducer are composed of these top level fields.
    */
 
-  public static final String HADOOP_LOCAL_FS = "file:///";
+  public static String HADOOP_LOCAL_FS = "file:///";
   public static final String HADOOP_LOCAL_FS_SCHEME = "file";
-  public static final String MAP_PLAN_NAME = "map.xml";
-  public static final String REDUCE_PLAN_NAME = "reduce.xml";
-  public static final String MERGE_PLAN_NAME = "merge.xml";
+  public static String MAP_PLAN_NAME = "map.xml";
+  public static String REDUCE_PLAN_NAME = "reduce.xml";
+  public static String MERGE_PLAN_NAME = "merge.xml";
   public static final String INPUT_NAME = "iocontext.input.name";
   public static final String HAS_MAP_WORK = "has.map.work";
   public static final String HAS_REDUCE_WORK = "has.reduce.work";
@@ -295,11 +293,11 @@ public final class Utilities {
   public static final String HIVE_ADDED_JARS = "hive.added.jars";
   public static final String VECTOR_MODE = "VECTOR_MODE";
   public static final String USE_VECTORIZED_INPUT_FILE_FORMAT = "USE_VECTORIZED_INPUT_FILE_FORMAT";
-  public static final String MAPNAME = "Map ";
-  public static final String REDUCENAME = "Reducer ";
+  public static String MAPNAME = "Map ";
+  public static String REDUCENAME = "Reducer ";
 
   @Deprecated
-  protected static final String DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX = "mapred.dfsclient.parallelism.max";
+  protected static String DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX = "mapred.dfsclient.parallelism.max";
 
   /**
    * ReduceField:
@@ -605,7 +603,7 @@ public final class Utilities {
   public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScratchDir) {
     String useName = conf.get(INPUT_NAME);
     if (useName == null) {
-      useName = "mapreduce:" + hiveScratchDir;
+      useName = "mapreduce";
     }
     conf.set(INPUT_NAME, useName);
     setMapWork(conf, w.getMapWork(), hiveScratchDir, true);
@@ -769,8 +767,8 @@ public final class Utilities {
   // Note: When DDL supports specifying what string to represent null,
   // we should specify "NULL" to represent null in the temp table, and then
   // we can make the following translation deprecated.
-  public static final String nullStringStorage = "\\N";
-  public static final String nullStringOutput = "NULL";
+  public static String nullStringStorage = "\\N";
+  public static String nullStringOutput = "NULL";
 
   public static Random randGen = new Random();
 
@@ -2683,7 +2681,7 @@ public final class Utilities {
     setColumnTypeList(jobConf, rowSchema, excludeVCs);
   }
 
-  public static final String suffix = ".hashtable";
+  public static String suffix = ".hashtable";
 
   public static Path generatePath(Path basePath, String dumpFilePrefix,
       Byte tag, String bigBucketFileName) {
@@ -3164,7 +3162,6 @@ public final class Utilities {
 
     Set<Path> pathsProcessed = new HashSet<Path>();
     List<Path> pathsToAdd = new LinkedList<Path>();
-    LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
     // AliasToWork contains all the aliases
     Collection<String> aliasToWork = work.getAliasToWork().keySet();
     if (!skipDummy) {
@@ -3185,9 +3182,6 @@ public final class Utilities {
       boolean hasLogged = false;
       Path path = null;
       for (Map.Entry<Path, ArrayList<String>> e : pathToAliases) {
-        if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
-          throw new IOException("Operation is Canceled. ");
-
         Path file = e.getKey();
         List<String> aliases = e.getValue();
         if (aliases.contains(alias)) {
@@ -3241,8 +3235,6 @@ public final class Utilities {
     List<Path> finalPathsToAdd = new LinkedList<>();
     List<Future<Path>> futures = new LinkedList<>();
     for (final Path path : pathsToAdd) {
-      if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
-        throw new IOException("Operation is Canceled. ");
       if (pool == null) {
         finalPathsToAdd.add(new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call());
       } else {
@@ -3252,8 +3244,6 @@ public final class Utilities {
 
     if (pool != null) {
       for (Future<Path> future : futures) {
-        if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
-          throw new IOException("Operation is Canceled. ");
         finalPathsToAdd.add(future.get());
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java
deleted file mode 100644
index 4ad4f98..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.mapjoin;
-
-/**
- * When this Error is thrown, better not retry.
- */
-public class MapJoinMemoryExhaustionError extends Error {
-  private static final long serialVersionUID = 3678353959830506881L;
-  public MapJoinMemoryExhaustionError(String msg) {
-    super(msg);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java
new file mode 100644
index 0000000..dbe00b6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.mapjoin;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+
+
+public class MapJoinMemoryExhaustionException extends HiveException {
+  private static final long serialVersionUID = 3678353959830506881L;
+  public MapJoinMemoryExhaustionException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
index d5e81e1..7fc3226 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
@@ -86,17 +86,17 @@ public class MapJoinMemoryExhaustionHandler {
    *
    * @param tableContainerSize currently table container size
    * @param numRows number of rows processed
-   * @throws MapJoinMemoryExhaustionError
+   * @throws MapJoinMemoryExhaustionException
    */
   public void checkMemoryStatus(long tableContainerSize, long numRows)
-  throws MapJoinMemoryExhaustionError {
+  throws MapJoinMemoryExhaustionException {
     long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
     double percentage = (double) usedMemory / (double) maxHeapSize;
     String msg = Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t"
         + tableContainerSize + "\tMemory usage:\t" + usedMemory + "\tpercentage:\t" + percentageNumberFormat.format(percentage);
     console.printInfo(msg);
     if(percentage > maxMemoryUsage) {
-      throw new MapJoinMemoryExhaustionError(msg);
+      throw new MapJoinMemoryExhaustionException(msg);
     }
    }
 }
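
The restored checkMemoryStatus compares current heap usage, read from the MemoryMXBean, against a configured fraction of the maximum heap and aborts the hash-table build once it is exceeded. A condensed standalone version of that check (the threshold value and the exception type here are illustrative):

    import java.lang.management.ManagementFactory;
    import java.lang.management.MemoryMXBean;

    public class HeapUsageCheckDemo {
      static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();

      /** Throws when used heap exceeds maxMemoryUsage (a fraction such as 0.9) of the max heap. */
      static void checkMemoryStatus(double maxMemoryUsage) {
        long maxHeapSize = MEMORY_MX_BEAN.getHeapMemoryUsage().getMax();
        long usedMemory = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed();
        double percentage = (double) usedMemory / (double) maxHeapSize;
        if (percentage > maxMemoryUsage) {
          throw new IllegalStateException("Heap usage " + percentage + " exceeds limit " + maxMemoryUsage);
        }
      }

      public static void main(String[] args) {
        checkMemoryStatus(0.9);
        System.out.println("Heap usage within the configured limit");
      }
    }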

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 93a36c6..1945163 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -32,7 +32,6 @@ import java.util.Properties;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-import org.apache.hadoop.hive.ql.log.LogDivertAppenderForTest;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,7 +68,6 @@ import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
 import org.apache.hadoop.hive.ql.io.IOPrepareCache;
-import org.apache.hadoop.hive.ql.log.LogDivertAppender;
 import org.apache.hadoop.hive.ql.log.NullAppender;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
@@ -118,8 +116,6 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
   protected transient JobConf job;
   public static MemoryMXBean memoryMXBean;
   protected HadoopJobExecHelper jobExecHelper;
-  private transient boolean isShutdown = false;
-  private transient boolean jobKilled = false;
 
   protected static transient final Logger LOG = LoggerFactory.getLogger(ExecDriver.class);
 
@@ -416,7 +412,10 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
 
       if (driverContext.isShutdown()) {
         LOG.warn("Task was cancelled");
-        killJob();
+        if (rj != null) {
+          rj.killJob();
+          rj = null;
+        }
         return 5;
       }
 
@@ -449,7 +448,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
 
         if (rj != null) {
           if (returnVal != 0) {
-            killJob();
+            rj.killJob();
           }
           jobID = rj.getID().toString();
         }
@@ -633,8 +632,6 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
   private static void setupChildLog4j(Configuration conf) {
     try {
       LogUtils.initHiveExecLog4j();
-      LogDivertAppender.registerRoutingAppender(conf);
-      LogDivertAppenderForTest.registerRoutingAppenderIfInTest(conf);
     } catch (LogInitializationException e) {
       System.err.println(e.getMessage());
     }
@@ -706,8 +703,6 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
     }
     System.setProperty(HiveConf.ConfVars.HIVEQUERYID.toString(), queryId);
 
-    LogUtils.registerLoggingContext(conf);
-
     if (noLog) {
       // If started from main(), and noLog is on, we should not output
       // any logs. To turn the log on, please set -Dtest.silent=false
@@ -858,37 +853,22 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
     ss.getHiveHistory().logPlanProgress(queryPlan);
   }
 
-  public boolean isTaskShutdown() {
-    return isShutdown;
-  }
-
   @Override
   public void shutdown() {
     super.shutdown();
-    killJob();
-    isShutdown = true;
-  }
-
-  @Override
-  public String getExternalHandle() {
-    return this.jobID;
-  }
-
-  private void killJob() {
-    boolean needToKillJob = false;
-    synchronized(this) {
-      if (rj != null && !jobKilled) {
-        jobKilled = true;
-        needToKillJob = true;
-      }
-    }
-    if (needToKillJob) {
+    if (rj != null) {
       try {
         rj.killJob();
       } catch (Exception e) {
         LOG.warn("failed to kill job " + rj.getID(), e);
       }
+      rj = null;
     }
   }
+
+  @Override
+  public String getExternalHandle() {
+    return this.jobID;
+  }
 }
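
The killJob() helper removed above wrapped the actual kill in a synchronized flag so the Hadoop job is killed at most once even when shutdown() and the error path race. The guard itself, abstracted away from the RunningJob handle:

    public class KillOnceDemo {
      private Object runningJob = new Object();  // stand-in for the Hadoop RunningJob handle
      private boolean jobKilled = false;

      /** Kills the job at most once, even if invoked concurrently from several code paths. */
      void killJob() {
        boolean needToKill = false;
        synchronized (this) {
          if (runningJob != null && !jobKilled) {
            jobKilled = true;
            needToKill = true;
          }
        }
        if (needToKill) {
          System.out.println("killing job once");  // the real code calls rj.killJob() here
        }
      }

      public static void main(String[] args) {
        KillOnceDemo demo = new KillOnceDemo();
        demo.killJob();
        demo.killJob();  // second call is a no-op
      }
    }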
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index c5d4f9a..591ea97 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -60,7 +60,7 @@ import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -69,6 +69,7 @@ import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.session.OperationLog;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -326,8 +327,18 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab
 
       CachingPrintStream errPrintStream = new CachingPrintStream(System.err);
 
-      StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
-      StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream);
+      StreamPrinter outPrinter;
+      StreamPrinter errPrinter;
+      OperationLog operationLog = OperationLog.getCurrentOperationLog();
+      if (operationLog != null) {
+        outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out,
+            operationLog.getPrintStream());
+        errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream,
+            operationLog.getPrintStream());
+      } else {
+        outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
+        errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream);
+      }
 
       outPrinter.start();
       errPrinter.start();
@@ -384,7 +395,7 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab
           + Utilities.showTime(elapsed) + " sec.");
     } catch (Throwable throwable) {
       if (throwable instanceof OutOfMemoryError
-          || (throwable instanceof MapJoinMemoryExhaustionError)) {
+          || (throwable instanceof MapJoinMemoryExhaustionException)) {
         l4j.error("Hive Runtime Error: Map local work exhausted memory", throwable);
         return 3;
       } else {

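The restored branch above tees the child process's stdout and stderr into the operation log's print stream in addition to the console. StreamPrinter is a Hive helper; as a plain-JDK illustration, the underlying idea is a thread that pumps one input stream into several sinks:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    public class StreamTeeDemo extends Thread {
      private final InputStream in;
      private final OutputStream[] sinks;

      StreamTeeDemo(InputStream in, OutputStream... sinks) {
        this.in = in;
        this.sinks = sinks;
      }

      @Override
      public void run() {
        byte[] buf = new byte[4096];
        try {
          int n;
          while ((n = in.read(buf)) != -1) {
            for (OutputStream sink : sinks) {
              sink.write(buf, 0, n);  // copy each chunk to every sink (console, log file, ...)
            }
          }
          for (OutputStream sink : sinks) {
            sink.flush();
          }
        } catch (IOException e) {
          e.printStackTrace();
        }
      }

      public static void main(String[] args) throws InterruptedException {
        InputStream fakeChildOutput = new ByteArrayInputStream("child output\n".getBytes());
        StreamTeeDemo printer = new StreamTeeDemo(fakeChildOutput, System.out, System.err);
        printer.start();
        printer.join();
      }
    }
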
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
index 360b639..04e24bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
@@ -24,8 +24,6 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hive.common.MemoryEstimate;
-import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -48,7 +46,7 @@ import com.google.common.annotations.VisibleForTesting;
  * Initially inspired by HPPC LongLongOpenHashMap; however, the code is almost completely reworked
  * and there's very little in common left save for quadratic probing (and that with some changes).
  */
-public final class BytesBytesMultiHashMap implements MemoryEstimate {
+public final class BytesBytesMultiHashMap {
   public static final Logger LOG = LoggerFactory.getLogger(BytesBytesMultiHashMap.class);
 
   /*
@@ -523,18 +521,7 @@ public final class BytesBytesMultiHashMap implements MemoryEstimate {
    * @return number of bytes
    */
   public long memorySize() {
-    return getEstimatedMemorySize();
-  }
-
-  @Override
-  public long getEstimatedMemorySize() {
-    JavaDataModel jdm = JavaDataModel.get();
-    long size = 0;
-    size += writeBuffers.getEstimatedMemorySize();
-    size += jdm.lengthForLongArrayOfSize(refs.length);
-    // 11 primitive1 fields, 2 refs above with alignment
-    size += JavaDataModel.alignUp(15 * jdm.primitive1(), jdm.memoryAlign());
-    return size;
+    return writeBuffers.size() + refs.length * 8 + 100;
   }
 
   public void seal() {

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
index adf1a90..a3bccc6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.io.Writable;
 public class HashMapWrapper extends AbstractMapJoinTableContainer implements Serializable {
   private static final long serialVersionUID = 1L;
   protected static final Logger LOG = LoggerFactory.getLogger(HashMapWrapper.class);
-  private static final long DEFAULT_HASHMAP_ENTRY_SIZE = 1024L;
+
   // default threshold for using main memory based HashMap
   private static final int THRESHOLD = 1000000;
   private static final float LOADFACTOR = 0.75f;
@@ -140,14 +140,6 @@ public class HashMapWrapper extends AbstractMapJoinTableContainer implements Ser
     return new GetAdaptor(keyTypeFromLoader);
   }
 
-  @Override
-  public long getEstimatedMemorySize() {
-    // TODO: Key and Values are Object[] which can be eagerly deserialized or lazily deserialized. To accurately
-    // estimate the entry size, every possible Objects in Key, Value should implement MemoryEstimate interface which
-    // is very intrusive. So assuming default entry size here.
-    return size() * DEFAULT_HASHMAP_ENTRY_SIZE;
-  }
-
   private class GetAdaptor implements ReusableGetAdaptor {
 
     private Object[] currentKey;

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index 6523f00..04e89e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -118,11 +118,6 @@ public class HybridHashTableContainer
 
   private final String spillLocalDirs;
 
-  @Override
-  public long getEstimatedMemorySize() {
-    return memoryUsed;
-  }
-
   /**
    * This class encapsulates the triplet together since they are closely related to each other
    * The triplet: hashmap (either in memory or on disk), small table container, big table container

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
index 014d17a..c86e5f5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
@@ -26,7 +26,6 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.MemoryEstimate;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
@@ -34,7 +33,6 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper;
 import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -74,11 +72,6 @@ public class MapJoinBytesTableContainer
          implements MapJoinTableContainer, MapJoinTableContainerDirectAccess {
   private static final Logger LOG = LoggerFactory.getLogger(MapJoinTableContainer.class);
 
-  // TODO: For object inspector fields, assigning 16KB for now. To better estimate the memory size every
-  // object inspectors have to implement MemoryEstimate interface which is a lot of change with little benefit compared
-  // to writing an instrumentation agent for object size estimation
-  public static final long DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE = 16 * 1024L;
-
   private final BytesBytesMultiHashMap hashMap;
   /** The OI used to deserialize values. We never deserialize keys. */
   private LazyBinaryStructObjectInspector internalValueOi;
@@ -154,7 +147,7 @@ public class MapJoinBytesTableContainer
     this.notNullMarkers = notNullMarkers;
   }
 
-  public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource, MemoryEstimate {
+  public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource {
     void setKeyValue(Writable key, Writable val) throws SerDeException;
     /** Get hash value from the key. */
     int getHashFromKey() throws SerDeException;
@@ -223,22 +216,6 @@ public class MapJoinBytesTableContainer
     public int getHashFromKey() throws SerDeException {
       throw new UnsupportedOperationException("Not supported for MapJoinBytesTableContainer");
     }
-
-    @Override
-    public long getEstimatedMemorySize() {
-      JavaDataModel jdm = JavaDataModel.get();
-      long size = 0;
-      size += keySerDe == null ? 0 : jdm.object();
-      size += valSerDe == null ? 0 : jdm.object();
-      size += keySoi == null ? 0 : DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
-      size += valSoi == null ? 0 : DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
-      size += keyOis == null ? 0 : jdm.arrayList() + keyOis.size() * DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
-      size += valOis == null ? 0 : jdm.arrayList() + valOis.size() * DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
-      size += keyObjs == null ? 0 : jdm.array() + keyObjs.length * jdm.object();
-      size += valObjs == null ? 0 : jdm.array() + valObjs.length * jdm.object();
-      size += jdm.primitive1();
-      return size;
-    }
   }
 
   static class LazyBinaryKvWriter implements KeyValueHelper {
@@ -342,15 +319,6 @@ public class MapJoinBytesTableContainer
       aliasFilter &= filterGetter.getShort();
       return aliasFilter;
     }
-
-    @Override
-    public long getEstimatedMemorySize() {
-      JavaDataModel jdm = JavaDataModel.get();
-      long size = 0;
-      size += (4 * jdm.object());
-      size += jdm.primitive1();
-      return size;
-    }
   }
 
   /*
@@ -393,15 +361,6 @@ public class MapJoinBytesTableContainer
       int keyLength = key.getLength();
       return HashCodeUtil.murmurHash(keyBytes, 0, keyLength);
     }
-
-    @Override
-    public long getEstimatedMemorySize() {
-      JavaDataModel jdm = JavaDataModel.get();
-      long size = 0;
-      size += jdm.object() + (key == null ? 0 : key.getCapacity());
-      size += jdm.object() + (val == null ? 0 : val.getCapacity());
-      return size;
-    }
   }
 
   @Override
@@ -809,19 +768,4 @@ public class MapJoinBytesTableContainer
   public int size() {
     return hashMap.size();
   }
-
-  @Override
-  public long getEstimatedMemorySize() {
-    JavaDataModel jdm = JavaDataModel.get();
-    long size = 0;
-    size += hashMap.getEstimatedMemorySize();
-    size += directWriteHelper == null ? 0 : directWriteHelper.getEstimatedMemorySize();
-    size += writeHelper == null ? 0 : writeHelper.getEstimatedMemorySize();
-    size += sortableSortOrders == null ? 0 : jdm.lengthForBooleanArrayOfSize(sortableSortOrders.length);
-    size += nullMarkers == null ? 0 : jdm.lengthForByteArrayOfSize(nullMarkers.length);
-    size += notNullMarkers == null ? 0 : jdm.lengthForByteArrayOfSize(notNullMarkers.length);
-    size += jdm.arrayList(); // empty list
-    size += DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
-    return size;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
index 5ca5ff6..6d71fef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
@@ -23,7 +23,6 @@ import java.util.List;
 
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
-import org.apache.hadoop.hive.common.MemoryEstimate;
 import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper;
 import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
@@ -32,7 +31,7 @@ import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Writable;
 
-public interface MapJoinTableContainer extends MemoryEstimate {
+public interface MapJoinTableContainer {
   /**
    * Retrieve rows from hashtable key by key, one key at a time, w/o copying the structures
    * for each key. "Old" HashMapWrapper will still create/retrieve new objects for java HashMap;

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 4ca8f93..4c69899 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -356,9 +356,12 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
     private void logConfigurations(JobConf localJobConf) {
       if (LOG.isInfoEnabled()) {
         LOG.info("Logging job configuration: ");
-        StringBuilder outWriter = new StringBuilder();
-        // redact sensitive information before logging
-        HiveConfUtil.dumpConfig(localJobConf, outWriter);
+        StringWriter outWriter = new StringWriter();
+        try {
+          Configuration.dumpConfiguration(localJobConf, outWriter);
+        } catch (IOException e) {
+          LOG.warn("Error logging job configuration", e);
+        }
         LOG.info(outWriter.toString());
       }
     }
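
The restored logConfigurations writes the full job configuration through Hadoop's Configuration.dumpConfiguration into a StringWriter; unlike the redacting variant it replaces, no sensitive values are filtered. In isolation:

    import java.io.IOException;
    import java.io.StringWriter;

    import org.apache.hadoop.conf.Configuration;

    public class DumpConfDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("hive.execution.engine", "spark");  // example property
        StringWriter outWriter = new StringWriter();
        try {
          // Serializes every property (as JSON) into the writer; nothing is redacted here.
          Configuration.dumpConfiguration(conf, outWriter);
        } catch (IOException e) {
          System.err.println("Error dumping configuration: " + e.getMessage());
        }
        System.out.println(outWriter.toString());
      }
    }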

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 5f85f9e..12a76a7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -298,13 +298,12 @@ public class SparkPlanGenerator {
       throw new IllegalArgumentException(msg, e);
     }
     if (work instanceof MapWork) {
-      MapWork mapWork = (MapWork) work;
       cloned.setBoolean("mapred.task.is.map", true);
-      List<Path> inputPaths = Utilities.getInputPaths(cloned, mapWork,
+      List<Path> inputPaths = Utilities.getInputPaths(cloned, (MapWork) work,
           scratchDir, context, false);
       Utilities.setInputPaths(cloned, inputPaths);
-      Utilities.setMapWork(cloned, mapWork, scratchDir, false);
-      Utilities.createTmpDirs(cloned, mapWork);
+      Utilities.setMapWork(cloned, (MapWork) work, scratchDir, false);
+      Utilities.createTmpDirs(cloned, (MapWork) work);
       if (work instanceof MergeFileWork) {
         MergeFileWork mergeFileWork = (MergeFileWork) work;
         cloned.set(Utilities.MAPRED_MAPPER_CLASS, MergeFileMapper.class.getName());
@@ -314,21 +313,9 @@ public class SparkPlanGenerator {
       } else {
         cloned.set(Utilities.MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
       }
-      if (mapWork.getMaxSplitSize() != null) {
-        HiveConf.setLongVar(cloned, HiveConf.ConfVars.MAPREDMAXSPLITSIZE,
-            mapWork.getMaxSplitSize());
-      }
-      if (mapWork.getMinSplitSize() != null) {
+      if (((MapWork) work).getMinSplitSize() != null) {
         HiveConf.setLongVar(cloned, HiveConf.ConfVars.MAPREDMINSPLITSIZE,
-            mapWork.getMinSplitSize());
-      }
-      if (mapWork.getMinSplitSizePerNode() != null) {
-        HiveConf.setLongVar(cloned, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE,
-            mapWork.getMinSplitSizePerNode());
-      }
-      if (mapWork.getMinSplitSizePerRack() != null) {
-        HiveConf.setLongVar(cloned, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK,
-            mapWork.getMinSplitSizePerRack());
+            ((MapWork) work).getMinSplitSize());
       }
       // remember the JobConf cloned for each MapWork, so we won't clone for it again
       workToJobConf.put(work, cloned);

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 7eaad18..27bed9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -95,7 +95,6 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
   // number of columns pertaining to keys in a vectorized row batch
   private int keysColumnOffset;
   private static final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE;
-  private static final int BATCH_BYTES = VectorizedRowBatch.DEFAULT_BYTES;
   private StructObjectInspector keyStructInspector;
   private StructObjectInspector[] valueStructInspectors;
   /* this is only used in the error code path */
@@ -374,7 +373,6 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
     }
 
     int rowIdx = 0;
-    int batchBytes = 0;
     try {
       while (values.hasNext()) {
         /* deserialize value into columns */
@@ -383,13 +381,11 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
 
         VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag], rowIdx,
             keysColumnOffset, batch, buffer);
-        batchBytes += valueWritable.getLength();
         rowIdx++;
-        if (rowIdx >= BATCH_SIZE || batchBytes > BATCH_BYTES) {
+        if (rowIdx >= BATCH_SIZE) {
           VectorizedBatchUtil.setBatchSize(batch, rowIdx);
           reducer.process(batch, tag);
           rowIdx = 0;
-          batchBytes = 0;
           if (isLogInfoEnabled) {
             logMemoryInfo();
           }

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 98b1605..4c01329 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -83,8 +83,6 @@ public class SparkTask extends Task<SparkWork> {
   private transient int totalTaskCount;
   private transient int failedTaskCount;
   private transient List<Integer> stageIds;
-  private transient SparkJobRef jobRef = null;
-  private transient boolean isShutdown = false;
 
   @Override
   public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext,
@@ -109,7 +107,7 @@ public class SparkTask extends Task<SparkWork> {
 
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
       submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB);
-      jobRef = sparkSession.submit(driverContext, sparkWork);
+      SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
 
       addToHistory(jobRef);
@@ -129,14 +127,8 @@ public class SparkTask extends Task<SparkWork> {
         // TODO: If the timeout is because of lack of resources in the cluster, we should
         // ideally also cancel the app request here. But w/o facilities from Spark or YARN,
         // it's difficult to do it on hive side alone. See HIVE-12650.
-        LOG.info("Failed to submit Spark job " + sparkJobID);
-        jobRef.cancelJob();
-      } else if (rc == 4) {
-        LOG.info("The number of tasks reaches above the limit " + conf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS) +
-            ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID );
         jobRef.cancelJob();
       }
-
       if (this.jobID == null) {
         this.jobID = sparkJobStatus.getAppID();
       }
@@ -298,23 +290,6 @@ public class SparkTask extends Task<SparkWork> {
     return finishTime;
   }
 
-  public boolean isTaskShutdown() {
-    return isShutdown;
-  }
-
-  @Override
-  public void shutdown() {
-    super.shutdown();
-    if (jobRef != null && !isShutdown) {
-      try {
-        jobRef.cancelJob();
-      } catch (Exception e) {
-        LOG.warn("failed to kill job", e);
-      }
-    }
-    isShutdown = true;
-  }
-
   /**
    * Set the number of reducers for the spark work.
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index eb9883a..7d18c0a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -78,7 +78,7 @@ public class SparkUtilities {
     Path localFile = new Path(source.getPath());
     Path remoteFile = new Path(SessionState.get().getSparkSession().getHDFSSessionDir(),
         getFileName(source));
-    FileSystem fileSystem = FileSystem.get(remoteFile.toUri(), conf);
+    FileSystem fileSystem = FileSystem.get(conf);
     // Overwrite if the remote file already exists. Whether the file can be added
     // on executor is up to spark, i.e. spark.files.overwrite
     fileSystem.copyFromLocalFile(false, true, localFile, remoteFile);
@@ -92,7 +92,7 @@ public class SparkUtilities {
     String deployMode = sparkConf.contains("spark.submit.deployMode") ?
         sparkConf.get("spark.submit.deployMode") : null;
     return SparkClientUtilities.isYarnClusterMode(master, deployMode) &&
-        !(source.getScheme().equals("hdfs") || source.getScheme().equals("viewfs"));
+        !source.getScheme().equals("hdfs");
   }
 
   private static String getFileName(URI uri) {

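FileSystem.get(conf), as restored above, always resolves the default filesystem (fs.defaultFS), while FileSystem.get(uri, conf) resolves whichever filesystem the path itself names. A small comparison (the HDFS URI below is illustrative, and instantiating it needs an HDFS client on the classpath):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FsResolutionDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Resolved from fs.defaultFS, regardless of where the target path actually lives.
        FileSystem defaultFs = FileSystem.get(conf);

        // Resolved from the path's own scheme/authority, e.g. a non-default HDFS namespace.
        Path remoteFile = new Path("hdfs://namenode:8020/tmp/session/added.jar");
        FileSystem pathFs = FileSystem.get(remoteFile.toUri(), conf);

        System.out.println("default fs: " + defaultFs.getUri());
        System.out.println("path fs:    " + pathFs.getUri());
      }
    }
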
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 9dfb65e..dd73f3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -34,8 +34,7 @@ import org.apache.spark.JobExecutionStatus;
  * It print current job status to console and sleep current thread between monitor interval.
  */
 public class RemoteSparkJobMonitor extends SparkJobMonitor {
-  private int sparkJobMaxTaskCount = -1;
-  private int totalTaskCount = 0;
+
   private RemoteSparkJobStatus sparkJobStatus;
   private final HiveConf hiveConf;
 
@@ -43,7 +42,6 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
     super(hiveConf);
     this.sparkJobStatus = sparkJobStatus;
     this.hiveConf = hiveConf;
-    sparkJobMaxTaskCount = hiveConf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS);
   }
 
   @Override
@@ -102,17 +100,6 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
               } else {
                 console.logInfo(format);
               }
-            } else {
-              // Count the number of tasks, and kill application if it goes beyond the limit.
-              if (sparkJobMaxTaskCount != -1 && totalTaskCount == 0) {
-                totalTaskCount = getTotalTaskCount(progressMap);
-                if (totalTaskCount > sparkJobMaxTaskCount) {
-                  rc = 4;
-                  done = true;
-                  console.printInfo("\nThe total number of task in the Spark job [" + totalTaskCount + "] is greater than the limit [" +
-                      sparkJobMaxTaskCount + "]. The Spark job will be cancelled.");
-                }
-              }
             }
 
             printStatus(progressMap, lastProgressMap);

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 41730b5..0b224f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -66,6 +66,7 @@ abstract class SparkJobMonitor {
   private int lines = 0;
   private final PrintStream out;
 
+
   private static final int COLUMN_1_WIDTH = 16;
   private static final String HEADER_FORMAT = "%16s%10s %13s  %5s  %9s  %7s  %7s  %6s  ";
   private static final String STAGE_FORMAT = "%-16s%10s %13s  %5s  %9s  %7s  %7s  %6s  ";
@@ -172,15 +173,6 @@ abstract class SparkJobMonitor {
     lastPrintTime = System.currentTimeMillis();
   }
 
-  protected int getTotalTaskCount(Map<String, SparkStageProgress> progressMap) {
-    int totalTasks = 0;
-    for (SparkStageProgress progress: progressMap.values() ) {
-      totalTasks += progress.getTotalTaskCount();
-    }
-
-    return totalTasks;
-  }
-
   private String getReport(Map<String, SparkStageProgress> progressMap) {
     StringBuilder reportBuffer = new StringBuilder();
     SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index 67db303..951dbb4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -67,9 +67,6 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
       return getAppID.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
     } catch (Exception e) {
       LOG.warn("Failed to get APP ID.", e);
-      if (Thread.interrupted()) {
-        error = e;
-      }
       return null;
     }
   }
@@ -189,9 +186,6 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
   }
 
   public JobHandle.State getRemoteJobState() {
-    if (error != null) {
-      return JobHandle.State.FAILED;
-    }
     return jobHandle.getState();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 6497495..aa2dfc7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -336,11 +336,6 @@ public class DagUtils {
       setupAutoReducerParallelism(edgeProp, w);
       break;
     }
-    case CUSTOM_SIMPLE_EDGE: {
-      setupQuickStart(edgeProp, w);
-      break;
-    }
-
     default:
       // nothing
     }
@@ -970,9 +965,10 @@ public class DagUtils {
    * @return true if the file names match else returns false.
    * @throws IOException when any file system related call fails
    */
-  private boolean checkPreExisting(FileSystem sourceFS, Path src, Path dest, Configuration conf)
+  private boolean checkPreExisting(Path src, Path dest, Configuration conf)
     throws IOException {
     FileSystem destFS = dest.getFileSystem(conf);
+    FileSystem sourceFS = src.getFileSystem(conf);
     FileStatus destStatus = FileUtils.getFileStatusOrNull(destFS, dest);
     if (destStatus != null) {
       return (sourceFS.getFileStatus(src).getLen() == destStatus.getLen());
@@ -992,9 +988,7 @@ public class DagUtils {
   public LocalResource localizeResource(
       Path src, Path dest, LocalResourceType type, Configuration conf) throws IOException {
     FileSystem destFS = dest.getFileSystem(conf);
-    // We call copyFromLocal below, so we basically assume src is a local file.
-    FileSystem srcFs = FileSystem.getLocal(conf);
-    if (src != null && !checkPreExisting(srcFs, src, dest, conf)) {
+    if (src != null && !checkPreExisting(src, dest, conf)) {
       // copy the src to the destination and create local resource.
       // do not overwrite.
       String srcStr = src.toString();
@@ -1006,7 +1000,7 @@ public class DagUtils {
       // authoritative one), don't wait infinitely for the notifier, just wait a little bit
       // and check HDFS before and after.
       if (notifierOld != null
-          && checkOrWaitForTheFile(srcFs, src, dest, conf, notifierOld, 1, 150, false)) {
+          && checkOrWaitForTheFile(src, dest, conf, notifierOld, 1, 150, false)) {
         return createLocalResource(destFS, dest, type, LocalResourceVisibility.PRIVATE);
       }
       try {
@@ -1028,7 +1022,7 @@ public class DagUtils {
             conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, TimeUnit.MILLISECONDS);
         // Only log on the first wait, and check after wait on the last iteration.
         if (!checkOrWaitForTheFile(
-            srcFs, src, dest, conf, notifierOld, waitAttempts, sleepInterval, true)) {
+            src, dest, conf, notifierOld, waitAttempts, sleepInterval, true)) {
           LOG.error("Could not find the jar that was being uploaded");
           throw new IOException("Previous writer likely failed to write " + dest +
               ". Failing because I am unlikely to write too.");
@@ -1043,10 +1037,10 @@ public class DagUtils {
         LocalResourceVisibility.PRIVATE);
   }
 
-  public boolean checkOrWaitForTheFile(FileSystem srcFs, Path src, Path dest, Configuration conf,
-      Object notifier, int waitAttempts, long sleepInterval, boolean doLog) throws IOException {
+  public boolean checkOrWaitForTheFile(Path src, Path dest, Configuration conf, Object notifier,
+      int waitAttempts, long sleepInterval, boolean doLog) throws IOException {
     for (int i = 0; i < waitAttempts; i++) {
-      if (checkPreExisting(srcFs, src, dest, conf)) return true;
+      if (checkPreExisting(src, dest, conf)) return true;
       if (doLog && i == 0) {
         LOG.info("Waiting for the file " + dest + " (" + waitAttempts + " attempts, with "
             + sleepInterval + "ms interval)");
@@ -1065,7 +1059,7 @@ public class DagUtils {
         throw new IOException(interruptedException);
       }
     }
-    return checkPreExisting(srcFs, src, dest, conf); // One last check.
+    return checkPreExisting(src, dest, conf); // One last check.
   }
 
   /**
@@ -1271,20 +1265,6 @@ public class DagUtils {
     }
   }
 
-  private void setupQuickStart(TezEdgeProperty edgeProp, Vertex v)
-    throws IOException {
-    if (!edgeProp.isSlowStart()) {
-      Configuration pluginConf = new Configuration(false);
-      VertexManagerPluginDescriptor desc =
-              VertexManagerPluginDescriptor.create(ShuffleVertexManager.class.getName());
-      pluginConf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 0);
-      pluginConf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, 0);
-      UserPayload payload = TezUtils.createUserPayloadFromConf(pluginConf);
-      desc.setUserPayload(payload);
-      v.setVertexManagerPlugin(desc);
-    }
-  }
-
   public String createDagName(Configuration conf, QueryPlan plan) {
     String name = getUserSpecifiedDagName(conf);
     if (name == null) {
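
The DagUtils hunks above fold the source FileSystem lookup back into checkPreExisting and drop the explicit srcFs parameter from checkOrWaitForTheFile. The underlying pattern, a bounded wait-and-recheck for a resource that another writer may still be localizing, can be shown outside of Hadoop; a sketch using java.nio with hypothetical paths:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class WaitForLocalizedFileSketch {

        // True once dest exists and matches the source length, like the reverted checkPreExisting().
        static boolean matches(Path src, Path dest) throws IOException {
            return Files.exists(src) && Files.exists(dest) && Files.size(src) == Files.size(dest);
        }

        // Bounded wait-and-recheck, like the reverted checkOrWaitForTheFile().
        static boolean checkOrWait(Path src, Path dest, int waitAttempts, long sleepIntervalMs)
                throws IOException, InterruptedException {
            for (int i = 0; i < waitAttempts; i++) {
                if (matches(src, dest)) {
                    return true;
                }
                if (i == 0) {
                    System.out.println("Waiting for " + dest + " (" + waitAttempts
                            + " attempts, with " + sleepIntervalMs + "ms interval)");
                }
                Thread.sleep(sleepIntervalMs);
            }
            return matches(src, dest);   // one last check after the final sleep
        }

        public static void main(String[] args) throws Exception {
            Path src = Paths.get("hive-exec.jar");              // hypothetical local source
            Path dest = Paths.get("scratch", "hive-exec.jar");  // hypothetical destination
            System.out.println("already localized: " + checkOrWait(src, dest, 3, 150));
        }
    }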

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index 7011d23..7b13e90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -147,20 +146,7 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
       }
       nwayConf.setNumberOfPartitions(numPartitions);
     }
-    final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
-    final long memoryCheckInterval = HiveConf.getLongVar(hconf,
-      HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
-    final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
-    long numEntries = 0;
-    long noCondTaskSize = desc.getNoConditionalTaskSize();
-    boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0;
-    if (!doMemCheck) {
-      LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " +
-        "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval);
-    } else {
-      LOG.info("Memory monitoring for hash table loader enabled. noconditionalTaskSize: {} inflationFactor: {} ",
-        noCondTaskSize, inflationFactor);
-    }
+
     for (int pos = 0; pos < mapJoinTables.length; pos++) {
       if (pos == desc.getPosBigTable()) {
         continue;
@@ -219,32 +205,12 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
           tableContainer = new HashMapWrapper(hconf, keyCount);
         }
 
-        LOG.info("Using tableContainer: " + tableContainer.getClass().getSimpleName());
+        LOG.info("Using tableContainer " + tableContainer.getClass().getSimpleName());
 
         tableContainer.setSerde(keyCtx, valCtx);
         while (kvReader.next()) {
-          tableContainer.putRow((Writable) kvReader.getCurrentKey(), (Writable) kvReader.getCurrentValue());
-          numEntries++;
-          if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) {
-            final long estMemUsage = tableContainer.getEstimatedMemorySize();
-            final long threshold = (long) (inflationFactor * noCondTaskSize);
-            // guard against poor configuration of noconditional task size. We let hash table grow till 2/3'rd memory
-            // available for container/executor
-            final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable());
-            if (estMemUsage > effectiveThreshold) {
-              String msg = "Hash table loading exceeded memory limits." +
-                " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize +
-                " inflationFactor: " + inflationFactor + " threshold: " + threshold +
-                " effectiveThreshold: " + effectiveThreshold;
-              LOG.error(msg);
-              throw new MapJoinMemoryExhaustionError(msg);
-            } else {
-              if (LOG.isInfoEnabled()) {
-                LOG.info("Checking hash table loader memory usage.. numEntries: {} estimatedMemoryUsage: {} " +
-                  "effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold);
-              }
-            }
-          }
+          tableContainer.putRow(
+              (Writable)kvReader.getCurrentKey(), (Writable)kvReader.getCurrentValue());
         }
         tableContainer.seal();
         LOG.info("Finished loading hashtable using " + tableContainer.getClass() + ". Small table position: " + pos);
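
The HashTableLoader hunks revert memory monitoring during map-join hash table loading: every memoryCheckInterval rows, the estimated container size was compared against max(inflationFactor * noConditionalTaskSize, 2/3 of the memory available to the executor), and loading aborted with MapJoinMemoryExhaustionError when it crossed that effective threshold. A self-contained sketch of that check, with assumed configuration values and a fake per-row footprint standing in for getEstimatedMemorySize():

    public class HashTableMemoryCheckSketch {
        public static void main(String[] args) {
            // Assumed values standing in for HIVE_HASH_TABLE_INFLATION_FACTOR,
            // LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL, noConditionalTaskSize and max memory.
            final float inflationFactor = 2.0f;
            final long memoryCheckInterval = 100_000L;
            final long noCondTaskSize = 256L * 1024 * 1024;
            final long maxMemoryAvailable = 1_024L * 1024 * 1024;

            // Guard against a too-small noconditional task size: let the table grow to
            // at least 2/3 of the memory available to the container/executor.
            final long threshold = (long) (inflationFactor * noCondTaskSize);
            final long effectiveThreshold = (long) Math.max(threshold, (2.0 / 3.0) * maxMemoryAvailable);

            final long bytesPerEntry = 2_000L;   // fake footprint, stands in for getEstimatedMemorySize()
            long numEntries = 0;
            while (true) {
                numEntries++;                    // tableContainer.putRow(key, value) would happen here
                if (numEntries % memoryCheckInterval == 0) {
                    long estMemUsage = numEntries * bytesPerEntry;
                    if (estMemUsage > effectiveThreshold) {
                        System.out.println("Hash table loading exceeded memory limits: estimated "
                                + estMemUsage + " > effective threshold " + effectiveThreshold);
                        break;                   // the reverted code throws MapJoinMemoryExhaustionError here
                    }
                }
            }
        }
    }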

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index 60660ac..ad8b9e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -103,8 +103,6 @@ public class ReduceRecordSource implements RecordSource {
   // number of columns pertaining to keys in a vectorized row batch
   private int firstValueColumnOffset;
 
-  private final int BATCH_BYTES = VectorizedRowBatch.DEFAULT_BYTES;
-
   private StructObjectInspector keyStructInspector;
   private StructObjectInspector valueStructInspectors;
 
@@ -192,9 +190,7 @@ public class ReduceRecordSource implements RecordSource {
                                   VectorizedBatchUtil.typeInfosFromStructObjectInspector(
                                       keyStructInspector),
                                   /* useExternalBuffer */ true,
-                                  binarySortableSerDe.getSortOrders(),
-                                  binarySortableSerDe.getNullMarkers(),
-                                  binarySortableSerDe.getNotNullMarkers()));
+                                  binarySortableSerDe.getSortOrders()));
         keyBinarySortableDeserializeToRow.init(0);
 
         final int valuesSize = valueStructInspectors.getAllStructFieldRefs().size();
@@ -439,7 +435,6 @@ public class ReduceRecordSource implements RecordSource {
     final int maxSize = batch.getMaxSize();
     Preconditions.checkState(maxSize > 0);
     int rowIdx = 0;
-    int batchBytes = keyBytes.length;
     try {
       for (Object value : values) {
         if (valueLazyBinaryDeserializeToRow != null) {
@@ -447,7 +442,6 @@ public class ReduceRecordSource implements RecordSource {
           BytesWritable valueWritable = (BytesWritable) value;
           byte[] valueBytes = valueWritable.getBytes();
           int valueLength = valueWritable.getLength();
-          batchBytes += valueLength;
 
           // l4j.info("ReduceRecordSource processVectorGroup valueBytes " + valueLength + " " +
           //     VectorizedBatchUtil.displayBytes(valueBytes, 0, valueLength));
@@ -456,7 +450,7 @@ public class ReduceRecordSource implements RecordSource {
           valueLazyBinaryDeserializeToRow.deserialize(batch, rowIdx);
         }
         rowIdx++;
-        if (rowIdx >= maxSize || batchBytes >= BATCH_BYTES) {
+        if (rowIdx >= maxSize) {
 
           // Batch is full.
           batch.size = rowIdx;
@@ -468,7 +462,6 @@ public class ReduceRecordSource implements RecordSource {
             batch.cols[i].reset();
           }
           rowIdx = 0;
-          batchBytes = 0;
         }
       }
       if (rowIdx > 0) {
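
The ReduceRecordSource hunks revert a byte-size cap on vectorized batches: besides filling up to the batch's maximum row count, the loop also flushed once the accumulated key plus value bytes reached VectorizedRowBatch.DEFAULT_BYTES. A standalone sketch of that dual flush condition, with assumed limits and byte arrays standing in for the serialized values:

    import java.util.Arrays;
    import java.util.List;

    public class VectorBatchFlushSketch {
        static final int MAX_ROWS = 1024;                 // stand-in for batch.getMaxSize()
        static final int BATCH_BYTES = 32 * 1024 * 1024;  // assumed analogue of VectorizedRowBatch.DEFAULT_BYTES

        public static void main(String[] args) {
            List<byte[]> values = Arrays.asList(
                    new byte[10_000_000], new byte[10_000_000], new byte[20_000_000]);
            int keyBytes = 100;            // the key is charged once per batch in the reverted code
            int rowIdx = 0;
            int batchBytes = keyBytes;
            for (byte[] value : values) {
                batchBytes += value.length;    // deserializing the value into row rowIdx happens here
                rowIdx++;
                if (rowIdx >= MAX_ROWS || batchBytes >= BATCH_BYTES) {
                    System.out.println("flush batch: rows=" + rowIdx + " bytes=" + batchBytes);
                    rowIdx = 0;
                    batchBytes = 0;
                }
            }
            if (rowIdx > 0) {
                System.out.println("flush final batch: rows=" + rowIdx + " bytes=" + batchBytes);
            }
        }
    }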

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 4242262..486d43a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -23,8 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
-import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,8 +41,6 @@ import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
-import com.google.common.base.Throwables;
-
 /**
  * Hive processor for Tez that forms the vertices in Tez and processes the data.
  * Does what ExecMapper and ExecReducer does for hive in MR framework.
@@ -193,11 +189,8 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
     } catch (Throwable t) {
       originalThrowable = t;
     } finally {
-      if (originalThrowable != null && (originalThrowable instanceof Error ||
-        Throwables.getRootCause(originalThrowable) instanceof Error)) {
-        LOG.error("Cannot recover from this FATAL error", StringUtils.stringifyException(originalThrowable));
-        getContext().reportFailure(TaskFailureType.FATAL, originalThrowable,
-                      "Cannot recover from this error");
+      if (originalThrowable != null && originalThrowable instanceof Error) {
+        LOG.error(StringUtils.stringifyException(originalThrowable));
         throw new RuntimeException(originalThrowable);
       }
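
The TezProcessor hunk narrows fatal-error handling back to a direct instanceof Error check; the reverted code also walked the cause chain (Guava's Throwables.getRootCause) and reported a TaskFailureType.FATAL failure when the root cause was an Error. A small sketch of that root-cause test, with a simplified stand-in for getRootCause:

    public class FatalRootCauseSketch {

        // Simplified stand-in for Throwables.getRootCause: walk to the deepest cause.
        static Throwable rootCause(Throwable t) {
            Throwable cause = t;
            while (cause.getCause() != null && cause.getCause() != cause) {
                cause = cause.getCause();
            }
            return cause;
        }

        static boolean isFatal(Throwable t) {
            return t instanceof Error || rootCause(t) instanceof Error;
        }

        public static void main(String[] args) {
            Throwable wrapped = new RuntimeException(new OutOfMemoryError("simulated"));
            System.out.println(isFatal(wrapped));                     // true: root cause is an Error
            System.out.println(isFatal(new RuntimeException("ok")));  // false
        }
    }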
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index b4d8ffa..8f45947 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -76,7 +76,6 @@ public class TezSessionPoolManager {
   private static final Logger LOG = LoggerFactory.getLogger(TezSessionPoolManager.class);
   private static final Random rdm = new Random();
 
-  private volatile SessionState initSessionState;
   private BlockingQueue<TezSessionPoolSession> defaultQueuePool;
 
   /** Priority queue sorted by expiration time of live sessions that could be expired. */
@@ -137,8 +136,6 @@ public class TezSessionPoolManager {
 
   public void startPool() throws Exception {
     if (initialSessions.isEmpty()) return;
-    // Hive SessionState available at this point.
-    initSessionState = SessionState.get();
     int threadCount = Math.min(initialSessions.size(),
         HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS));
     Preconditions.checkArgument(threadCount > 0);
@@ -262,27 +259,13 @@ public class TezSessionPoolManager {
       expirationThread = new Thread(new Runnable() {
         @Override
         public void run() {
-          try {
-            SessionState.setCurrentSessionState(initSessionState);
-            runExpirationThread();
-          } catch (Exception e) {
-            LOG.warn("Exception in TezSessionPool-expiration thread. Thread will shut down", e);
-          } finally {
-            LOG.info("TezSessionPool-expiration thread exiting");
-          }
+          runExpirationThread();
         }
       }, "TezSessionPool-expiration");
       restartThread = new Thread(new Runnable() {
         @Override
         public void run() {
-          try {
-            SessionState.setCurrentSessionState(initSessionState);
-            runRestartThread();
-          } catch (Exception e) {
-            LOG.warn("Exception in TezSessionPool-cleanup thread. Thread will shut down", e);
-          } finally {
-            LOG.info("TezSessionPool-cleanup thread exiting");
-          }
+          runRestartThread();
         }
       }, "TezSessionPool-cleanup");
     }
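
The TezSessionPoolManager hunks revert SessionState propagation into the pool's background threads: the SessionState captured in startPool() was re-installed on the expiration and cleanup threads, and each thread body was wrapped in try/catch/finally so an exception shuts the thread down with a log message instead of escaping. A sketch of that pattern using a plain ThreadLocal as a stand-in for SessionState:

    public class PoolThreadSessionSketch {

        // Stand-in for Hive's SessionState thread-local.
        static final ThreadLocal<String> SESSION = new ThreadLocal<>();

        public static void main(String[] args) throws InterruptedException {
            SESSION.set("init-session");                 // captured once on the initializing thread
            final String initSessionState = SESSION.get();

            Thread expiration = new Thread(() -> {
                try {
                    SESSION.set(initSessionState);       // background thread inherits the init session
                    runExpirationLoop();
                } catch (Exception e) {
                    System.err.println("Exception in expiration thread. Thread will shut down: " + e);
                } finally {
                    System.out.println("expiration thread exiting");
                }
            }, "TezSessionPool-expiration");
            expiration.start();
            expiration.join();
        }

        static void runExpirationLoop() {
            System.out.println("running with session " + SESSION.get());
        }
    }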

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 036e918..ed1ba9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -345,7 +345,6 @@ public class TezSessionState {
       String user, final Configuration conf) throws IOException {
     // TODO: parts of this should be moved out of TezSession to reuse the clients, but there's
     //       no good place for that right now (HIVE-13698).
-    // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool.
     SessionState session = SessionState.get();
     boolean isInHs2 = session != null && session.isHiveServerQuery();
     Token<LlapTokenIdentifier> token = null;
@@ -439,7 +438,6 @@ public class TezSessionState {
   private void setupSessionAcls(Configuration tezConf, HiveConf hiveConf) throws
       IOException {
 
-    // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool.
     String user = SessionState.getUserFromAuthenticator();
     UserGroupInformation loginUserUgi = UserGroupInformation.getLoginUser();
     String loginUser =
@@ -453,7 +451,6 @@ public class TezSessionState {
             TezConfiguration.TEZ_AM_MODIFY_ACLS, addHs2User, user, loginUser);
 
     if (LOG.isDebugEnabled()) {
-      // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool.
       LOG.debug(
           "Setting Tez Session access for sessionId={} with viewAclString={}, modifyStr={}",
           SessionState.get().getSessionId(), viewStr, modifyStr);
@@ -595,7 +592,6 @@ public class TezSessionState {
    */
   private Path createTezDir(String sessionId) throws IOException {
     // tez needs its own scratch dir (per session)
-    // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool.
     Path tezDir = new Path(SessionState.get().getHdfsScratchDirURIString(), TEZ_DIR);
     tezDir = new Path(tezDir, sessionId);
     FileSystem fs = tezDir.getFileSystem(conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 1c84c6a..6c8bf29 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -18,10 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.io.Serializable;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -328,14 +324,6 @@ public class TezTask extends Task<TezWork> {
     }
   }
 
-  void checkOutputSpec(BaseWork work, JobConf jc) throws IOException {
-    for (Operator<?> op : work.getAllOperators()) {
-      if (op instanceof FileSinkOperator) {
-        ((FileSinkOperator) op).checkOutputSpecs(null, jc);
-      }
-    }
-  }
-
   DAG build(JobConf conf, TezWork work, Path scratchDir,
       LocalResource appJarLr, List<LocalResource> additionalLr, Context ctx)
       throws Exception {
@@ -369,6 +357,7 @@ public class TezTask extends Task<TezWork> {
     setAccessControlsForCurrentUser(dag, queryPlan.getQueryId(), conf);
 
     for (BaseWork w: ws) {
+
       boolean isFinal = work.getLeaves().contains(w);
 
       // translate work to vertex
@@ -390,8 +379,6 @@ public class TezTask extends Task<TezWork> {
             children.add(v);
           }
         }
-        JobConf parentConf = workToConf.get(unionWorkItems.get(0));
-        checkOutputSpec(w, parentConf);
 
         // create VertexGroup
         Vertex[] vertexArray = new Vertex[unionWorkItems.size()];
@@ -404,7 +391,7 @@ public class TezTask extends Task<TezWork> {
 
         // For a vertex group, all Outputs use the same Key-class, Val-class and partitioner.
         // Pick any one source vertex to figure out the Edge configuration.
-       
+        JobConf parentConf = workToConf.get(unionWorkItems.get(0));
 
         // now hook up the children
         for (BaseWork v: children) {
@@ -417,7 +404,6 @@ public class TezTask extends Task<TezWork> {
       } else {
         // Regular vertices
         JobConf wxConf = utils.initializeVertexConf(conf, ctx, w);
-        checkOutputSpec(w, wxConf);
         Vertex wx =
             utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal,
                 work, work.getVertexType(w));

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java
index cd3404a..eccbbb6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java
@@ -1,20 +1,3 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.hadoop.hive.ql.exec.tez.monitoring;
 
 import org.apache.hadoop.hive.common.log.InPlaceUpdate;

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
index 7cb74a5..1400be4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
@@ -1,20 +1,3 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.hadoop.hive.ql.exec.tez.monitoring;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -28,7 +11,6 @@ import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
@@ -77,58 +59,24 @@ class DAGSummary implements PrintSummary {
     this.hiveCounters = hiveCounters(dagClient);
   }
 
-  private long hiveInputRecordsFromTezCounters(String vertexName, String inputVertexName) {
-    // Get the counters for the input vertex.
-    Set<StatusGetOpts> statusOptions = new HashSet<>(1);
-    statusOptions.add(StatusGetOpts.GET_COUNTERS);
-    VertexStatus inputVertexStatus = vertexStatus(statusOptions, inputVertexName);
-    final TezCounters inputVertexCounters = inputVertexStatus.getVertexCounters();
-
-    // eg, group name TaskCounter_Map_7_OUTPUT_Reducer_8, counter name OUTPUT_RECORDS
-    String groupName = formattedName("TaskCounter", inputVertexName, vertexName);
-    String counterName = "OUTPUT_RECORDS";
-
-    // Do not create counter if it does not exist -
-    // instead fall back to default behavior for determining input records.
-    TezCounter tezCounter = inputVertexCounters.getGroup(groupName).findCounter(counterName, false);
-    if (tezCounter == null) {
-      return -1;
-    } else {
-      return tezCounter.getValue();
-    }
-  }
-
-  private long hiveInputRecordsFromHiveCounters(String inputVertexName) {
-    // The record count from these counters may not be correct if the input vertex has
-    // edges to more than one vertex, since this value counts the records going to all
-    // destination vertices.
-
-    String intermediateRecordsCounterName = formattedName(
-        ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(),
-        inputVertexName
-    );
-    String recordsOutCounterName = formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(),
-        inputVertexName);
-    return hiveCounterValue(intermediateRecordsCounterName) + hiveCounterValue(recordsOutCounterName);
-  }
-
   private long hiveInputRecordsFromOtherVertices(String vertexName) {
     List<Vertex> inputVerticesList = dag.getVertex(vertexName).getInputVertices();
     long result = 0;
     for (Vertex inputVertex : inputVerticesList) {
-      long inputVertexRecords = hiveInputRecordsFromTezCounters(vertexName, inputVertex.getName());
-      if (inputVertexRecords < 0) {
-        inputVertexRecords = hiveInputRecordsFromHiveCounters(inputVertex.getName());
-      }
-      result += inputVertexRecords;
+      String intermediateRecordsCounterName = formattedName(
+          ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(),
+          inputVertex.getName()
+      );
+      String recordsOutCounterName = formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(),
+          inputVertex.getName());
+      result += (
+          hiveCounterValue(intermediateRecordsCounterName)
+              + hiveCounterValue(recordsOutCounterName)
+      );
     }
     return result;
   }
 
-  private String formattedName(String counterName, String srcVertexName, String destVertexName) {
-    return String.format("%s_", counterName) + srcVertexName.replace(" ", "_") + "_OUTPUT_" + destVertexName.replace(" ", "_");
-  }
-
   private String formattedName(String counterName, String vertexName) {
     return String.format("%s_", counterName) + vertexName.replace(" ", "_");
   }
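
The DAGSummary hunks revert a more precise input-record computation: for each input vertex, the code first looked up the Tez edge counter group TaskCounter_<src>_OUTPUT_<dest> (counter OUTPUT_RECORDS) and only fell back to the Hive RECORDS_OUT_INTERMEDIATE / RECORDS_OUT counters, which can over-count when a vertex feeds several destinations, if the edge counter was absent. A standalone sketch of that lookup-with-fallback, using plain maps with simplified, flattened counter keys as stand-ins for the Tez and Hive counter objects:

    import java.util.HashMap;
    import java.util.Map;

    public class InputRecordsSketch {

        // Counter names follow the reverted formattedName(): spaces become underscores.
        static String edgeCounterGroup(String src, String dest) {
            return "TaskCounter_" + src.replace(" ", "_") + "_OUTPUT_" + dest.replace(" ", "_");
        }

        // Prefer the Tez edge counter; fall back to the (possibly over-counting) Hive counters.
        static long inputRecords(Map<String, Long> tezCounters, Map<String, Long> hiveCounters,
                                 String src, String dest) {
            Long edgeRecords = tezCounters.get(edgeCounterGroup(src, dest) + ".OUTPUT_RECORDS");
            if (edgeRecords != null) {
                return edgeRecords;
            }
            long intermediate = hiveCounters.getOrDefault(
                    "RECORDS_OUT_INTERMEDIATE_" + src.replace(" ", "_"), 0L);
            long recordsOut = hiveCounters.getOrDefault(
                    "RECORDS_OUT_" + src.replace(" ", "_"), 0L);
            return intermediate + recordsOut;
        }

        public static void main(String[] args) {
            Map<String, Long> tez = new HashMap<>();
            tez.put("TaskCounter_Map_7_OUTPUT_Reducer_8.OUTPUT_RECORDS", 123L);
            Map<String, Long> hive = new HashMap<>();
            System.out.println(inputRecords(tez, hive, "Map 7", "Reducer 8"));  // 123
        }
    }

The fallback exists because the Hive counters are kept per source vertex, not per edge, so they cannot distinguish which downstream vertex the records went to.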

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
index fd85504..0a28edd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
@@ -1,20 +1,3 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.hadoop.hive.ql.exec.tez.monitoring;
 
 import org.apache.hadoop.fs.FileSystem;

http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
index 10e9f57..81f1755 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
@@ -1,20 +1,3 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.hadoop.hive.ql.exec.tez.monitoring;
 
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;

