hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject [46/57] [abbrv] [partial] hive git commit: HIVE-11394: Enhance EXPLAIN display for vectorization (Matt McCline, reviewed by Gopal Vijayaraghavan)
Date Fri, 14 Oct 2016 00:20:07 GMT
http://git-wip-us.apache.org/repos/asf/hive/blob/f923db0b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 3a179a3..6167f48 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.optimizer.physical;
 import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
 
 import java.io.Serializable;
+import java.lang.annotation.Annotation;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -33,6 +34,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.Stack;
 import java.util.regex.Pattern;
+import org.apache.commons.lang.ArrayUtils;
 
 import org.apache.calcite.util.Pair;
 import org.apache.commons.lang.ArrayUtils;
@@ -43,6 +45,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.*;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
@@ -62,7 +66,11 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOpe
 import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkLongOperator;
 import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkMultiKeyOperator;
 import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkStringOperator;
+import org.apache.hadoop.hive.ql.exec.vector.udf.VectorUDFAdaptor;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping;
+import org.apache.hadoop.hive.ql.exec.vector.VectorColumnSourceMapping;
+import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
@@ -73,6 +81,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -91,18 +100,36 @@ import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.LimitDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.VectorAppMasterEventDesc;
+import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorFilterDesc;
+import org.apache.hadoop.hive.ql.plan.VectorTableScanDesc;
+import org.apache.hadoop.hive.ql.plan.VectorizationCondition;
 import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc.ProcessingMode;
+import org.apache.hadoop.hive.ql.plan.VectorSparkHashTableSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorLimitDesc;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo;
 import org.apache.hadoop.hive.ql.plan.VectorPartitionConversion;
+import org.apache.hadoop.hive.ql.plan.VectorSMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -117,10 +144,13 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.OperatorVariation;
 import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorDeserializeType;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo;
 import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
 import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
+import org.apache.hadoop.hive.ql.plan.VectorSelectDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.UDFAcos;
 import org.apache.hadoop.hive.ql.udf.UDFAsin;
@@ -170,6 +200,9 @@ import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.NullStructSerDe;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
@@ -182,6 +215,9 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hive.common.util.AnnotationUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.ReflectionUtil;
 
 import com.google.common.base.Preconditions;
 
@@ -234,12 +270,39 @@ public class Vectorizer implements PhysicalPlanResolver {
 
   private boolean isSpark;
 
-  boolean useVectorizedInputFileFormat;
-  boolean useVectorDeserialize;
-  boolean useRowDeserialize;
+  private boolean useVectorizedInputFileFormat;
+  private boolean useVectorDeserialize;
+  private boolean useRowDeserialize;
+  private boolean isReduceVectorizationEnabled;
 
   boolean isSchemaEvolution;
 
+  private BaseWork currentBaseWork;
+  private Operator<? extends OperatorDesc> currentOperator;
+
+  public void testSetCurrentBaseWork(BaseWork testBaseWork) {
+    currentBaseWork = testBaseWork;
+  }
+
+  private void setNodeIssue(String issue) {
+    currentBaseWork.setNotVectorizedReason(
+        VectorizerReason.createNodeIssue(issue));
+  }
+
+  private void setOperatorIssue(String issue) {
+    currentBaseWork.setNotVectorizedReason(
+        VectorizerReason.createOperatorIssue(currentOperator, issue));
+  }
+
+  private void setExpressionIssue(String expressionTitle, String issue) {
+    currentBaseWork.setNotVectorizedReason(
+        VectorizerReason.createExpressionIssue(currentOperator, expressionTitle, issue));
+  }
+
+  private void clearNotVectorizedReason() {
+    currentBaseWork.setNotVectorizedReason(null);
+  }
+
   public Vectorizer() {
 
     supportedGenericUDFs.add(GenericUDFOPPlus.class);
@@ -369,6 +432,10 @@ public class Vectorizer implements PhysicalPlanResolver {
     int partitionColumnCount;
     boolean useVectorizedInputFileFormat;
 
+    boolean groupByVectorOutput;
+    boolean allNative;
+    boolean usesVectorUDFAdaptor;
+
     String[] scratchTypeNameArray;
 
     Set<Operator<? extends OperatorDesc>> nonVectorizedOps;
@@ -379,6 +446,12 @@ public class Vectorizer implements PhysicalPlanResolver {
       partitionColumnCount = 0;
     }
 
+    public void assume() {
+      groupByVectorOutput = true;
+      allNative = true;
+      usesVectorUDFAdaptor =  false;
+    }
+
     public void setAllColumnNames(List<String> allColumnNames) {
       this.allColumnNames = allColumnNames;
     }
@@ -394,9 +467,19 @@ public class Vectorizer implements PhysicalPlanResolver {
     public void setScratchTypeNameArray(String[] scratchTypeNameArray) {
       this.scratchTypeNameArray = scratchTypeNameArray;
     }
+    public void setGroupByVectorOutput(boolean groupByVectorOutput) {
+      this.groupByVectorOutput = groupByVectorOutput;
+    }
+    public void setAllNative(boolean allNative) {
+      this.allNative = allNative;
+    }
+    public void setUsesVectorUDFAdaptor(boolean usesVectorUDFAdaptor) {
+      this.usesVectorUDFAdaptor = usesVectorUDFAdaptor;
+    }
     public void setUseVectorizedInputFileFormat(boolean useVectorizedInputFileFormat) {
       this.useVectorizedInputFileFormat = useVectorizedInputFileFormat;
     }
+
     public void setNonVectorizedOps(Set<Operator<? extends OperatorDesc>> nonVectorizedOps) {
       this.nonVectorizedOps = nonVectorizedOps;
     }
@@ -428,7 +511,14 @@ public class Vectorizer implements PhysicalPlanResolver {
             scratchTypeNameArray);
       baseWork.setVectorizedRowBatchCtx(vectorizedRowBatchCtx);
 
-      baseWork.setUseVectorizedInputFileFormat(useVectorizedInputFileFormat);
+      if (baseWork instanceof MapWork) {
+        MapWork mapWork = (MapWork) baseWork;
+        mapWork.setUseVectorizedInputFileFormat(useVectorizedInputFileFormat);
+      }
+
+      baseWork.setAllNative(allNative);
+      baseWork.setGroupByVectorOutput(groupByVectorOutput);
+      baseWork.setUsesVectorUDFAdaptor(usesVectorUDFAdaptor);
     }
   }
 
@@ -445,17 +535,29 @@ public class Vectorizer implements PhysicalPlanResolver {
         throws SemanticException {
       Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
       if (currTask instanceof MapRedTask) {
-        convertMapWork(((MapRedTask) currTask).getWork().getMapWork(), false);
+        MapredWork mapredWork = ((MapRedTask) currTask).getWork();
+        convertMapWork(mapredWork.getMapWork(), false);
+        ReduceWork reduceWork = mapredWork.getReduceWork();
+        if (reduceWork != null) {
+          // Always set the EXPLAIN conditions.
+          setReduceWorkExplainConditions(reduceWork);
+
+          // We do not vectorize MR Reduce.
+        }
       } else if (currTask instanceof TezTask) {
         TezWork work = ((TezTask) currTask).getWork();
-        for (BaseWork w: work.getAllWork()) {
-          if (w instanceof MapWork) {
-            convertMapWork((MapWork) w, true);
-          } else if (w instanceof ReduceWork) {
-            // We are only vectorizing Reduce under Tez.
-            if (HiveConf.getBoolVar(hiveConf,
-                        HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) {
-              convertReduceWork((ReduceWork) w, true);
+        for (BaseWork baseWork: work.getAllWork()) {
+          if (baseWork instanceof MapWork) {
+            convertMapWork((MapWork) baseWork, true);
+          } else if (baseWork instanceof ReduceWork) {
+            ReduceWork reduceWork = (ReduceWork) baseWork;
+
+            // Always set the EXPLAIN conditions.
+            setReduceWorkExplainConditions(reduceWork);
+
+            // We are only vectorizing Reduce under Tez/Spark.
+            if (isReduceVectorizationEnabled) {
+              convertReduceWork(reduceWork);
             }
           }
         }
@@ -463,22 +565,43 @@ public class Vectorizer implements PhysicalPlanResolver {
         SparkWork sparkWork = (SparkWork) currTask.getWork();
         for (BaseWork baseWork : sparkWork.getAllWork()) {
           if (baseWork instanceof MapWork) {
-            convertMapWork((MapWork) baseWork, false);
-          } else if (baseWork instanceof ReduceWork
-              && HiveConf.getBoolVar(hiveConf,
-                  HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) {
-            convertReduceWork((ReduceWork) baseWork, false);
+            convertMapWork((MapWork) baseWork, true);
+          } else if (baseWork instanceof ReduceWork) {
+            ReduceWork reduceWork = (ReduceWork) baseWork;
+
+            // Always set the EXPLAIN conditions.
+            setReduceWorkExplainConditions(reduceWork);
+
+            if (isReduceVectorizationEnabled) {
+              convertReduceWork(reduceWork);
+            }
           }
         }
       }
+
       return null;
     }
 
-    private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+    private void convertMapWork(MapWork mapWork, boolean isTezOrSpark) throws SemanticException {
+
+      mapWork.setVectorizationExamined(true);
+
+      // Global used when setting errors, etc.
+      currentBaseWork = mapWork;
+
       VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
-      boolean ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTez);
+      vectorTaskColumnInfo.assume();
+
+      boolean ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTezOrSpark);
       if (ret) {
-        vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTez);
+        vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTezOrSpark);
+      } else if (currentBaseWork.getVectorizationEnabled()) {
+        VectorizerReason notVectorizedReason  = currentBaseWork.getNotVectorizedReason();
+        if (notVectorizedReason == null) {
+          LOG.info("Cannot vectorize: unknown");
+        } else {
+          LOG.info("Cannot vectorize: " + notVectorizedReason.toString());
+        }
       }
     }
 
@@ -499,6 +622,7 @@ public class Vectorizer implements PhysicalPlanResolver {
 
       LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork();
       if ((aliasToWork == null) || (aliasToWork.size() == 0)) {
+        setNodeIssue("Vectorized map work requires work");
         return null;
       }
       int tableScanCount = 0;
@@ -507,7 +631,7 @@ public class Vectorizer implements PhysicalPlanResolver {
       for (Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) {
         Operator<?> op = entry.getValue();
         if (op == null) {
-          LOG.warn("Map work has invalid aliases to work with. Fail validation!");
+          setNodeIssue("Vectorized map work requires a valid alias");
           return null;
         }
         if (op instanceof TableScanOperator) {
@@ -517,7 +641,7 @@ public class Vectorizer implements PhysicalPlanResolver {
         }
       }
       if (tableScanCount > 1) {
-        LOG.warn("Map work has more than 1 TableScanOperator. Fail validation!");
+        setNodeIssue("Vectorized map work only works with 1 TableScanOperator");
         return null;
       }
       return new ImmutablePair(alias, tableScanOperator);
@@ -558,22 +682,6 @@ public class Vectorizer implements PhysicalPlanResolver {
       }
     }
 
-    private String getHiveOptionsString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
-      sb.append("=");
-      sb.append(useVectorizedInputFileFormat);
-      sb.append(", ");
-      sb.append(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE.varname);
-      sb.append("=");
-      sb.append(useVectorDeserialize);
-      sb.append(", and ");
-      sb.append(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE.varname);
-      sb.append("=");
-      sb.append(useRowDeserialize);
-      return sb.toString();
-    }
-
     /*
      * There are 3 modes of reading for vectorization:
      *
@@ -588,44 +696,58 @@ public class Vectorizer implements PhysicalPlanResolver {
      *      the row object into the VectorizedRowBatch with VectorAssignRow.
      *      This picks up Input File Format not supported by the other two.
      */
-    private boolean verifyAndSetVectorPartDesc(PartitionDesc pd, boolean isAcidTable) {
+    private boolean verifyAndSetVectorPartDesc(PartitionDesc pd, boolean isAcidTable,
+        HashSet<String> inputFileFormatClassNameSet, HashSet<String> enabledConditionsMetSet,
+        ArrayList<String> enabledConditionsNotMetList) {
 
       String inputFileFormatClassName = pd.getInputFileFormatClassName();
 
+      // Always collect input file formats.
+      inputFileFormatClassNameSet.add(inputFileFormatClassName);
+
+      boolean isInputFileFormatVectorized = Utilities.isInputFileFormatVectorized(pd);
+
+      if (isAcidTable) {
+
+        // Today, ACID tables are only ORC and that format is vectorizable.  Verify these
+        // assumptions.
+        Preconditions.checkState(isInputFileFormatVectorized);
+        Preconditions.checkState(inputFileFormatClassName.equals(OrcInputFormat.class.getName()));
+
+        if (!useVectorizedInputFileFormat) {
+          enabledConditionsNotMetList.add(
+              "Vectorizing ACID tables requires " + HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
+          return false;
+        }
+
+        pd.setVectorPartitionDesc(
+            VectorPartitionDesc.createVectorizedInputFileFormat(
+                inputFileFormatClassName, Utilities.isInputFileFormatSelfDescribing(pd)));
+
+        enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
+        return true;
+      }
+
       // Look for Pass-Thru case where InputFileFormat has VectorizedInputFormatInterface
       // and reads VectorizedRowBatch as a "row".
 
-      if (isAcidTable || useVectorizedInputFileFormat) {
+      if (useVectorizedInputFileFormat) {
 
-        if (Utilities.isInputFileFormatVectorized(pd)) {
-
-          if (!useVectorizedInputFileFormat) {
-            LOG.info("ACID tables con only be vectorized for the input file format -- " +
-                "i.e. when Hive Configuration option " +
-                HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname +
-                "=true");
-            return false;
-          }
+        if (isInputFileFormatVectorized) {
 
           pd.setVectorPartitionDesc(
               VectorPartitionDesc.createVectorizedInputFileFormat(
                   inputFileFormatClassName, Utilities.isInputFileFormatSelfDescribing(pd)));
 
+          enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
           return true;
         }
-
-        // Today, ACID tables are only ORC and that format is vectorizable.  Verify this
-        // assumption.
-        Preconditions.checkState(!isAcidTable);
+        // Fall through and look for other options...
       }
 
-      if (!(isSchemaEvolution || isAcidTable) &&
-        (useVectorDeserialize || useRowDeserialize)) {
-        LOG.info("Input format: " + inputFileFormatClassName + " cannot be vectorized" +
-            " when both " + HiveConf.ConfVars.HIVE_SCHEMA_EVOLUTION.varname + "=false and " +
-            " ACID table is " + isAcidTable + " and " +
-            " given the Hive Configuration options " + getHiveOptionsString());
-        return false;
+      if (!isSchemaEvolution) {
+        enabledConditionsNotMetList.add(
+            "Vectorizing tables without Schema Evolution requires " + HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
       }
 
       String deserializerClassName = pd.getDeserializerClassName();
@@ -635,6 +757,12 @@ public class Vectorizer implements PhysicalPlanResolver {
       //
       // Do the "vectorized" row-by-row deserialization into a VectorizedRowBatch in the
       // VectorMapOperator.
+      boolean isTextFormat = inputFileFormatClassName.equals(TextInputFormat.class.getName()) &&
+          deserializerClassName.equals(LazySimpleSerDe.class.getName());
+      boolean isSequenceFormat =
+          inputFileFormatClassName.equals(SequenceFileInputFormat.class.getName()) &&
+          deserializerClassName.equals(LazyBinarySerDe.class.getName());
+      boolean isVectorDeserializeEligable = isTextFormat || isSequenceFormat;
 
       if (useVectorDeserialize) {
 
@@ -648,8 +776,7 @@ public class Vectorizer implements PhysicalPlanResolver {
         //    org.apache.hadoop.mapred.SequenceFileInputFormat
         //    org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
 
-        if (inputFileFormatClassName.equals(TextInputFormat.class.getName()) &&
-            deserializerClassName.equals(LazySimpleSerDe.class.getName())) {
+        if (isTextFormat) {
 
           Properties properties = pd.getTableDesc().getProperties();
           String lastColumnTakesRestString =
@@ -659,10 +786,11 @@ public class Vectorizer implements PhysicalPlanResolver {
               lastColumnTakesRestString.equalsIgnoreCase("true"));
           if (lastColumnTakesRest) {
 
-            // If row mode will not catch this, then inform.
+            // If row mode will not catch this input file format, then not enabled.
             if (useRowDeserialize) {
-              LOG.info("Input format: " + inputFileFormatClassName + " cannot be vectorized" +
-                  " when " + serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST + "is true");
+              enabledConditionsNotMetList.add(
+                  inputFileFormatClassName + " " +
+                  serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST + " must be disabled ");
               return false;
             }
           } else {
@@ -670,17 +798,19 @@ public class Vectorizer implements PhysicalPlanResolver {
                 VectorPartitionDesc.createVectorDeserialize(
                     inputFileFormatClassName, VectorDeserializeType.LAZY_SIMPLE));
 
+            enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE.varname);
             return true;
           }
-        } else if (inputFileFormatClassName.equals(SequenceFileInputFormat.class.getName()) &&
-            deserializerClassName.equals(LazyBinarySerDe.class.getName())) {
+        } else if (isSequenceFormat) {
 
           pd.setVectorPartitionDesc(
               VectorPartitionDesc.createVectorDeserialize(
                   inputFileFormatClassName, VectorDeserializeType.LAZY_BINARY));
 
+          enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE.varname);
           return true;
         }
+        // Fall through and look for other options...
       }
 
       // Otherwise, if enabled, deserialize rows using regular Serde and add the object
@@ -694,17 +824,29 @@ public class Vectorizer implements PhysicalPlanResolver {
                 Utilities.isInputFileFormatSelfDescribing(pd),
                 deserializerClassName));
 
+        enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE.varname);
         return true;
 
       }
 
-      LOG.info("Input format: " + inputFileFormatClassName + " cannot be vectorized" +
-          " given the Hive Configuration options " + getHiveOptionsString());
-
+      if (isInputFileFormatVectorized) {
+        Preconditions.checkState(!useVectorizedInputFileFormat);
+        enabledConditionsNotMetList.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
+      } else {
+        // Only offer these when the input file format is not the fast vectorized formats.
+        if (isVectorDeserializeEligable) {
+          Preconditions.checkState(!useVectorDeserialize);
+          enabledConditionsNotMetList.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE.varname);
+        } else {
+          // Row mode accepts every input file format, so it is the only option left to suggest.
+          enabledConditionsNotMetList.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE.varname);
+        }
+      }
+ 
       return false;
     }
 
-    private boolean validateInputFormatAndSchemaEvolution(MapWork mapWork, String alias,
+    private ImmutablePair<Boolean, Boolean> validateInputFormatAndSchemaEvolution(MapWork mapWork, String alias,
         TableScanOperator tableScanOperator, VectorTaskColumnInfo vectorTaskColumnInfo)
             throws SemanticException {
 
@@ -732,27 +874,39 @@ public class Vectorizer implements PhysicalPlanResolver {
 
       LinkedHashMap<Path, ArrayList<String>> pathToAliases = mapWork.getPathToAliases();
       LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo();
+
+      // Remember the input file formats we validated and why.
+      HashSet<String> inputFileFormatClassNameSet = new HashSet<String>();
+      HashSet<String> enabledConditionsMetSet = new HashSet<String>();
+      ArrayList<String> enabledConditionsNotMetList = new ArrayList<String>();
+
       for (Entry<Path, ArrayList<String>> entry: pathToAliases.entrySet()) {
         Path path = entry.getKey();
         List<String> aliases = entry.getValue();
         boolean isPresent = (aliases != null && aliases.indexOf(alias) != -1);
         if (!isPresent) {
-          LOG.info("Alias " + alias + " not present in aliases " + aliases);
-          return false;
+          setOperatorIssue("Alias " + alias + " not present in aliases " + aliases);
+          return new ImmutablePair<Boolean,Boolean>(false, false);
         }
         PartitionDesc partDesc = pathToPartitionInfo.get(path);
         if (partDesc.getVectorPartitionDesc() != null) {
           // We seen this already.
           continue;
         }
-        if (!verifyAndSetVectorPartDesc(partDesc, isAcidTable)) {
-          return false;
+        if (!verifyAndSetVectorPartDesc(partDesc, isAcidTable, inputFileFormatClassNameSet,
+            enabledConditionsMetSet, enabledConditionsNotMetList)) {
+
+          // Always set these so EXPLAIN can see.
+          mapWork.setVectorizationInputFileFormatClassNameSet(inputFileFormatClassNameSet);
+          mapWork.setVectorizationEnabledConditionsMet(new ArrayList(enabledConditionsMetSet));
+          mapWork.setVectorizationEnabledConditionsNotMet(enabledConditionsNotMetList);
+
+          // We consider this an enablement issue, not a not-vectorized issue.
+          LOG.info("Cannot enable vectorization because input file format(s) " + inputFileFormatClassNameSet +
+              " do not met conditions " + VectorizationCondition.addBooleans(enabledConditionsNotMetList, false));
+          return new ImmutablePair<Boolean,Boolean>(false, true);
         }
         VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
-        if (LOG.isInfoEnabled()) {
-          LOG.info("Vectorizer path: " + path + ", " + vectorPartDesc.toString() +
-              ", aliases " + aliases);
-        }
 
         if (isFirst) {
 
@@ -796,13 +950,13 @@ public class Vectorizer implements PhysicalPlanResolver {
          * implicitly defaulted to null.
          */
         if (nextDataColumnList.size() > tableDataColumnList.size()) {
-          LOG.info(
+          setOperatorIssue(
               String.format(
                   "Could not vectorize partition %s " +
                   "(deserializer " + deserializer.getClass().getName() + ")" +
                   "The partition column names %d is greater than the number of table columns %d",
                   path, nextDataColumnList.size(), tableDataColumnList.size()));
-          return false;
+          return new ImmutablePair<Boolean,Boolean>(false, false);
         }
         if (!(deserializer instanceof NullStructSerDe)) {
 
@@ -811,13 +965,13 @@ public class Vectorizer implements PhysicalPlanResolver {
             String nextColumnName = nextDataColumnList.get(i);
             String tableColumnName = tableDataColumnList.get(i);
             if (!nextColumnName.equals(tableColumnName)) {
-              LOG.info(
+              setOperatorIssue(
                   String.format(
                       "Could not vectorize partition %s " +
                       "(deserializer " + deserializer.getClass().getName() + ")" +
                       "The partition column name %s is does not match table column name %s",
                       path, nextColumnName, tableColumnName));
-              return false;
+              return new ImmutablePair<Boolean,Boolean>(false, false);
             }
           }
         }
@@ -852,29 +1006,50 @@ public class Vectorizer implements PhysicalPlanResolver {
       // Helps to keep this for debugging.
       vectorTaskColumnInfo.setTableScanOperator(tableScanOperator);
 
-      return true;
+      // Always set these so EXPLAIN can see.
+      mapWork.setVectorizationInputFileFormatClassNameSet(inputFileFormatClassNameSet);
+      mapWork.setVectorizationEnabledConditionsMet(new ArrayList(enabledConditionsMetSet));
+      mapWork.setVectorizationEnabledConditionsNotMet(enabledConditionsNotMetList);
+
+      return new ImmutablePair<Boolean,Boolean>(true, false);
     }
 
-    private boolean validateMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez)
+    private boolean validateMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTezOrSpark)
             throws SemanticException {
 
       LOG.info("Validating MapWork...");
 
-      ImmutablePair<String,TableScanOperator> pair = verifyOnlyOneTableScanOperator(mapWork);
-      if (pair ==  null) {
+      ImmutablePair<String,TableScanOperator> onlyOneTableScanPair = verifyOnlyOneTableScanOperator(mapWork);
+      if (onlyOneTableScanPair ==  null) {
+        VectorizerReason notVectorizedReason = currentBaseWork.getNotVectorizedReason();
+        Preconditions.checkState(notVectorizedReason != null);
+        mapWork.setVectorizationEnabledConditionsNotMet(Arrays.asList(new String[] {notVectorizedReason.toString()}));
         return false;
       }
-      String alias = pair.left;
-      TableScanOperator tableScanOperator = pair.right;
+      String alias = onlyOneTableScanPair.left;
+      TableScanOperator tableScanOperator = onlyOneTableScanPair.right;
 
       // This call fills in the column names, types, and partition column count in
       // vectorTaskColumnInfo.
-      if (!validateInputFormatAndSchemaEvolution(mapWork, alias, tableScanOperator, vectorTaskColumnInfo)) {
+      currentOperator = tableScanOperator;
+      ImmutablePair<Boolean, Boolean> validateInputFormatAndSchemaEvolutionPair =
+          validateInputFormatAndSchemaEvolution(mapWork, alias, tableScanOperator, vectorTaskColumnInfo);
+      if (!validateInputFormatAndSchemaEvolutionPair.left) {
+        // Have we already set the enabled conditions not met?
+        if (!validateInputFormatAndSchemaEvolutionPair.right) {
+          VectorizerReason notVectorizedReason = currentBaseWork.getNotVectorizedReason();
+          Preconditions.checkState(notVectorizedReason != null);
+          mapWork.setVectorizationEnabledConditionsNotMet(Arrays.asList(new String[] {notVectorizedReason.toString()}));
+        }
         return false;
       }
 
+      // Now we are enabled and any issues found from here on out are considered
+      // not vectorized issues.
+      mapWork.setVectorizationEnabled(true);
+
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(mapWork, isTez);
+      MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(mapWork, isTezOrSpark);
       addMapWorkRules(opRules, vnp);
       Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
       GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -896,13 +1071,13 @@ public class Vectorizer implements PhysicalPlanResolver {
     }
 
     private void vectorizeMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo,
-            boolean isTez) throws SemanticException {
+            boolean isTezOrSpark) throws SemanticException {
 
       LOG.info("Vectorizing MapWork...");
       mapWork.setVectorMode(true);
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
       MapWorkVectorizationNodeProcessor vnp =
-          new MapWorkVectorizationNodeProcessor(mapWork, isTez, vectorTaskColumnInfo);
+          new MapWorkVectorizationNodeProcessor(mapWork, isTezOrSpark, vectorTaskColumnInfo);
       addMapWorkRules(opRules, vnp);
       Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
       GraphWalker ogw = new PreOrderOnceWalker(disp);
@@ -923,11 +1098,34 @@ public class Vectorizer implements PhysicalPlanResolver {
       return;
     }
 
-    private void convertReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException {
+    private void setReduceWorkExplainConditions(ReduceWork reduceWork) {
+
+      reduceWork.setVectorizationExamined(true);
+
+      reduceWork.setReduceVectorizationEnabled(isReduceVectorizationEnabled);
+      reduceWork.setVectorReduceEngine(
+          HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE));
+    }
+
+    private void convertReduceWork(ReduceWork reduceWork) throws SemanticException {
+
+      // Global used when setting errors, etc.
+      currentBaseWork = reduceWork;
+      currentBaseWork.setVectorizationEnabled(true);
+
       VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
-      boolean ret = validateReduceWork(reduceWork, vectorTaskColumnInfo, isTez);
+      vectorTaskColumnInfo.assume();
+
+      boolean ret = validateReduceWork(reduceWork, vectorTaskColumnInfo);
       if (ret) {
-        vectorizeReduceWork(reduceWork, vectorTaskColumnInfo, isTez);
+        vectorizeReduceWork(reduceWork, vectorTaskColumnInfo);
+      } else if (currentBaseWork.getVectorizationEnabled()) {
+        VectorizerReason notVectorizedReason  = currentBaseWork.getNotVectorizedReason();
+        if (notVectorizedReason == null) {
+          LOG.info("Cannot vectorize: unknown");
+        } else {
+          LOG.info("Cannot vectorize: " + notVectorizedReason.toString());
+        }
       }
     }
 
@@ -941,13 +1139,14 @@ public class Vectorizer implements PhysicalPlanResolver {
         // Check key ObjectInspector.
         ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector();
         if (keyObjectInspector == null || !(keyObjectInspector instanceof StructObjectInspector)) {
+          setNodeIssue("Key object inspector missing or not StructObjectInspector");
           return false;
         }
         StructObjectInspector keyStructObjectInspector = (StructObjectInspector)keyObjectInspector;
         List<? extends StructField> keyFields = keyStructObjectInspector.getAllStructFieldRefs();
 
-        // Tez doesn't use tagging...
         if (reduceWork.getNeedsTagging()) {
+          setNodeIssue("Tez doesn't use tagging");
           return false;
         }
 
@@ -955,6 +1154,7 @@ public class Vectorizer implements PhysicalPlanResolver {
         ObjectInspector valueObjectInspector = reduceWork.getValueObjectInspector();
         if (valueObjectInspector == null ||
                 !(valueObjectInspector instanceof StructObjectInspector)) {
+          setNodeIssue("Value object inspector missing or not StructObjectInspector");
           return false;
         }
         StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector;
@@ -984,7 +1184,7 @@ public class Vectorizer implements PhysicalPlanResolver {
     }
 
     private boolean validateReduceWork(ReduceWork reduceWork,
-        VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException {
+        VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException {
 
       LOG.info("Validating ReduceWork...");
 
@@ -1015,7 +1215,7 @@ public class Vectorizer implements PhysicalPlanResolver {
     }
 
     private void vectorizeReduceWork(ReduceWork reduceWork,
-        VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException {
+        VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException {
 
       LOG.info("Vectorizing ReduceWork...");
       reduceWork.setVectorMode(true);
@@ -1025,7 +1225,7 @@ public class Vectorizer implements PhysicalPlanResolver {
       // VectorizationContext...  Do we use PreOrderWalker instead of DefaultGraphWalker.
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
       ReduceWorkVectorizationNodeProcessor vnp =
-              new ReduceWorkVectorizationNodeProcessor(vectorTaskColumnInfo, isTez);
+              new ReduceWorkVectorizationNodeProcessor(vectorTaskColumnInfo);
       addReduceWorkRules(opRules, vnp);
       Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
       GraphWalker ogw = new PreOrderWalker(disp);
@@ -1053,7 +1253,7 @@ public class Vectorizer implements PhysicalPlanResolver {
   class MapWorkValidationNodeProcessor implements NodeProcessor {
 
     private final MapWork mapWork;
-    private final boolean isTez;
+    private final boolean isTezOrSpark;
 
     // Children of Vectorized GROUPBY that outputs rows instead of vectorized row batchs.
     protected final Set<Operator<? extends OperatorDesc>> nonVectorizedOps =
@@ -1063,9 +1263,9 @@ public class Vectorizer implements PhysicalPlanResolver {
       return nonVectorizedOps;
     }
 
-    public MapWorkValidationNodeProcessor(MapWork mapWork, boolean isTez) {
+    public MapWorkValidationNodeProcessor(MapWork mapWork, boolean isTezOrSpark) {
       this.mapWork = mapWork;
-      this.isTez = isTez;
+      this.isTezOrSpark = isTezOrSpark;
     }
 
     @Override
@@ -1077,13 +1277,13 @@ public class Vectorizer implements PhysicalPlanResolver {
           return new Boolean(true);
         }
         boolean ret;
+        currentOperator = op;
         try {
-          ret = validateMapWorkOperator(op, mapWork, isTez);
+          ret = validateMapWorkOperator(op, mapWork, isTezOrSpark);
         } catch (Exception e) {
           throw new SemanticException(e);
         }
         if (!ret) {
-          LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized.");
           return new Boolean(false);
         }
         // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
@@ -1119,9 +1319,9 @@ public class Vectorizer implements PhysicalPlanResolver {
         if (nonVectorizedOps.contains(op)) {
           return new Boolean(true);
         }
+        currentOperator = op;
         boolean ret = validateReduceWorkOperator(op);
         if (!ret) {
-          LOG.info("ReduceWork Operator: " + op.getName() + " could not be vectorized.");
           return new Boolean(false);
         }
         // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
@@ -1142,9 +1342,12 @@ public class Vectorizer implements PhysicalPlanResolver {
     // The vectorization context for the Map or Reduce task.
     protected VectorizationContext taskVectorizationContext;
 
+    protected final VectorTaskColumnInfo vectorTaskColumnInfo;
     protected final Set<Operator<? extends OperatorDesc>> nonVectorizedOps;
 
-    VectorizationNodeProcessor(Set<Operator<? extends OperatorDesc>> nonVectorizedOps) {
+    VectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo,
+        Set<Operator<? extends OperatorDesc>> nonVectorizedOps) {
+      this.vectorTaskColumnInfo = vectorTaskColumnInfo;
       this.nonVectorizedOps = nonVectorizedOps;
     }
 
@@ -1192,11 +1395,11 @@ public class Vectorizer implements PhysicalPlanResolver {
     }
 
     public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op,
-            VectorizationContext vContext, boolean isTez) throws SemanticException {
+            VectorizationContext vContext, boolean isTezOrSpark) throws SemanticException {
       Operator<? extends OperatorDesc> vectorOp = op;
       try {
         if (!opsDone.contains(op)) {
-          vectorOp = vectorizeOperator(op, vContext, isTez);
+          vectorOp = vectorizeOperator(op, vContext, isTezOrSpark, vectorTaskColumnInfo);
           opsDone.add(op);
           if (vectorOp != op) {
             opToVectorOpMap.put(op, vectorOp);
@@ -1220,14 +1423,14 @@ public class Vectorizer implements PhysicalPlanResolver {
 
     private final MapWork mWork;
     private final VectorTaskColumnInfo vectorTaskColumnInfo;
-    private final boolean isTez;
+    private final boolean isTezOrSpark;
 
-    public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez,
+    public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTezOrSpark,
         VectorTaskColumnInfo vectorTaskColumnInfo) {
-      super(vectorTaskColumnInfo.getNonVectorizedOps());
+      super(vectorTaskColumnInfo, vectorTaskColumnInfo.getNonVectorizedOps());
       this.mWork = mWork;
       this.vectorTaskColumnInfo = vectorTaskColumnInfo;
-      this.isTez = isTez;
+      this.isTezOrSpark = isTezOrSpark;
     }
 
     @Override
@@ -1241,6 +1444,7 @@ public class Vectorizer implements PhysicalPlanResolver {
 
       VectorizationContext vContext = null;
 
+      currentOperator = op;
       if (op instanceof TableScanOperator) {
         if (taskVectorizationContext == null) {
           taskVectorizationContext = getVectorizationContext(op.getName(), vectorTaskColumnInfo);
@@ -1261,7 +1465,7 @@ public class Vectorizer implements PhysicalPlanResolver {
             + " using vectorization context" + vContext.toString());
       }
 
-      Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, isTez);
+      Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, isTezOrSpark);
 
       if (LOG.isDebugEnabled()) {
         if (vectorOp instanceof VectorizationContextRegion) {
@@ -1279,7 +1483,6 @@ public class Vectorizer implements PhysicalPlanResolver {
 
     private final VectorTaskColumnInfo vectorTaskColumnInfo;
 
-    private final boolean isTez;
 
     private Operator<? extends OperatorDesc> rootVectorOp;
 
@@ -1287,13 +1490,11 @@ public class Vectorizer implements PhysicalPlanResolver {
       return rootVectorOp;
     }
 
-    public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo,
-            boolean isTez) {
+    public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo) {
 
-      super(vectorTaskColumnInfo.getNonVectorizedOps());
+      super(vectorTaskColumnInfo, vectorTaskColumnInfo.getNonVectorizedOps());
       this.vectorTaskColumnInfo =  vectorTaskColumnInfo;
       rootVectorOp = null;
-      this.isTez = isTez;
     }
 
     @Override
@@ -1309,6 +1510,7 @@ public class Vectorizer implements PhysicalPlanResolver {
 
       boolean saveRootVectorOp = false;
 
+      currentOperator = op;
       if (op.getParentOperators().size() == 0) {
         LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + vectorTaskColumnInfo.allColumnNames.toString());
 
@@ -1333,7 +1535,7 @@ public class Vectorizer implements PhysicalPlanResolver {
       assert vContext != null;
       LOG.info("ReduceWorkVectorizationNodeProcessor process operator " + op.getName() + " using vectorization context" + vContext.toString());
 
-      Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, isTez);
+      Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, true);
 
       if (LOG.isDebugEnabled()) {
         if (vectorOp instanceof VectorizationContextRegion) {
@@ -1390,6 +1592,10 @@ public class Vectorizer implements PhysicalPlanResolver {
         HiveConf.getBoolVar(hiveConf,
             HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE);
 
+    isReduceVectorizationEnabled =
+        HiveConf.getBoolVar(hiveConf,
+            HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED);
+
     isSchemaEvolution =
         HiveConf.getBoolVar(hiveConf,
             HiveConf.ConfVars.HIVE_SCHEMA_EVOLUTION);
@@ -1407,18 +1613,32 @@ public class Vectorizer implements PhysicalPlanResolver {
     return physicalContext;
   }
 
-  boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op, MapWork mWork, boolean isTez) {
-    boolean ret = false;
+  private void setOperatorNotSupported(Operator<? extends OperatorDesc> op) {
+    // Report the operator as unsupported, labelling it with the @Explain display name
+    // of its descriptor class when that annotation is found.
+    Annotation explainAnnotation =
+        AnnotationUtils.getAnnotation(op.getConf().getClass(), Explain.class);
+    String issue = (explainAnnotation == null)
+        ? "Operator " + op.getType() + " not supported"
+        : ((Explain) explainAnnotation).displayName() + " (" + op.getType() + ") not supported";
+    setNodeIssue(issue);
+  }
+
+  boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op, MapWork mWork, boolean isTezOrSpark) {
+    boolean ret;
     switch (op.getType()) {
       case MAPJOIN:
         if (op instanceof MapJoinOperator) {
           ret = validateMapJoinOperator((MapJoinOperator) op);
         } else if (op instanceof SMBMapJoinOperator) {
           ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op);
+        } else {
+          setOperatorNotSupported(op);
+          ret = false;
         }
         break;
       case GROUPBY:
-        ret = validateGroupByOperator((GroupByOperator) op, false, isTez);
+        ret = validateGroupByOperator((GroupByOperator) op, false, isTezOrSpark);
         break;
       case FILTER:
         ret = validateFilterOperator((FilterOperator) op);
@@ -1443,6 +1663,7 @@ public class Vectorizer implements PhysicalPlanResolver {
             validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op);
         break;
       default:
+        setOperatorNotSupported(op);
         ret = false;
         break;
     }
@@ -1450,7 +1671,7 @@ public class Vectorizer implements PhysicalPlanResolver {
   }
 
   boolean validateReduceWorkOperator(Operator<? extends OperatorDesc> op) {
-    boolean ret = false;
+    boolean ret;
     switch (op.getType()) {
       case MAPJOIN:
         // Does MAPJOIN actually get planned in Reduce?
@@ -1458,6 +1679,9 @@ public class Vectorizer implements PhysicalPlanResolver {
           ret = validateMapJoinOperator((MapJoinOperator) op);
         } else if (op instanceof SMBMapJoinOperator) {
           ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op);
+        } else {
+          setOperatorNotSupported(op);
+          ret = false;
         }
         break;
       case GROUPBY:
@@ -1465,6 +1689,7 @@ public class Vectorizer implements PhysicalPlanResolver {
                     HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_GROUPBY_ENABLED)) {
           ret = validateGroupByOperator((GroupByOperator) op, true, true);
         } else {
+          setNodeIssue("Operator " + op.getType() + " not enabled (" + HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_GROUPBY_ENABLED.name() + "=true IS false)");
           ret = false;
         }
         break;
@@ -1490,6 +1715,7 @@ public class Vectorizer implements PhysicalPlanResolver {
             validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op);
         break;
       default:
+        setOperatorNotSupported(op);
         ret = false;
         break;
     }
@@ -1512,7 +1738,7 @@ public class Vectorizer implements PhysicalPlanResolver {
       throws SemanticException {
     if (op.getType().equals(OperatorType.GROUPBY)) {
       GroupByDesc desc = (GroupByDesc) op.getConf();
-      return !desc.getVectorDesc().isVectorOutput();
+      return !((VectorGroupByDesc) desc.getVectorDesc()).isVectorOutput();
     }
     return false;
   }
@@ -1526,6 +1752,7 @@ public class Vectorizer implements PhysicalPlanResolver {
   private boolean validateTableScanOperator(TableScanOperator op, MapWork mWork) {
     TableScanDesc desc = op.getConf();
     if (desc.isGatherStats()) {
+      setOperatorIssue("gather stats not supported");
       return false;
     }
 
@@ -1540,25 +1767,21 @@ public class Vectorizer implements PhysicalPlanResolver {
   private boolean validateMapJoinDesc(MapJoinDesc desc) {
     byte posBigTable = (byte) desc.getPosBigTable();
     List<ExprNodeDesc> filterExprs = desc.getFilters().get(posBigTable);
-    if (!validateExprNodeDesc(filterExprs, VectorExpressionDescriptor.Mode.FILTER)) {
-      LOG.info("Cannot vectorize map work filter expression");
+    if (!validateExprNodeDesc(filterExprs, "Filter", VectorExpressionDescriptor.Mode.FILTER)) {
       return false;
     }
     List<ExprNodeDesc> keyExprs = desc.getKeys().get(posBigTable);
-    if (!validateExprNodeDesc(keyExprs)) {
-      LOG.info("Cannot vectorize map work key expression");
+    if (!validateExprNodeDesc(keyExprs, "Key")) {
       return false;
     }
     List<ExprNodeDesc> valueExprs = desc.getExprs().get(posBigTable);
-    if (!validateExprNodeDesc(valueExprs)) {
-      LOG.info("Cannot vectorize map work value expression");
+    if (!validateExprNodeDesc(valueExprs, "Value")) {
       return false;
     }
     Byte[] order = desc.getTagOrder();
     Byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]);
     List<ExprNodeDesc> smallTableExprs = desc.getExprs().get(posSingleVectorMapJoinSmallTable);
-    if (!validateExprNodeDesc(smallTableExprs)) {
-      LOG.info("Cannot vectorize map work small table expression");
+    if (!validateExprNodeDesc(smallTableExprs, "Small Table")) {
       return false;
     }
     return true;
@@ -1571,24 +1794,23 @@ public class Vectorizer implements PhysicalPlanResolver {
     List<ExprNodeDesc> filterExprs = desc.getFilters().get(tag);
     List<ExprNodeDesc> keyExprs = desc.getKeys().get(tag);
     List<ExprNodeDesc> valueExprs = desc.getExprs().get(tag);
-    return validateExprNodeDesc(filterExprs, VectorExpressionDescriptor.Mode.FILTER) &&
-        validateExprNodeDesc(keyExprs) && validateExprNodeDesc(valueExprs);
+    return validateExprNodeDesc(filterExprs, "Filter", VectorExpressionDescriptor.Mode.FILTER) &&
+        validateExprNodeDesc(keyExprs, "Key") && validateExprNodeDesc(valueExprs, "Value");
   }
 
   private boolean validateReduceSinkOperator(ReduceSinkOperator op) {
     List<ExprNodeDesc> keyDescs = op.getConf().getKeyCols();
     List<ExprNodeDesc> partitionDescs = op.getConf().getPartitionCols();
     List<ExprNodeDesc> valueDesc = op.getConf().getValueCols();
-    return validateExprNodeDesc(keyDescs) && validateExprNodeDesc(partitionDescs) &&
-        validateExprNodeDesc(valueDesc);
+    return validateExprNodeDesc(keyDescs, "Key") && validateExprNodeDesc(partitionDescs, "Partition") &&
+        validateExprNodeDesc(valueDesc, "Value");
   }
 
   private boolean validateSelectOperator(SelectOperator op) {
     List<ExprNodeDesc> descList = op.getConf().getColList();
     for (ExprNodeDesc desc : descList) {
-      boolean ret = validateExprNodeDesc(desc);
+      boolean ret = validateExprNodeDesc(desc, "Select");
       if (!ret) {
-        LOG.info("Cannot vectorize select expression: " + desc.toString());
         return false;
       }
     }
@@ -1597,28 +1819,26 @@ public class Vectorizer implements PhysicalPlanResolver {
 
   private boolean validateFilterOperator(FilterOperator op) {
     ExprNodeDesc desc = op.getConf().getPredicate();
-    return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.FILTER);
+    return validateExprNodeDesc(desc, "Predicate", VectorExpressionDescriptor.Mode.FILTER);
   }
 
-  private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTez) {
+  private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTezOrSpark) {
     GroupByDesc desc = op.getConf();
-    VectorGroupByDesc vectorDesc = desc.getVectorDesc();
 
     if (desc.isGroupingSetsPresent()) {
-      LOG.info("Grouping sets not supported in vector mode");
+      setOperatorIssue("Grouping sets not supported");
       return false;
     }
     if (desc.pruneGroupingSetId()) {
-      LOG.info("Pruning grouping set id not supported in vector mode");
+      setOperatorIssue("Pruning grouping set id not supported");
       return false;
     }
     if (desc.getMode() != GroupByDesc.Mode.HASH && desc.isDistinct()) {
-      LOG.info("DISTINCT not supported in vector mode");
+      setOperatorIssue("DISTINCT not supported");
       return false;
     }
-    boolean ret = validateExprNodeDesc(desc.getKeys());
+    boolean ret = validateExprNodeDesc(desc.getKeys(), "Key");
     if (!ret) {
-      LOG.info("Cannot vectorize groupby key expression " + desc.getKeys().toString());
       return false;
     }
 
@@ -1731,6 +1951,9 @@ public class Vectorizer implements PhysicalPlanResolver {
 
     // If all the aggregation outputs are primitive, we can output VectorizedRowBatch.
     // Otherwise, we the rest of the operator tree will be row mode.
+    VectorGroupByDesc vectorDesc = new VectorGroupByDesc();
+    desc.setVectorDesc(vectorDesc);
+
     vectorDesc.setVectorOutput(retPair.right);
 
     vectorDesc.setProcessingMode(processingMode);
@@ -1745,14 +1968,15 @@ public class Vectorizer implements PhysicalPlanResolver {
    return true;
   }
 
-  private boolean validateExprNodeDesc(List<ExprNodeDesc> descs) {
-    return validateExprNodeDesc(descs, VectorExpressionDescriptor.Mode.PROJECTION);
+  private boolean validateExprNodeDesc(List<ExprNodeDesc> descs, String expressionTitle) {
+    return validateExprNodeDesc(descs, expressionTitle, VectorExpressionDescriptor.Mode.PROJECTION);
   }
 
   private boolean validateExprNodeDesc(List<ExprNodeDesc> descs,
+          String expressionTitle,
           VectorExpressionDescriptor.Mode mode) {
     for (ExprNodeDesc d : descs) {
-      boolean ret = validateExprNodeDesc(d, mode);
+      boolean ret = validateExprNodeDesc(d, expressionTitle, mode);
       if (!ret) {
         return false;
       }
@@ -1775,19 +1999,20 @@ public class Vectorizer implements PhysicalPlanResolver {
     return new Pair<Boolean, Boolean>(true, outputIsPrimitive);
   }
 
-  private boolean validateExprNodeDescRecursive(ExprNodeDesc desc, VectorExpressionDescriptor.Mode mode) {
+  private boolean validateExprNodeDescRecursive(ExprNodeDesc desc, String expressionTitle,
+      VectorExpressionDescriptor.Mode mode) {
     if (desc instanceof ExprNodeColumnDesc) {
       ExprNodeColumnDesc c = (ExprNodeColumnDesc) desc;
       // Currently, we do not support vectorized virtual columns (see HIVE-5570).
       if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(c.getColumn())) {
-        LOG.info("Cannot vectorize virtual column " + c.getColumn());
+        setExpressionIssue(expressionTitle, "Virtual columns not supported (" + c.getColumn() + ")");
         return false;
       }
     }
     String typeName = desc.getTypeInfo().getTypeName();
     boolean ret = validateDataType(typeName, mode);
     if (!ret) {
-      LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName);
+      setExpressionIssue(expressionTitle, "Data type " + typeName + " of " + desc.toString() + " not supported");
       return false;
     }
     boolean isInExpression = false;
@@ -1795,7 +2020,7 @@ public class Vectorizer implements PhysicalPlanResolver {
       ExprNodeGenericFuncDesc d = (ExprNodeGenericFuncDesc) desc;
       boolean r = validateGenericUdf(d);
       if (!r) {
-        LOG.info("Cannot vectorize UDF " + d);
+        setExpressionIssue(expressionTitle, "UDF " + d + " not supported");
         return false;
       }
       GenericUDF genericUDF = d.getGenericUDF();
@@ -1806,14 +2031,14 @@ public class Vectorizer implements PhysicalPlanResolver {
           && desc.getChildren().get(0).getTypeInfo().getCategory() == Category.STRUCT) {
         // Don't restrict child expressions for projection.
         // Always use loose FILTER mode.
-        if (!validateStructInExpression(desc, VectorExpressionDescriptor.Mode.FILTER)) {
+        if (!validateStructInExpression(desc, expressionTitle, VectorExpressionDescriptor.Mode.FILTER)) {
           return false;
         }
       } else {
         for (ExprNodeDesc d : desc.getChildren()) {
           // Don't restrict child expressions for projection.
           // Always use loose FILTER mode.
-          if (!validateExprNodeDescRecursive(d, VectorExpressionDescriptor.Mode.FILTER)) {
+          if (!validateExprNodeDescRecursive(d, expressionTitle, VectorExpressionDescriptor.Mode.FILTER)) {
             return false;
           }
         }
@@ -1823,7 +2048,7 @@ public class Vectorizer implements PhysicalPlanResolver {
   }
 
   private boolean validateStructInExpression(ExprNodeDesc desc,
-      VectorExpressionDescriptor.Mode mode) {
+      String expressionTitle, VectorExpressionDescriptor.Mode mode) {
     for (ExprNodeDesc d : desc.getChildren()) {
       TypeInfo typeInfo = d.getTypeInfo();
       if (typeInfo.getCategory() != Category.STRUCT) {
@@ -1839,7 +2064,8 @@ public class Vectorizer implements PhysicalPlanResolver {
         TypeInfo fieldTypeInfo = fieldTypeInfos.get(f);
         Category category = fieldTypeInfo.getCategory();
         if (category != Category.PRIMITIVE) {
-          LOG.info("Cannot vectorize struct field " + fieldNames.get(f)
+          setExpressionIssue(expressionTitle,
+              "Cannot vectorize struct field " + fieldNames.get(f)
               + " of type " + fieldTypeInfo.getTypeName());
           return false;
         }
@@ -1852,7 +2078,8 @@ public class Vectorizer implements PhysicalPlanResolver {
         if (inConstantType != InConstantType.INT_FAMILY
             && inConstantType != InConstantType.FLOAT_FAMILY
             && inConstantType != InConstantType.STRING_FAMILY) {
-          LOG.info("Cannot vectorize struct field " + fieldNames.get(f)
+          setExpressionIssue(expressionTitle,
+              "Cannot vectorize struct field " + fieldNames.get(f)
               + " of type " + fieldTypeInfo.getTypeName());
           return false;
         }
@@ -1861,31 +2088,28 @@ public class Vectorizer implements PhysicalPlanResolver {
     return true;
   }
 
-  private boolean validateExprNodeDesc(ExprNodeDesc desc) {
-    return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.PROJECTION);
+  private boolean validateExprNodeDesc(ExprNodeDesc desc, String expressionTitle) {
+    return validateExprNodeDesc(desc, expressionTitle, VectorExpressionDescriptor.Mode.PROJECTION);
   }
 
-  boolean validateExprNodeDesc(ExprNodeDesc desc, VectorExpressionDescriptor.Mode mode) {
-    if (!validateExprNodeDescRecursive(desc, mode)) {
+  boolean validateExprNodeDesc(ExprNodeDesc desc, String expressionTitle,
+      VectorExpressionDescriptor.Mode mode) {
+    if (!validateExprNodeDescRecursive(desc, expressionTitle, mode)) {
       return false;
     }
     try {
       VectorizationContext vc = new ValidatorVectorizationContext(hiveConf);
       if (vc.getVectorExpression(desc, mode) == null) {
         // TODO: this cannot happen - VectorizationContext throws in such cases.
-        LOG.info("getVectorExpression returned null");
+        setExpressionIssue(expressionTitle, "getVectorExpression returned null");
         return false;
       }
     } catch (Exception e) {
       if (e instanceof HiveException) {
-        LOG.info(e.getMessage());
+        setExpressionIssue(expressionTitle, e.getMessage());
       } else {
-        if (LOG.isDebugEnabled()) {
-          // Show stack trace.
-          LOG.debug("Failed to vectorize", e);
-        } else {
-          LOG.info("Failed to vectorize", e.getMessage());
-        }
+        String issue = "exception: " + VectorizationContext.getStackTraceAsSingleLine(e);
+        setExpressionIssue(expressionTitle, issue);
       }
       return false;
     }
@@ -1905,9 +2129,9 @@ public class Vectorizer implements PhysicalPlanResolver {
     }
   }
 
-  private boolean validateAggregationIsPrimitive(VectorAggregateExpression vectorAggrExpr) {
+  public static ObjectInspector.Category aggregationOutputCategory(VectorAggregateExpression vectorAggrExpr) {
     ObjectInspector outputObjInspector = vectorAggrExpr.getOutputObjectInspector();
-    return (outputObjInspector.getCategory() == ObjectInspector.Category.PRIMITIVE);
+    return outputObjInspector.getCategory();
   }
 
   private Pair<Boolean,Boolean> validateAggregationDesc(AggregationDesc aggDesc, ProcessingMode processingMode,
@@ -1915,11 +2139,10 @@ public class Vectorizer implements PhysicalPlanResolver {
 
     String udfName = aggDesc.getGenericUDAFName().toLowerCase();
     if (!supportedAggregationUdfs.contains(udfName)) {
-      LOG.info("Cannot vectorize groupby aggregate expression: UDF " + udfName + " not supported");
+      setExpressionIssue("Aggregation Function", "UDF " + udfName + " not supported");
       return new Pair<Boolean,Boolean>(false, false);
     }
-    if (aggDesc.getParameters() != null && !validateExprNodeDesc(aggDesc.getParameters())) {
-      LOG.info("Cannot vectorize groupby aggregate expression: UDF parameters not supported");
+    if (aggDesc.getParameters() != null && !validateExprNodeDesc(aggDesc.getParameters(), "Aggregation Function UDF " + udfName + " parameter")) {
       return new Pair<Boolean,Boolean>(false, false);
     }
 
@@ -1933,6 +2156,7 @@ public class Vectorizer implements PhysicalPlanResolver {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Vectorization of aggreation should have succeeded ", e);
       }
+      setExpressionIssue("Aggregation Function", "Vectorization of aggreation should have succeeded " + e);
       return new Pair<Boolean,Boolean>(false, false);
     }
     if (LOG.isDebugEnabled()) {
@@ -1940,11 +2164,12 @@ public class Vectorizer implements PhysicalPlanResolver {
           " vector expression " + vectorAggrExpr.toString());
     }
 
-    boolean outputIsPrimitive = validateAggregationIsPrimitive(vectorAggrExpr);
+    ObjectInspector.Category outputCategory = aggregationOutputCategory(vectorAggrExpr);
+    boolean outputIsPrimitive = (outputCategory == ObjectInspector.Category.PRIMITIVE);
     if (processingMode == ProcessingMode.MERGE_PARTIAL &&
         hasKeys &&
         !outputIsPrimitive) {
-      LOG.info("Vectorized Reduce MergePartial GROUP BY keys can only handle aggregate outputs that are primitive types");
+      setOperatorIssue("Vectorized Reduce MergePartial GROUP BY keys can only handle aggregate outputs that are primitive types");
       return new Pair<Boolean,Boolean>(false, false);
     }
 
@@ -2012,12 +2237,12 @@ public class Vectorizer implements PhysicalPlanResolver {
         if (smallTableIndices[i] < 0) {
           // Negative numbers indicate a column to be (deserialize) read from the small table's
           // LazyBinary value row.
-          LOG.info("Vectorizer isBigTableOnlyResults smallTableIndices[i] < 0 returning false");
+          setOperatorIssue("Vectorizer isBigTableOnlyResults smallTableIndices[i] < 0 returning false");
           return false;
         }
       }
     } else if (smallTableRetainSize > 0) {
-      LOG.info("Vectorizer isBigTableOnlyResults smallTableRetainSize > 0 returning false");
+      setOperatorIssue("Vectorizer isBigTableOnlyResults smallTableRetainSize > 0 returning false");
       return false;
     }
 
@@ -2026,20 +2251,21 @@ public class Vectorizer implements PhysicalPlanResolver {
   }
 
   Operator<? extends OperatorDesc> specializeMapJoinOperator(Operator<? extends OperatorDesc> op,
-        VectorizationContext vContext, MapJoinDesc desc) throws HiveException {
+        VectorizationContext vContext, MapJoinDesc desc, VectorMapJoinInfo vectorMapJoinInfo)
+            throws HiveException {
     Operator<? extends OperatorDesc> vectorOp = null;
     Class<? extends Operator<?>> opClass = null;
 
-    VectorMapJoinDesc.HashTableImplementationType hashTableImplementationType = HashTableImplementationType.NONE;
-    VectorMapJoinDesc.HashTableKind hashTableKind = HashTableKind.NONE;
-    VectorMapJoinDesc.HashTableKeyType hashTableKeyType = HashTableKeyType.NONE;
+    VectorMapJoinDesc vectorDesc = (VectorMapJoinDesc) desc.getVectorDesc();
+
+    HashTableImplementationType hashTableImplementationType = HashTableImplementationType.NONE;
+    HashTableKind hashTableKind = HashTableKind.NONE;
+    HashTableKeyType hashTableKeyType = HashTableKeyType.NONE;
+    OperatorVariation operatorVariation = OperatorVariation.NONE;
 
-    if (HiveConf.getBoolVar(hiveConf,
-              HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED)) {
+    if (vectorDesc.getIsFastHashTableEnabled()) {
       hashTableImplementationType = HashTableImplementationType.FAST;
     } else {
-      // Restrict to using BytesBytesMultiHashMap via MapJoinBytesTableContainer or
-      // HybridHashTableContainer.
       hashTableImplementationType = HashTableImplementationType.OPTIMIZED;
     }
 
@@ -2061,20 +2287,31 @@ public class Vectorizer implements PhysicalPlanResolver {
       Map<Byte, List<ExprNodeDesc>> keyExprs = desc.getKeys();
       List<ExprNodeDesc> bigTableKeyExprs = keyExprs.get(posBigTable);
       if (bigTableKeyExprs.size() == 1) {
-        String typeName = bigTableKeyExprs.get(0).getTypeString();
-        LOG.info("Vectorizer vectorizeOperator map join typeName " + typeName);
-        if (typeName.equals("boolean")) {
+        TypeInfo typeInfo = bigTableKeyExprs.get(0).getTypeInfo();
+        LOG.info("Vectorizer vectorizeOperator map join typeName " + typeInfo.getTypeName());
+        switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+        case BOOLEAN:
           hashTableKeyType = HashTableKeyType.BOOLEAN;
-        } else if (typeName.equals("tinyint")) {
+          break;
+        case BYTE:
           hashTableKeyType = HashTableKeyType.BYTE;
-        } else if (typeName.equals("smallint")) {
+          break;
+        case SHORT:
           hashTableKeyType = HashTableKeyType.SHORT;
-        } else if (typeName.equals("int")) {
+          break;
+        case INT:
           hashTableKeyType = HashTableKeyType.INT;
-        } else if (typeName.equals("bigint") || typeName.equals("long")) {
+          break;
+        case LONG:
           hashTableKeyType = HashTableKeyType.LONG;
-        } else if (VectorizationContext.isStringFamily(typeName)) {
+          break;
+        case STRING:
+        case CHAR:
+        case VARCHAR:
+        case BINARY:
           hashTableKeyType = HashTableKeyType.STRING;
+        default:
+          // Stay with multi-key.
         }
       }
     }
@@ -2082,16 +2319,20 @@ public class Vectorizer implements PhysicalPlanResolver {
     switch (joinType) {
     case JoinDesc.INNER_JOIN:
       if (!isInnerBigOnly) {
+        operatorVariation = OperatorVariation.INNER;
         hashTableKind = HashTableKind.HASH_MAP;
       } else {
+        operatorVariation = OperatorVariation.INNER_BIG_ONLY;
         hashTableKind = HashTableKind.HASH_MULTISET;
       }
       break;
     case JoinDesc.LEFT_OUTER_JOIN:
     case JoinDesc.RIGHT_OUTER_JOIN:
+      operatorVariation = OperatorVariation.OUTER;
       hashTableKind = HashTableKind.HASH_MAP;
       break;
     case JoinDesc.LEFT_SEMI_JOIN:
+      operatorVariation = OperatorVariation.LEFT_SEMI;
       hashTableKind = HashTableKind.HASH_SET;
       break;
     default:
@@ -2106,86 +2347,84 @@ public class Vectorizer implements PhysicalPlanResolver {
     case SHORT:
     case INT:
     case LONG:
-      switch (joinType) {
-      case JoinDesc.INNER_JOIN:
-        if (!isInnerBigOnly) {
-          opClass = VectorMapJoinInnerLongOperator.class;
-        } else {
-          opClass = VectorMapJoinInnerBigOnlyLongOperator.class;
-        }
+      switch (operatorVariation) {
+      case INNER:
+        opClass = VectorMapJoinInnerLongOperator.class;
         break;
-      case JoinDesc.LEFT_OUTER_JOIN:
-      case JoinDesc.RIGHT_OUTER_JOIN:
-        opClass = VectorMapJoinOuterLongOperator.class;
+      case INNER_BIG_ONLY:
+        opClass = VectorMapJoinInnerBigOnlyLongOperator.class;
         break;
-      case JoinDesc.LEFT_SEMI_JOIN:
+      case LEFT_SEMI:
         opClass = VectorMapJoinLeftSemiLongOperator.class;
         break;
+      case OUTER:
+        opClass = VectorMapJoinOuterLongOperator.class;
+        break;
       default:
-        throw new HiveException("Unknown join type " + joinType);
+        throw new HiveException("Unknown operator variation " + operatorVariation);
       }
       break;
     case STRING:
-      switch (joinType) {
-      case JoinDesc.INNER_JOIN:
-        if (!isInnerBigOnly) {
-          opClass = VectorMapJoinInnerStringOperator.class;
-        } else {
-          opClass = VectorMapJoinInnerBigOnlyStringOperator.class;
-        }
+      switch (operatorVariation) {
+      case INNER:
+        opClass = VectorMapJoinInnerStringOperator.class;
         break;
-      case JoinDesc.LEFT_OUTER_JOIN:
-      case JoinDesc.RIGHT_OUTER_JOIN:
-        opClass = VectorMapJoinOuterStringOperator.class;
+      case INNER_BIG_ONLY:
+        opClass = VectorMapJoinInnerBigOnlyStringOperator.class;
         break;
-      case JoinDesc.LEFT_SEMI_JOIN:
+      case LEFT_SEMI:
         opClass = VectorMapJoinLeftSemiStringOperator.class;
         break;
+      case OUTER:
+        opClass = VectorMapJoinOuterStringOperator.class;
+        break;
       default:
-        throw new HiveException("Unknown join type " + joinType);
+        throw new HiveException("Unknown operator variation " + operatorVariation);
       }
       break;
     case MULTI_KEY:
-      switch (joinType) {
-      case JoinDesc.INNER_JOIN:
-        if (!isInnerBigOnly) {
-          opClass = VectorMapJoinInnerMultiKeyOperator.class;
-        } else {
-          opClass = VectorMapJoinInnerBigOnlyMultiKeyOperator.class;
-        }
+      switch (operatorVariation) {
+      case INNER:
+        opClass = VectorMapJoinInnerMultiKeyOperator.class;
         break;
-      case JoinDesc.LEFT_OUTER_JOIN:
-      case JoinDesc.RIGHT_OUTER_JOIN:
-        opClass = VectorMapJoinOuterMultiKeyOperator.class;
+      case INNER_BIG_ONLY:
+        opClass = VectorMapJoinInnerBigOnlyMultiKeyOperator.class;
         break;
-      case JoinDesc.LEFT_SEMI_JOIN:
+      case LEFT_SEMI:
         opClass = VectorMapJoinLeftSemiMultiKeyOperator.class;
         break;
+      case OUTER:
+        opClass = VectorMapJoinOuterMultiKeyOperator.class;
+        break;
       default:
-        throw new HiveException("Unknown join type " + joinType);
+        throw new HiveException("Unknown operator variation " + operatorVariation);
       }
       break;
+    default:
+      throw new RuntimeException("Unexpected hash table key type " + hashTableKeyType.name());
     }
 
-    vectorOp = OperatorFactory.getVectorOperator(
-        opClass, op.getCompilationOpContext(), op.getConf(), vContext);
-    LOG.info("Vectorizer vectorizeOperator map join class " + vectorOp.getClass().getSimpleName());
-
     boolean minMaxEnabled = HiveConf.getBoolVar(hiveConf,
         HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_MINMAX_ENABLED);
 
-    VectorMapJoinDesc vectorDesc = desc.getVectorDesc();
     vectorDesc.setHashTableImplementationType(hashTableImplementationType);
     vectorDesc.setHashTableKind(hashTableKind);
     vectorDesc.setHashTableKeyType(hashTableKeyType);
+    vectorDesc.setOperatorVariation(operatorVariation);
     vectorDesc.setMinMaxEnabled(minMaxEnabled);
+    vectorDesc.setVectorMapJoinInfo(vectorMapJoinInfo);
+
+    vectorOp = OperatorFactory.getVectorOperator(
+        opClass, op.getCompilationOpContext(), op.getConf(), vContext);
+    LOG.info("Vectorizer vectorizeOperator map join class " + vectorOp.getClass().getSimpleName());
+
     return vectorOp;
   }
 
-  private boolean onExpressionHasNullSafes(MapJoinDesc desc) {
+  public static boolean onExpressionHasNullSafes(MapJoinDesc desc) {
     boolean[] nullSafes = desc.getNullSafes();
     if (nullSafes == null) {
-	return false;
+      return false;
     }
     for (boolean nullSafe : nullSafes) {
       if (nullSafe) {
@@ -2196,53 +2435,372 @@ public class Vectorizer implements PhysicalPlanResolver {
   }
 
   private boolean canSpecializeMapJoin(Operator<? extends OperatorDesc> op, MapJoinDesc desc,
-      boolean isTez) {
+      boolean isTezOrSpark, VectorizationContext vContext, VectorMapJoinInfo vectorMapJoinInfo)
+          throws HiveException {
+
+    Preconditions.checkState(op instanceof MapJoinOperator);
+
+    // Allocate a VectorMapJoinDesc initially with implementation type NONE so EXPLAIN
+    // can report this operator was vectorized, but not native, along with the conditions.
+    VectorMapJoinDesc vectorDesc = new VectorMapJoinDesc();
+    desc.setVectorDesc(vectorDesc);
+
+    boolean isVectorizationMapJoinNativeEnabled = HiveConf.getBoolVar(hiveConf,
+        HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_ENABLED);
+
+    String engine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+
+    boolean oneMapJoinCondition = (desc.getConds().length == 1);
+
+    boolean hasNullSafes = onExpressionHasNullSafes(desc);
+
+    byte posBigTable = (byte) desc.getPosBigTable();
+
+    // Since we want to display all the met and not met conditions in EXPLAIN, we determine all
+    // information first....
+
+    List<ExprNodeDesc> keyDesc = desc.getKeys().get(posBigTable);
+    VectorExpression[] allBigTableKeyExpressions = vContext.getVectorExpressions(keyDesc);
+    final int allBigTableKeyExpressionsLength = allBigTableKeyExpressions.length;
+    boolean isEmptyKey = (allBigTableKeyExpressionsLength == 0);
 
-    boolean specialize = false;
+    boolean supportsKeyTypes = true;  // Assume.
+    HashSet<String> notSupportedKeyTypes = new HashSet<String>();
 
-    if (op instanceof MapJoinOperator &&
+    // Since a key expression can be a calculation and the key will go into a scratch column,
+    // we need the mapping and type information.
+    int[] bigTableKeyColumnMap = new int[allBigTableKeyExpressionsLength];
+    String[] bigTableKeyColumnNames = new String[allBigTableKeyExpressionsLength];
+    TypeInfo[] bigTableKeyTypeInfos = new TypeInfo[allBigTableKeyExpressionsLength];
+    ArrayList<VectorExpression> bigTableKeyExpressionsList = new ArrayList<VectorExpression>();
+    VectorExpression[] bigTableKeyExpressions;
+    for (int i = 0; i < allBigTableKeyExpressionsLength; i++) {
+      VectorExpression ve = allBigTableKeyExpressions[i];
+      if (!IdentityExpression.isColumnOnly(ve)) {
+        bigTableKeyExpressionsList.add(ve);
+      }
+      bigTableKeyColumnMap[i] = ve.getOutputColumn();
+
+      ExprNodeDesc exprNode = keyDesc.get(i);
+      bigTableKeyColumnNames[i] = exprNode.toString();
+
+      TypeInfo typeInfo = exprNode.getTypeInfo();
+      // Verify we handle the key column types for an optimized table.  This is effectively the
+      // same check used in HashTableLoader.
+      if (!MapJoinKey.isSupportedField(typeInfo)) {
+        supportsKeyTypes = false;
+        Category category = typeInfo.getCategory();
+        notSupportedKeyTypes.add(
+            (category != Category.PRIMITIVE ? category.toString() :
+              ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory().toString()));
+      }
+      bigTableKeyTypeInfos[i] = typeInfo;
+    }
+    if (bigTableKeyExpressionsList.size() == 0) {
+      bigTableKeyExpressions = null;
+    } else {
+      bigTableKeyExpressions = bigTableKeyExpressionsList.toArray(new VectorExpression[0]);
+    }
+
+    List<ExprNodeDesc> bigTableExprs = desc.getExprs().get(posBigTable);
+    VectorExpression[] allBigTableValueExpressions = vContext.getVectorExpressions(bigTableExprs);
+
+    boolean isFastHashTableEnabled =
         HiveConf.getBoolVar(hiveConf,
-            HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_ENABLED)) {
+            HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED);
+    vectorDesc.setIsFastHashTableEnabled(isFastHashTableEnabled);
 
-      // Currently, only under Tez and non-N-way joins.
-      if (isTez && desc.getConds().length == 1 && !onExpressionHasNullSafes(desc)) {
+    // Especially since LLAP is prone to turn it off in the MapJoinDesc in later
+    // physical optimizer stages...
+    boolean isHybridHashJoin = desc.isHybridHashJoin();
+    vectorDesc.setIsHybridHashJoin(isHybridHashJoin);
 
-        // Ok, all basic restrictions satisfied so far...
-        specialize = true;
+    /*
+     * Populate vectorMapJoinInfo.
+     */
 
-        if (!HiveConf.getBoolVar(hiveConf,
-            HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED)) {
+    /*
+     * Similarly, we need a mapping since a value expression can be a calculation and the value
+     * will go into a scratch column.
+     */
+    int[] bigTableValueColumnMap = new int[allBigTableValueExpressions.length];
+    String[] bigTableValueColumnNames = new String[allBigTableValueExpressions.length];
+    TypeInfo[] bigTableValueTypeInfos = new TypeInfo[allBigTableValueExpressions.length];
+    ArrayList<VectorExpression> bigTableValueExpressionsList = new ArrayList<VectorExpression>();
+    VectorExpression[] bigTableValueExpressions;
+    for (int i = 0; i < bigTableValueColumnMap.length; i++) {
+      VectorExpression ve = allBigTableValueExpressions[i];
+      if (!IdentityExpression.isColumnOnly(ve)) {
+        bigTableValueExpressionsList.add(ve);
+      }
+      bigTableValueColumnMap[i] = ve.getOutputColumn();
 
-          // We are using the optimized hash table we have further
-          // restrictions (using optimized and key type).
+      ExprNodeDesc exprNode = bigTableExprs.get(i);
+      bigTableValueColumnNames[i] = exprNode.toString();
+      bigTableValueTypeInfos[i] = exprNode.getTypeInfo();
+    }
+    if (bigTableValueExpressionsList.size() == 0) {
+      bigTableValueExpressions = null;
+    } else {
+      bigTableValueExpressions = bigTableValueExpressionsList.toArray(new VectorExpression[0]);
+    }
 
-          if (!HiveConf.getBoolVar(hiveConf,
-              HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE)) {
-            specialize = false;
-          } else {
-            byte posBigTable = (byte) desc.getPosBigTable();
-            Map<Byte, List<ExprNodeDesc>> keyExprs = desc.getKeys();
-            List<ExprNodeDesc> bigTableKeyExprs = keyExprs.get(posBigTable);
-            for (ExprNodeDesc exprNodeDesc : bigTableKeyExprs) {
-              String typeName = exprNodeDesc.getTypeString();
-              if (!MapJoinKey.isSupportedField(typeName)) {
-                specialize = false;
-                break;
-              }
+    vectorMapJoinInfo.setBigTableKeyColumnMap(bigTableKeyColumnMap);
+    vectorMapJoinInfo.setBigTableKeyColumnNames(bigTableKeyColumnNames);
+    vectorMapJoinInfo.setBigTableKeyTypeInfos(bigTableKeyTypeInfos);
+    vectorMapJoinInfo.setBigTableKeyExpressions(bigTableKeyExpressions);
+
+    vectorMapJoinInfo.setBigTableValueColumnMap(bigTableValueColumnMap);
+    vectorMapJoinInfo.setBigTableValueColumnNames(bigTableValueColumnNames);
+    vectorMapJoinInfo.setBigTableValueTypeInfos(bigTableValueTypeInfos);
+    vectorMapJoinInfo.setBigTableValueExpressions(bigTableValueExpressions);
+
+    /*
+     * Small table information.
+     */
+    VectorColumnOutputMapping bigTableRetainedMapping =
+        new VectorColumnOutputMapping("Big Table Retained Mapping");
+
+    VectorColumnOutputMapping bigTableOuterKeyMapping =
+        new VectorColumnOutputMapping("Big Table Outer Key Mapping");
+
+    // The order of the fields in the LazyBinary small table value must be used, so
+    // we use the source ordering flavor for the mapping.
+    VectorColumnSourceMapping smallTableMapping =
+        new VectorColumnSourceMapping("Small Table Mapping");
+
+    Byte[] order = desc.getTagOrder();
+    Byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]);
+    boolean isOuterJoin = !desc.getNoOuterJoin();
+
+    /*
+     * Gather up big and small table output result information from the MapJoinDesc.
+     */
+    List<Integer> bigTableRetainList = desc.getRetainList().get(posBigTable);
+    int bigTableRetainSize = bigTableRetainList.size();
+
+    int[] smallTableIndices;
+    int smallTableIndicesSize;
+    List<ExprNodeDesc> smallTableExprs = desc.getExprs().get(posSingleVectorMapJoinSmallTable);
+    if (desc.getValueIndices() != null && desc.getValueIndices().get(posSingleVectorMapJoinSmallTable) != null) {
+      smallTableIndices = desc.getValueIndices().get(posSingleVectorMapJoinSmallTable);
+      smallTableIndicesSize = smallTableIndices.length;
+    } else {
+      smallTableIndices = null;
+      smallTableIndicesSize = 0;
+    }
+
+    List<Integer> smallTableRetainList = desc.getRetainList().get(posSingleVectorMapJoinSmallTable);
+    int smallTableRetainSize = smallTableRetainList.size();
+
+    int smallTableResultSize = 0;
+    if (smallTableIndicesSize > 0) {
+      smallTableResultSize = smallTableIndicesSize;
+    } else if (smallTableRetainSize > 0) {
+      smallTableResultSize = smallTableRetainSize;
+    }
+
+    /*
+     * Determine the big table retained mapping first so we can optimize out (with
+     * projection) copying inner join big table keys in the subsequent small table results section.
+     */
+
+    // We use a mapping object here so we can build the projection in any order and
+    // get the output columns ordered from 0 to n-1 at the end.
+    //
+    // Also, to avoid copying a big table key into the small table result area for inner joins,
+    // we reference it with the projection so there can be duplicate output columns
+    // in the projection.
+    VectorColumnSourceMapping projectionMapping = new VectorColumnSourceMapping("Projection Mapping");
+
+    int nextOutputColumn = (order[0] == posBigTable ? 0 : smallTableResultSize);
+    for (int i = 0; i < bigTableRetainSize; i++) {
+
+      // Since bigTableValueExpressions may do a calculation and produce a scratch column, we
+      // need to map to the right batch column.
+
+      int retainColumn = bigTableRetainList.get(i);
+      int batchColumnIndex = bigTableValueColumnMap[retainColumn];
+      TypeInfo typeInfo = bigTableValueTypeInfos[i];
+
+      // With this map we project the big table batch to make it look like an output batch.
+      projectionMapping.add(nextOutputColumn, batchColumnIndex, typeInfo);
+
+      // Collect columns we copy from the big table batch to the overflow batch.
+      if (!bigTableRetainedMapping.containsOutputColumn(batchColumnIndex)) {
+        // Tolerate repeated use of a big table column.
+        bigTableRetainedMapping.add(batchColumnIndex, batchColumnIndex, typeInfo);
+      }
+
+      nextOutputColumn++;
+    }
+
+    /*
+     * Now determine the small table results.
+     */
+    boolean smallTableExprVectorizes = true;
+
+    int firstSmallTableOutputColumn;
+    firstSmallTableOutputColumn = (order[0] == posBigTable ? bigTableRetainSize : 0);
+    int smallTableOutputCount = 0;
+    nextOutputColumn = firstSmallTableOutputColumn;
+
+    // Small table indices have more information (i.e. keys) than retain, so use them if they exist...
+    String[] bigTableRetainedNames;
+    if (smallTableIndicesSize > 0) {
+      smallTableOutputCount = smallTableIndicesSize;
+      bigTableRetainedNames = new String[smallTableOutputCount];
+
+      for (int i = 0; i < smallTableIndicesSize; i++) {
+        if (smallTableIndices[i] >= 0) {
+
+          // Zero and above numbers indicate a big table key is needed for
+          // small table result "area".
+
+          int keyIndex = smallTableIndices[i];
+
+          // Since bigTableKeyExpressions may do a calculation and produce a scratch column, we
+          // need to map the right column.
+          int batchKeyColumn = bigTableKeyColumnMap[keyIndex];
+          bigTableRetainedNames[i] = bigTableKeyColumnNames[keyIndex];
+          TypeInfo typeInfo = bigTableKeyTypeInfos[keyIndex];
+
+          if (!isOuterJoin) {
+
+            // Optimize inner join keys of small table results.
+
+            // Project the big table key into the small table result "area".
+            projectionMapping.add(nextOutputColumn, batchKeyColumn, typeInfo);
+
+            if (!bigTableRetainedMapping.containsOutputColumn(batchKeyColumn)) {
+              // If necessary, copy the big table key into the overflow batch's small table
+              // result "area".
+              bigTableRetainedMapping.add(batchKeyColumn, batchKeyColumn, typeInfo);
             }
+          } else {
+
+            // For outer joins, since the small table key can be null when there is no match,
+            // we must have a physical (scratch) column for those keys.  We cannot use the
+            // projection optimization used by inner joins above.
+
+            int scratchColumn = vContext.allocateScratchColumn(typeInfo.getTypeName());
+            projectionMapping.add(nextOutputColumn, scratchColumn, typeInfo);
+
+            bigTableRetainedMapping.add(batchKeyColumn, scratchColumn, typeInfo);
+
+            bigTableOuterKeyMapping.add(batchKeyColumn, scratchColumn, typeInfo);
           }
         } else {
 
-          // With the fast hash table implementation, we currently do not support
-          // Hybrid Grace Hash Join.
+          // Negative numbers indicate a column to be (deserialize) read from the small table's
+          // LazyBinary value row.
+          int smallTableValueIndex = -smallTableIndices[i] - 1;
 
-          if (desc.isHybridHashJoin()) {
-            specialize = false;
+          ExprNodeDesc smallTableExprNode = smallTableExprs.get(i);
+          if (!validateExprNodeDesc(smallTableExprNode, "Small Table")) {
+            clearNotVectorizedReason();
+            smallTableExprVectorizes = false;
           }
+
+          bigTableRetainedNames[i] = smallTableExprNode.toString();
+
+          TypeInfo typeInfo = smallTableExprNode.getTypeInfo();
+
+          // Make a new big table scratch column for the small table value.
+          int scratchColumn = vContext.allocateScratchColumn(typeInfo.getTypeName());
+          projectionMapping.add(nextOutputColumn, scratchColumn, typeInfo);
+
+          smallTableMapping.add(smallTableValueIndex, scratchColumn, typeInfo);
+        }
+        nextOutputColumn++;
+      }
+    } else if (smallTableRetainSize > 0) {
+      smallTableOutputCount = smallTableRetainSize;
+      bigTableRetainedNames = new String[smallTableOutputCount];
+
+      // Only small table values appear in join output result.
+
+      for (int i = 0; i < smallTableRetainSize; i++) {
+        int smallTableValueIndex = smallTableRetainList.get(i);
+
+        ExprNodeDesc smallTableExprNode = smallTableExprs.get(i);
+        if (!validateExprNodeDesc(smallTableExprNode, "Small Table")) {
+          clearNotVectorizedReason();
+          smallTableExprVectorizes = false;
         }
+
+        bigTableRetainedNames[i] = smallTableExprNode.toString();
+
+        // Make a new big table scratch column for the small table value.
+        TypeInfo typeInfo = smallTableExprNode.getTypeInfo();
+        int scratchColumn = vContext.allocateScratchColumn(typeInfo.getTypeName());
+
+        projectionMapping.add(nextOutputColumn, scratchColumn, typeInfo);
+
+        smallTableMapping.add(smallTableValueIndex, scratchColumn, typeInfo);
+        nextOutputColumn++;
       }
+    } else {
+      bigTableRetainedNames = new String[0];
+    }
+
+    // Remember the condition variables for EXPLAIN regardless.
+    vectorDesc.setIsVectorizationMapJoinNativeEnabled(isVectorizationMapJoinNativeEnabled);
+    vectorDesc.setEngine(engine);
+    vectorDesc.setOneMapJoinCondition(oneMapJoinCondition);
+    vectorDesc.setHasNullSafes(hasNullSafes);
+    vectorDesc.setSupportsKeyTypes(supportsKeyTypes);
+    if (!supportsKeyTypes) {
+      vectorDesc.setNotSupportedKeyTypes(new ArrayList(notSupportedKeyTypes));
+    }
+    vectorDesc.setIsEmptyKey(isEmptyKey);
+    vectorDesc.setSmallTableExprVectorizes(smallTableExprVectorizes);
+
+    // Currently, only under Tez and non-N-way joi

<TRUNCATED>

Mime
View raw message