hive-commits mailing list archives

From mmccl...@apache.org
Subject [35/51] [partial] hive git commit: HIVE-17433: Vectorization: Support Decimal64 in Hive Query Engine (Matt McCline, reviewed by Teddy Choi)
Date Sun, 29 Oct 2017 20:40:14 GMT
http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 28400c7..263d2c7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -35,6 +35,7 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Stack;
+import java.util.TreeSet;
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang.ArrayUtils;
@@ -43,8 +44,11 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.*;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
@@ -72,15 +76,24 @@ import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkStringOp
 import org.apache.hadoop.hive.ql.exec.vector.udf.VectorUDFAdaptor;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationDesc;
 import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping;
 import org.apache.hadoop.hive.ql.exec.vector.VectorColumnSourceMapping;
+import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedUDAFs;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.HiveVectorAdaptorUsageMode;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.InConstantType;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport.Support;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
@@ -119,10 +132,13 @@ import org.apache.hadoop.hive.ql.plan.LimitDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OpTraits;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.plan.VectorAppMasterEventDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
 import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.VectorFilterDesc;
 import org.apache.hadoop.hive.ql.plan.VectorPTFDesc;
@@ -223,10 +239,13 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hive.common.util.AnnotationUtils;
@@ -285,14 +304,38 @@ public class Vectorizer implements PhysicalPlanResolver {
 
   private HiveConf hiveConf;
 
+  public static enum VectorizationEnabledOverride {
+    NONE,
+    DISABLE,
+    ENABLE;
+
+    public final static Map<String,VectorizationEnabledOverride> nameMap =
+        new HashMap<String,VectorizationEnabledOverride>();
+    static {
+      for (VectorizationEnabledOverride vectorizationEnabledOverride : values()) {
+        nameMap.put(
+            vectorizationEnabledOverride.name().toLowerCase(), vectorizationEnabledOverride);
+      }
+    };
+  }
+
+  boolean isVectorizationEnabled;
+  private VectorizationEnabledOverride vectorizationEnabledOverride;
+
   private boolean useVectorizedInputFileFormat;
   private boolean useVectorDeserialize;
   private boolean useRowDeserialize;
   private boolean isReduceVectorizationEnabled;
   private boolean isPtfVectorizationEnabled;
   private boolean isVectorizationComplexTypesEnabled;
+
+  // Now deprecated.
   private boolean isVectorizationGroupByComplexTypesEnabled;
+
   private boolean isVectorizedRowIdentifierEnabled;
+  private String vectorizedInputFormatSupportEnabled;
+  private boolean isLlapIoEnabled;
+  private Set<Support> vectorizedInputFormatSupportEnabledSet;
   private Collection<Class<?>> rowDeserializeInputFormatExcludes;
   private int vectorizedPTFMaxMemoryBufferingBatchCount;
   private int vectorizedTestingReducerBatchSize;
@@ -301,6 +344,11 @@ public class Vectorizer implements PhysicalPlanResolver {
 
   private HiveVectorAdaptorUsageMode hiveVectorAdaptorUsageMode;
 
+  private static final Set<Support> vectorDeserializeTextSupportSet = new TreeSet<Support>();
+  static {
+    vectorDeserializeTextSupportSet.addAll(Arrays.asList(Support.values()));
+  }
+
   private BaseWork currentBaseWork;
   private Operator<? extends OperatorDesc> currentOperator;
   private Collection<Class<?>> vectorizedInputFormatExcludes;
@@ -333,6 +381,9 @@ public class Vectorizer implements PhysicalPlanResolver {
   private Set<VirtualColumn> availableVectorizedVirtualColumnSet = null;
   private Set<VirtualColumn> neededVirtualColumnSet = null;
 
+  public class VectorizerCannotVectorizeException extends Exception {
+  }
+
   public Vectorizer() {
 
     /*
@@ -474,13 +525,16 @@ public class Vectorizer implements PhysicalPlanResolver {
     List<VirtualColumn> neededVirtualColumnList;
     boolean useVectorizedInputFileFormat;
 
-    boolean groupByVectorOutput;
+    Set<Support> inputFormatSupportSet;
+    Set<Support> supportSetInUse;
+    List<String> supportRemovedReasons;
+    List<DataTypePhysicalVariation> allDataTypePhysicalVariations;
+
     boolean allNative;
     boolean usesVectorUDFAdaptor;
 
     String[] scratchTypeNameArray;
-
-    Set<Operator<? extends OperatorDesc>> nonVectorizedOps;
+    DataTypePhysicalVariation[] scratchdataTypePhysicalVariations;
 
     String reduceColumnSortOrder;
     String reduceColumnNullOrder;
@@ -490,7 +544,6 @@ public class Vectorizer implements PhysicalPlanResolver {
     }
 
     public void assume() {
-      groupByVectorOutput = true;
       allNative = true;
       usesVectorUDFAdaptor =  false;
     }
@@ -513,11 +566,20 @@ public class Vectorizer implements PhysicalPlanResolver {
     public void setNeededVirtualColumnList(List<VirtualColumn> neededVirtualColumnList) {
       this.neededVirtualColumnList = neededVirtualColumnList;
     }
+    public void setSupportSetInUse(Set<Support> supportSetInUse) {
+      this.supportSetInUse = supportSetInUse;
+    }
+    public void setSupportRemovedReasons(List<String> supportRemovedReasons) {
+      this.supportRemovedReasons = supportRemovedReasons;
+    }
+    public void setAlldataTypePhysicalVariations(List<DataTypePhysicalVariation> allDataTypePhysicalVariations) {
+      this.allDataTypePhysicalVariations = allDataTypePhysicalVariations;
+    }
     public void setScratchTypeNameArray(String[] scratchTypeNameArray) {
       this.scratchTypeNameArray = scratchTypeNameArray;
     }
-    public void setGroupByVectorOutput(boolean groupByVectorOutput) {
-      this.groupByVectorOutput = groupByVectorOutput;
+    public void setScratchdataTypePhysicalVariationsArray(DataTypePhysicalVariation[] scratchdataTypePhysicalVariations) {
+      this.scratchdataTypePhysicalVariations = scratchdataTypePhysicalVariations;
     }
     public void setAllNative(boolean allNative) {
       this.allNative = allNative;
@@ -528,13 +590,8 @@ public class Vectorizer implements PhysicalPlanResolver {
     public void setUseVectorizedInputFileFormat(boolean useVectorizedInputFileFormat) {
       this.useVectorizedInputFileFormat = useVectorizedInputFileFormat;
     }
-
-    public void setNonVectorizedOps(Set<Operator<? extends OperatorDesc>> nonVectorizedOps) {
-      this.nonVectorizedOps = nonVectorizedOps;
-    }
-
-    public Set<Operator<? extends OperatorDesc>> getNonVectorizedOps() {
-      return nonVectorizedOps;
+    public void setInputFormatSupportSet(Set<Support> inputFormatSupportSet) {
+      this.inputFormatSupportSet = inputFormatSupportSet;
     }
 
     public void setReduceColumnSortOrder(String reduceColumnSortOrder) {
@@ -547,13 +604,12 @@ public class Vectorizer implements PhysicalPlanResolver {
 
     public void transferToBaseWork(BaseWork baseWork) {
 
-      final int virtualColumnCount;
+      final int virtualColumnCount =
+          (availableVirtualColumnList == null ? 0 : availableVirtualColumnList.size());
       VirtualColumn[] neededVirtualColumns;
       if (neededVirtualColumnList != null && neededVirtualColumnList.size() > 0) {
-        virtualColumnCount = neededVirtualColumnList.size();
         neededVirtualColumns = neededVirtualColumnList.toArray(new VirtualColumn[0]);
       } else {
-        virtualColumnCount = 0;
         neededVirtualColumns = new VirtualColumn[0];
       }
 
@@ -566,19 +622,34 @@ public class Vectorizer implements PhysicalPlanResolver {
         dataColumnNumsArray = null;
       }
 
+      DataTypePhysicalVariation[] allDataTypePhysicalVariationArray;
+      if (allDataTypePhysicalVariations == null) {
+        allDataTypePhysicalVariationArray = new DataTypePhysicalVariation[allTypeInfoArray.length];
+        Arrays.fill(allDataTypePhysicalVariationArray, DataTypePhysicalVariation.NONE);
+      } else {
+        allDataTypePhysicalVariationArray =
+            allDataTypePhysicalVariations.toArray(new DataTypePhysicalVariation[0]);
+      }
+
       VectorizedRowBatchCtx vectorizedRowBatchCtx =
           new VectorizedRowBatchCtx(
             allColumnNameArray,
             allTypeInfoArray,
+            allDataTypePhysicalVariationArray,
             dataColumnNumsArray,
             partitionColumnCount,
+            virtualColumnCount,
             neededVirtualColumns,
-            scratchTypeNameArray);
+            scratchTypeNameArray,
+            scratchdataTypePhysicalVariations);
       baseWork.setVectorizedRowBatchCtx(vectorizedRowBatchCtx);
 
       if (baseWork instanceof MapWork) {
         MapWork mapWork = (MapWork) baseWork;
         mapWork.setUseVectorizedInputFileFormat(useVectorizedInputFileFormat);
+        mapWork.setInputFormatSupportSet(inputFormatSupportSet);
+        mapWork.setSupportSetInUse(supportSetInUse);
+        mapWork.setSupportRemovedReasons(supportRemovedReasons);
       }
 
       if (baseWork instanceof ReduceWork) {
@@ -588,11 +659,238 @@ public class Vectorizer implements PhysicalPlanResolver {
       }
 
       baseWork.setAllNative(allNative);
-      baseWork.setGroupByVectorOutput(groupByVectorOutput);
       baseWork.setUsesVectorUDFAdaptor(usesVectorUDFAdaptor);
     }
   }
 
+  /*
+   * Used as a dummy root operator to attach vectorized operators that will be built in parallel
+   * to the current non-vectorized operator tree.
+   */
+  private static class DummyRootVectorDesc extends AbstractOperatorDesc {
+
+    public DummyRootVectorDesc() {
+      super();
+    }
+  }
+
+  private static class DummyOperator extends Operator<DummyRootVectorDesc> {
+
+    public DummyOperator() {
+      super(new CompilationOpContext());
+    }
+
+    @Override
+    public void process(Object row, int tag) throws HiveException {
+      throw new RuntimeException("Not used");
+    }
+
+    @Override
+    public String getName() {
+      return "DUMMY";
+    }
+
+    @Override
+    public OperatorType getType() {
+      return null;
+    }
+  }
+
+  private static class DummyVectorOperator extends DummyOperator
+      implements VectorizationOperator {
+
+    private VectorizationContext vContext;
+
+    public DummyVectorOperator(VectorizationContext vContext) {
+      super();
+      this.conf = (DummyRootVectorDesc) new DummyRootVectorDesc();
+      this.vContext = vContext;
+    }
+
+    @Override
+    public VectorizationContext getInputVectorizationContext() {
+      return vContext;
+    }
+
+    @Override
+    public VectorDesc getVectorDesc() {
+      return null;
+    }
+  }
+
+  private List<Operator<? extends OperatorDesc>> newOperatorList() {
+    return new ArrayList<Operator<? extends OperatorDesc>>();
+  }
+
+  private Operator<? extends OperatorDesc> validateAndVectorizeOperatorTree(
+      Operator<? extends OperatorDesc> nonVecRootOperator,
+      boolean isReduce, boolean isTezOrSpark,
+      VectorTaskColumnInfo vectorTaskColumnInfo)
+          throws VectorizerCannotVectorizeException {
+
+    VectorizationContext taskVContext =
+        new VectorizationContext(
+            "Task",
+            vectorTaskColumnInfo.allColumnNames,
+            vectorTaskColumnInfo.allTypeInfos,
+            vectorTaskColumnInfo.allDataTypePhysicalVariations,
+            hiveConf);
+
+    List<Operator<? extends OperatorDesc>> currentParentList = newOperatorList();
+    currentParentList.add(nonVecRootOperator);
+
+    // Start with dummy vector operator as the parent of the parallel vector operator tree we are
+    // creating
+    Operator<? extends OperatorDesc> dummyVectorOperator = new DummyVectorOperator(taskVContext);
+    List<Operator<? extends OperatorDesc>> currentVectorParentList = newOperatorList();
+    currentVectorParentList.add(dummyVectorOperator);
+
+    do {
+      List<Operator<? extends OperatorDesc>> nextParentList = newOperatorList();
+      List<Operator<? extends OperatorDesc>> nextVectorParentList= newOperatorList();
+
+      final int count = currentParentList.size();
+      for (int i = 0; i < count; i++) {
+        Operator<? extends OperatorDesc> parent = currentParentList.get(i);
+
+        List<Operator<? extends OperatorDesc>> childrenList = parent.getChildOperators();
+        if (childrenList == null || childrenList.size() == 0) {
+          continue;
+        }
+
+        Operator<? extends OperatorDesc> vectorParent = currentVectorParentList.get(i);
+
+        /*
+         * Vectorize this parent's children.  Plug them into vectorParent's children list.
+         *
+         * Add those children / vector children to nextParentList / nextVectorParentList.
+         */
+        doProcessChildren(
+            parent, vectorParent, nextParentList, nextVectorParentList,
+            isReduce, isTezOrSpark, vectorTaskColumnInfo);
+
+      }
+      currentParentList = nextParentList;
+      currentVectorParentList = nextVectorParentList;
+    } while (currentParentList.size() > 0);
+
+    return dummyVectorOperator;
+  }
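
The method above builds a parallel vectorized operator tree under a dummy root, walking the non-vectorized tree level by level. Below is a minimal standalone sketch of that mirroring idea, assuming a hypothetical Node class in place of Hive's Operator; the real code vectorizes each child as it copies it, which the sketch only hints at.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for an operator node; names are illustrative, not Hive classes.
final class Node {
  final String name;
  final List<Node> children = new ArrayList<>();
  Node(String name) { this.name = name; }
}

final class TreeMirror {
  // Walk the original tree breadth-first and grow a parallel tree rooted at a dummy node;
  // the dummy's children end up being the mirrored roots, like dummyVectorOperator above.
  static Node mirror(Node root) {
    Node dummy = new Node("DUMMY");
    Deque<Node> originals = new ArrayDeque<>();
    Deque<Node> copies = new ArrayDeque<>();
    originals.add(root);
    copies.add(dummy);
    while (!originals.isEmpty()) {
      Node orig = originals.poll();
      Node copy = copies.poll();
      for (Node child : orig.children) {
        Node childCopy = new Node("vec:" + child.name);  // stand-in for per-child vectorization
        copy.children.add(childCopy);
        originals.add(child);
        copies.add(childCopy);
      }
    }
    return dummy;
  }
}
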
+
+  private void doProcessChildren(
+      Operator<? extends OperatorDesc> parent,
+      Operator<? extends OperatorDesc> vectorParent,
+      List<Operator<? extends OperatorDesc>> nextParentList,
+      List<Operator<? extends OperatorDesc>> nextVectorParentList,
+      boolean isReduce, boolean isTezOrSpark,
+      VectorTaskColumnInfo vectorTaskColumnInfo)
+          throws VectorizerCannotVectorizeException {
+
+    List<Operator<? extends OperatorDesc>> vectorChildren = newOperatorList();
+    List<Operator<? extends OperatorDesc>> children = parent.getChildOperators();
+    List<List<Operator<? extends OperatorDesc>>> listOfChildMultipleParents =
+        new ArrayList<List<Operator<? extends OperatorDesc>>>();
+
+    final int childrenCount = children.size();
+    for (int i = 0; i < childrenCount; i++) {
+
+      Operator<? extends OperatorDesc> child = children.get(i);
+      Operator<? extends OperatorDesc> vectorChild =
+          doProcessChild(
+              child, vectorParent, isReduce, isTezOrSpark, vectorTaskColumnInfo);
+
+      fixupNewVectorChild(
+          parent,
+          vectorParent,
+          child,
+          vectorChild);
+
+      nextParentList.add(child);
+      nextVectorParentList.add(vectorChild);
+    }
+  }
+
+  /*
+   * Fixup the children and parents of a new vector child.
+   *
+   * 1) Add new vector child to the vector parent's children list.
+   *
+   * 2) Copy and fixup the parent list of the original child instead of just assuming a 1:1
+   *    relationship.
+   *
+   *    a) When the child is MapJoinOperator, it will have an extra parent HashTableDummyOperator
+   *       for the MapJoinOperator's small table.  It needs to be fixed up, too.
+   */
+  private void fixupNewVectorChild(
+      Operator<? extends OperatorDesc> parent,
+      Operator<? extends OperatorDesc> vectorParent,
+      Operator<? extends OperatorDesc> child,
+      Operator<? extends OperatorDesc> vectorChild) {
+
+    // 1) Add new vector child to the vector parent's children list.
+    vectorParent.getChildOperators().add(vectorChild);
+
+    // 2) Copy and fixup the parent list of the original child instead of just assuming a 1:1
+    //    relationship.
+    List<Operator<? extends OperatorDesc>> childMultipleParents = newOperatorList();
+    childMultipleParents.addAll(child.getParentOperators());
+    final int childMultipleParentCount = childMultipleParents.size();
+    for (int i = 0; i < childMultipleParentCount; i++) {
+      Operator<? extends OperatorDesc> childMultipleParent = childMultipleParents.get(i);
+      if (childMultipleParent == parent) {
+        childMultipleParents.set(i, vectorParent);
+      } else {
+        fixupOtherParent(childMultipleParent, child, vectorChild);
+      }
+    }
+    vectorChild.setParentOperators(childMultipleParents);
+  }
+
+  private void fixupOtherParent(
+      Operator<? extends OperatorDesc> childMultipleParent,
+      Operator<? extends OperatorDesc> child,
+      Operator<? extends OperatorDesc> vectorChild) {
+
+    List<Operator<? extends OperatorDesc>> children = childMultipleParent.getChildOperators();
+    final int childrenCount = children.size();
+    for (int i = 0; i < childrenCount; i++) {
+      Operator<? extends OperatorDesc> myChild = children.get(i);
+      if (myChild == child) {
+        children.set(i, vectorChild);
+      }
+    }
+  }
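
A minimal sketch, not from the patch, of the rewiring these two fixup methods perform: wherever a list still references the old (non-vectorized) node, point it at the vectorized replacement instead, using identity comparison as above. RefFixup and replaceAll are illustrative names.

import java.util.List;

final class RefFixup {
  // Replace every identity occurrence of oldRef in the list with newRef.
  static <T> void replaceAll(List<T> list, T oldRef, T newRef) {
    for (int i = 0; i < list.size(); i++) {
      if (list.get(i) == oldRef) {
        list.set(i, newRef);
      }
    }
  }
}
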
+
+  private Operator<? extends OperatorDesc> doProcessChild(
+      Operator<? extends OperatorDesc> child,
+      Operator<? extends OperatorDesc> vectorParent,
+      boolean isReduce, boolean isTezOrSpark,
+      VectorTaskColumnInfo vectorTaskColumnInfo)
+          throws VectorizerCannotVectorizeException {
+
+    // Use vector parent to get VectorizationContext.
+    final VectorizationContext vContext;
+    if (vectorParent instanceof VectorizationContextRegion) {
+      vContext = ((VectorizationContextRegion) vectorParent).getOutputVectorizationContext();
+    } else {
+      vContext = ((VectorizationOperator) vectorParent).getInputVectorizationContext();
+    }
+
+    OperatorDesc desc = child.getConf();
+    Operator<? extends OperatorDesc> vectorChild;
+
+    try {
+      vectorChild =
+          validateAndVectorizeOperator(child, vContext, isReduce, isTezOrSpark, vectorTaskColumnInfo);
+    } catch (HiveException e) {
+      String issue = "exception: " + VectorizationContext.getStackTraceAsSingleLine(e);
+      setNodeIssue(issue);
+      throw new VectorizerCannotVectorizeException();
+    }
+
+    return vectorChild;
+  }
+
   class VectorizationDispatcher implements Dispatcher {
 
     @Override
@@ -659,24 +957,15 @@ public class Vectorizer implements PhysicalPlanResolver {
 
       mapWork.setVectorizedVertexNum(++vectorizedVertexNum);
 
-      boolean ret;
-      try {
-        ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTezOrSpark);
-      } catch (Exception e) {
-        String issue = "exception: " + VectorizationContext.getStackTraceAsSingleLine(e);
-        setNodeIssue(issue);
-        ret = false;
-      }
-      if (ret) {
-        vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTezOrSpark);
-      } else if (currentBaseWork.getVectorizationEnabled()) {
-        VectorizerReason notVectorizedReason  = currentBaseWork.getNotVectorizedReason();
-        if (notVectorizedReason == null) {
-          LOG.info("Cannot vectorize: unknown");
-        } else {
-          LOG.info("Cannot vectorize: " + notVectorizedReason.toString());
+      if (!validateAndVectorizeMapWork(mapWork, vectorTaskColumnInfo, isTezOrSpark)) {
+        if (currentBaseWork.getVectorizationEnabled()) {
+          VectorizerReason notVectorizedReason  = currentBaseWork.getNotVectorizedReason();
+          if (notVectorizedReason == null) {
+            LOG.info("Cannot vectorize: unknown");
+          } else {
+            LOG.info("Cannot vectorize: " + notVectorizedReason.toString());
+          }
         }
-        clearMapWorkVectorDescs(mapWork);
       }
     }
 
@@ -737,10 +1026,7 @@ public class Vectorizer implements PhysicalPlanResolver {
         // Turns out partition columns get marked as virtual in ColumnInfo, so we need to
         // check the VirtualColumn directly.
         VirtualColumn virtualColumn = VirtualColumn.VIRTUAL_COLUMN_NAME_MAP.get(columnName);
-        if (virtualColumn == null) {
-          logicalColumnNameList.add(columnName);
-          logicalTypeInfoList.add(TypeInfoUtils.getTypeInfoFromTypeString(c.getTypeName()));
-        } else {
+        if (virtualColumn != null) {
 
           // The planner gives us a subset virtual columns available for this table scan.
           //    AND
@@ -759,6 +1045,10 @@ public class Vectorizer implements PhysicalPlanResolver {
           }
           availableVirtualColumnList.add(virtualColumn);
         }
+
+        // All columns: data, partition, and virtual are added.
+        logicalColumnNameList.add(columnName);
+        logicalTypeInfoList.add(TypeInfoUtils.getTypeInfoFromTypeString(c.getTypeName()));
       }
     }
 
@@ -778,6 +1068,45 @@ public class Vectorizer implements PhysicalPlanResolver {
       }
     }
 
+    private Support[] getVectorizedInputFormatSupports(
+      Class<? extends InputFormat> inputFileFormatClass) {
+
+      // FUTURE: Decide how to ask an input file format what vectorization features it supports.
+      return null;
+    }
+
+    /*
+     * Add the support of the VectorizedInputFileFormatInterface.
+     */
+    private void addVectorizedInputFileFormatSupport(
+        Set<Support> newSupportSet,
+        boolean isInputFileFormatVectorized, Class<? extends InputFormat>inputFileFormatClass) {
+
+      final Support[] supports;
+      if (isInputFileFormatVectorized) {
+        supports = getVectorizedInputFormatSupports(inputFileFormatClass);
+      } else {
+        supports = null;
+      }
+      if (supports == null) {
+        // No support.
+      } else {
+        for (Support support : supports) {
+          newSupportSet.add(support);
+        }
+      }
+    }
+
+    private void handleSupport(
+        boolean isFirstPartition, Set<Support> inputFormatSupportSet, Set<Support> newSupportSet) {
+      if (isFirstPartition) {
+        inputFormatSupportSet.addAll(newSupportSet);
+      } else if (!newSupportSet.equals(inputFormatSupportSet)){
+        // Do the intersection so only support in both is kept.
+        inputFormatSupportSet.retainAll(newSupportSet);
+      }
+    }
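
handleSupport accumulates what every partition's input format supports: the first partition seeds the set, and each later partition intersects into it via retainAll. A minimal standalone sketch of that accumulation follows, with a hypothetical Feature enum standing in for VectorizedSupport.Support.

import java.util.EnumSet;
import java.util.List;
import java.util.Set;

final class SupportIntersection {
  enum Feature { DECIMAL_64, OTHER }   // hypothetical stand-in for Support

  // Keep only the features every partition supports (an empty input yields an empty set).
  static Set<Feature> commonSupport(List<Set<Feature>> perPartition) {
    Set<Feature> common = EnumSet.noneOf(Feature.class);
    boolean first = true;
    for (Set<Feature> s : perPartition) {
      if (first) {
        common.addAll(s);
        first = false;
      } else if (!s.equals(common)) {
        common.retainAll(s);           // set intersection, as in handleSupport above
      }
    }
    return common;
  }
}
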
+
     /*
      * There are 3 modes of reading for vectorization:
      *
@@ -792,11 +1121,14 @@ public class Vectorizer implements PhysicalPlanResolver {
      *      the row object into the VectorizedRowBatch with VectorAssignRow.
      *      This picks up Input File Format not supported by the other two.
      */
-    private boolean verifyAndSetVectorPartDesc(PartitionDesc pd, boolean isAcidTable,
-        HashSet<String> inputFileFormatClassNameSet, HashSet<String> enabledConditionsMetSet,
-        ArrayList<String> enabledConditionsNotMetList) {
+    private boolean verifyAndSetVectorPartDesc(
+        PartitionDesc pd, boolean isAcidTable,
+        Set<String> inputFileFormatClassNameSet,
+        Set<String> enabledConditionsMetSet, ArrayList<String> enabledConditionsNotMetList,
+        Set<Support> newSupportSet) {
 
-      String inputFileFormatClassName = pd.getInputFileFormatClassName();
+      Class<? extends InputFormat> inputFileFormatClass = pd.getInputFileFormatClass();
+      String inputFileFormatClassName = inputFileFormatClass.getName();
 
       // Always collect input file formats.
       inputFileFormatClassNameSet.add(inputFileFormatClassName);
@@ -816,6 +1148,9 @@ public class Vectorizer implements PhysicalPlanResolver {
           return false;
         }
 
+        addVectorizedInputFileFormatSupport(
+            newSupportSet, isInputFileFormatVectorized, inputFileFormatClass);
+
         pd.setVectorPartitionDesc(
             VectorPartitionDesc.createVectorizedInputFileFormat(
                 inputFileFormatClassName, Utilities.isInputFileFormatSelfDescribing(pd)));
@@ -831,12 +1166,16 @@ public class Vectorizer implements PhysicalPlanResolver {
 
         if (isInputFileFormatVectorized && !isInputFormatExcluded(inputFileFormatClassName,
             vectorizedInputFormatExcludes)) {
-          pd.setVectorPartitionDesc(VectorPartitionDesc
-              .createVectorizedInputFileFormat(inputFileFormatClassName,
-                  Utilities.isInputFileFormatSelfDescribing(pd)));
 
-          enabledConditionsMetSet
-              .add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
+          addVectorizedInputFileFormatSupport(
+              newSupportSet, isInputFileFormatVectorized, inputFileFormatClass);
+
+          pd.setVectorPartitionDesc(
+              VectorPartitionDesc.createVectorizedInputFileFormat(
+                  inputFileFormatClassName, Utilities.isInputFileFormatSelfDescribing(pd)));
+
+          enabledConditionsMetSet.add(
+              HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
           return true;
         }
         // Fall through and look for other options...
@@ -893,6 +1232,10 @@ public class Vectorizer implements PhysicalPlanResolver {
               return false;
             }
           } else {
+
+            // Add the support for read variations in Vectorized Text.
+            newSupportSet.addAll(vectorDeserializeTextSupportSet);
+
             pd.setVectorPartitionDesc(
                 VectorPartitionDesc.createVectorDeserialize(
                     inputFileFormatClassName, VectorDeserializeType.LAZY_SIMPLE));
@@ -979,19 +1322,20 @@ public class Vectorizer implements PhysicalPlanResolver {
       boolean isAcidTable = tableScanOperator.getConf().isAcidTable();
 
       // These names/types are the data columns plus partition columns.
-      final List<String> dataAndPartColumnNameList = new ArrayList<String>();
-      final List<TypeInfo> dataAndPartTypeInfoList = new ArrayList<TypeInfo>();
+      final List<String> allColumnNameList = new ArrayList<String>();
+      final List<TypeInfo> allTypeInfoList = new ArrayList<TypeInfo>();
 
       final List<VirtualColumn> availableVirtualColumnList = new ArrayList<VirtualColumn>();
 
       getTableScanOperatorSchemaInfo(
           tableScanOperator,
-          dataAndPartColumnNameList, dataAndPartTypeInfoList,
+          allColumnNameList, allTypeInfoList,
           availableVirtualColumnList);
+      final int virtualColumnCount = availableVirtualColumnList.size();
 
       final List<Integer> dataColumnNums = new ArrayList<Integer>();
 
-      final int dataAndPartColumnCount = dataAndPartColumnNameList.size();
+      final int dataAndPartColumnCount = allColumnNameList.size() - virtualColumnCount;
 
       /*
        * Validate input formats of all the partitions can be vectorized.
@@ -1007,11 +1351,16 @@ public class Vectorizer implements PhysicalPlanResolver {
       LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo();
 
       // Remember the input file formats we validated and why.
-      HashSet<String> inputFileFormatClassNameSet = new HashSet<String>();
-      HashSet<String> enabledConditionsMetSet = new HashSet<String>();
+      Set<String> inputFileFormatClassNameSet = new HashSet<String>();
+      Set<String> enabledConditionsMetSet = new HashSet<String>();
       ArrayList<String> enabledConditionsNotMetList = new ArrayList<String>();
+      Set<Support> inputFormatSupportSet = new TreeSet<Support>();
+      boolean outsideLoopIsFirstPartition = true;
 
       for (Entry<Path, ArrayList<String>> entry: pathToAliases.entrySet()) {
+        final boolean isFirstPartition = outsideLoopIsFirstPartition;
+        outsideLoopIsFirstPartition = false;
+
         Path path = entry.getKey();
         List<String> aliases = entry.getValue();
         boolean isPresent = (aliases != null && aliases.indexOf(alias) != -1);
@@ -1025,8 +1374,12 @@ public class Vectorizer implements PhysicalPlanResolver {
           // We've seen this already.
           continue;
         }
-        if (!verifyAndSetVectorPartDesc(partDesc, isAcidTable, inputFileFormatClassNameSet,
-            enabledConditionsMetSet, enabledConditionsNotMetList)) {
+        Set<Support> newSupportSet = new TreeSet<Support>();
+        if (!verifyAndSetVectorPartDesc(
+            partDesc, isAcidTable,
+            inputFileFormatClassNameSet,
+            enabledConditionsMetSet, enabledConditionsNotMetList,
+            newSupportSet)) {
 
           // Always set these so EXPLAIN can see.
           mapWork.setVectorizationInputFileFormatClassNameSet(inputFileFormatClassNameSet);
@@ -1039,6 +1392,8 @@ public class Vectorizer implements PhysicalPlanResolver {
           return new ImmutablePair<Boolean,Boolean>(false, true);
         }
 
+        handleSupport(isFirstPartition, inputFormatSupportSet, newSupportSet);
+
         VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
 
         if (isFirst) {
@@ -1054,11 +1409,11 @@ public class Vectorizer implements PhysicalPlanResolver {
             dataColumnCount = dataAndPartColumnCount;
           }
 
-          determineDataColumnNums(tableScanOperator, dataAndPartColumnNameList, dataColumnCount,
+          determineDataColumnNums(tableScanOperator, allColumnNameList, dataColumnCount,
               dataColumnNums);
 
-          tableDataColumnList = dataAndPartColumnNameList.subList(0, dataColumnCount);
-          tableDataTypeInfoList = dataAndPartTypeInfoList.subList(0, dataColumnCount);
+          tableDataColumnList = allColumnNameList.subList(0, dataColumnCount);
+          tableDataTypeInfoList = allTypeInfoList.subList(0, dataColumnCount);
 
           isFirst = false;
         }
@@ -1132,14 +1487,16 @@ public class Vectorizer implements PhysicalPlanResolver {
 
       // For now, we don't know which virtual columns are going to be included.  We'll add them
       // later...
-      vectorTaskColumnInfo.setAllColumnNames(dataAndPartColumnNameList);
-      vectorTaskColumnInfo.setAllTypeInfos(dataAndPartTypeInfoList);
+      vectorTaskColumnInfo.setAllColumnNames(allColumnNameList);
+      vectorTaskColumnInfo.setAllTypeInfos(allTypeInfoList);
 
       vectorTaskColumnInfo.setDataColumnNums(dataColumnNums);
       vectorTaskColumnInfo.setPartitionColumnCount(partitionColumnCount);
       vectorTaskColumnInfo.setAvailableVirtualColumnList(availableVirtualColumnList);
       vectorTaskColumnInfo.setUseVectorizedInputFileFormat(useVectorizedInputFileFormat);
 
+      vectorTaskColumnInfo.setInputFormatSupportSet(inputFormatSupportSet);
+
       // Always set these so EXPLAIN can see.
       mapWork.setVectorizationInputFileFormatClassNameSet(inputFileFormatClassNameSet);
       mapWork.setVectorizationEnabledConditionsMet(new ArrayList(enabledConditionsMetSet));
@@ -1148,10 +1505,12 @@ public class Vectorizer implements PhysicalPlanResolver {
       return new ImmutablePair<Boolean,Boolean>(true, false);
     }
 
-    private boolean validateMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTezOrSpark)
-            throws SemanticException {
+    private boolean validateAndVectorizeMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo,
+        boolean isTezOrSpark) throws SemanticException {
 
-      LOG.info("Validating MapWork...");
+      //--------------------------------------------------------------------------------------------
+
+      LOG.info("Examining input format to see if vectorization is enabled.");
 
       ImmutablePair<String,TableScanOperator> onlyOneTableScanPair = verifyOnlyOneTableScanOperator(mapWork);
       if (onlyOneTableScanPair ==  null) {
@@ -1178,6 +1537,66 @@ public class Vectorizer implements PhysicalPlanResolver {
         return false;
       }
 
+      final int dataColumnCount =
+          vectorTaskColumnInfo.allColumnNames.size() - vectorTaskColumnInfo.partitionColumnCount;
+
+      /*
+       * Take what all input formats support and eliminate any of them not enabled by
+       * the Hive variable.
+       */
+      List<String> supportRemovedReasons = new ArrayList<String>();
+      Set<Support> supportSet = new TreeSet<Support>();
+      if (vectorTaskColumnInfo.inputFormatSupportSet != null) {
+        supportSet.addAll(vectorTaskColumnInfo.inputFormatSupportSet);
+      }
+      // The retainAll method does set intersection.
+      supportSet.retainAll(vectorizedInputFormatSupportEnabledSet);
+      if (!supportSet.equals(vectorTaskColumnInfo.inputFormatSupportSet)) {
+
+        Set<Support> removedSet = new TreeSet<Support>();
+        removedSet.addAll(vectorizedInputFormatSupportEnabledSet);
+        removedSet.removeAll(supportSet);
+        String removeString =
+            removedSet.toString() + " is disabled because it is not in " +
+            HiveConf.ConfVars.HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED.varname +
+            " " + vectorizedInputFormatSupportEnabledSet.toString();
+        supportRemovedReasons.add(removeString);
+      }
+
+      // And, for now, disable DECIMAL_64 if LLAP is enabled.
+      if (isLlapIoEnabled && supportSet.contains(Support.DECIMAL_64)) {
+        supportSet.remove(Support.DECIMAL_64);
+        String removeString =
+            "DECIMAL_64 disabled because LLAP is enabled";
+        supportRemovedReasons.add(removeString);
+      }
+
+      // Now remember what is supported for this query and any support that was
+      // removed.
+      vectorTaskColumnInfo.setSupportSetInUse(supportSet);
+      vectorTaskColumnInfo.setSupportRemovedReasons(supportRemovedReasons);
+
+      final boolean isSupportDecimal64 = supportSet.contains(Support.DECIMAL_64);
+      List<DataTypePhysicalVariation> dataTypePhysicalVariations = new ArrayList<DataTypePhysicalVariation>();
+      for (int i = 0; i < dataColumnCount; i++) {
+        DataTypePhysicalVariation dataTypePhysicalVariation = DataTypePhysicalVariation.NONE;
+        if (isSupportDecimal64) {
+          TypeInfo typeInfo = vectorTaskColumnInfo.allTypeInfos.get(i);
+          if (typeInfo instanceof DecimalTypeInfo) {
+            DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
+            if (HiveDecimalWritable.isPrecisionDecimal64(decimalTypeInfo.precision())) {
+              dataTypePhysicalVariation = DataTypePhysicalVariation.DECIMAL_64;
+            }
+          }
+        }
+        dataTypePhysicalVariations.add(dataTypePhysicalVariation);
+      }
+      // It simplifies things to just add default ones for partitions.
+      for (int i = 0; i < vectorTaskColumnInfo.partitionColumnCount; i++) {
+        dataTypePhysicalVariations.add(DataTypePhysicalVariation.NONE);
+      }
+      vectorTaskColumnInfo.setAlldataTypePhysicalVariations(dataTypePhysicalVariations);
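
The loop above marks a decimal column as DECIMAL_64 only when the input format supports it and the column's precision is small enough for the unscaled value to fit in a signed 64-bit long. A minimal sketch of that per-column test; the precision limit of 18 is an assumption here, mirroring what isPrecisionDecimal64 checks.

final class Decimal64Check {
  // Assumed maximum precision whose unscaled value always fits in a Java long (10^18 - 1 < 2^63).
  static final int ASSUMED_DECIMAL_64_MAX_PRECISION = 18;

  static boolean fitsInDecimal64(int precision, boolean decimal64Supported) {
    return decimal64Supported && precision <= ASSUMED_DECIMAL_64_MAX_PRECISION;
  }
}
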
+
       // Set global member indicating which virtual columns are possible to be used by
       // the Map vertex.
       availableVectorizedVirtualColumnSet = new HashSet<VirtualColumn>();
@@ -1186,79 +1605,162 @@ public class Vectorizer implements PhysicalPlanResolver {
       // And, use set to remember which virtual columns were actually referenced.
       neededVirtualColumnSet = new HashSet<VirtualColumn>();
 
-      // Now we are enabled and any issues found from here on out are considered
-      // not vectorized issues.
       mapWork.setVectorizationEnabled(true);
+      LOG.info("Vectorization is enabled for input format(s) " + mapWork.getVectorizationInputFileFormatClassNameSet().toString());
 
-      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(mapWork, isTezOrSpark);
-      addMapWorkRules(opRules, vnp);
-      Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
-      GraphWalker ogw = new DefaultGraphWalker(disp);
-
-      // iterator the mapper operator tree
-      ArrayList<Node> topNodes = new ArrayList<Node>();
-      topNodes.addAll(mapWork.getAliasToWork().values());
-      HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
-      ogw.startWalking(topNodes, nodeOutput);
-      for (Node n : nodeOutput.keySet()) {
-        if (nodeOutput.get(n) != null) {
-          if (!((Boolean)nodeOutput.get(n)).booleanValue()) {
-            return false;
-          }
-        }
+      //--------------------------------------------------------------------------------------------
+
+      /*
+       * Validate and vectorize the Map operator tree.
+       */
+      if (!validateAndVectorizeMapOperators(mapWork, tableScanOperator, isTezOrSpark, vectorTaskColumnInfo)) {
+        return false;
       }
 
-      List<VirtualColumn> neededVirtualColumnList = new ArrayList<VirtualColumn>();
-      if (!neededVirtualColumnSet.isEmpty()) {
+      //--------------------------------------------------------------------------------------------
 
-        // Create needed in same order.
-        for (VirtualColumn virtualColumn : vectorTaskColumnInfo.availableVirtualColumnList) {
-          if (neededVirtualColumnSet.contains(virtualColumn)) {
-            neededVirtualColumnList.add(virtualColumn);
-            vectorTaskColumnInfo.allColumnNames.add(virtualColumn.getName());
-            vectorTaskColumnInfo.allTypeInfos.add(virtualColumn.getTypeInfo());
-          }
-        }
+      vectorTaskColumnInfo.transferToBaseWork(mapWork);
+
+      mapWork.setVectorMode(true);
+
+      if (LOG.isDebugEnabled()) {
+        debugDisplayVertexInfo(mapWork);
       }
 
-      vectorTaskColumnInfo.setNeededVirtualColumnList(neededVirtualColumnList);
-      vectorTaskColumnInfo.setNonVectorizedOps(vnp.getNonVectorizedOps());
       return true;
     }
 
-    private void vectorizeMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo,
-            boolean isTezOrSpark) throws SemanticException {
+    private boolean validateAndVectorizeMapOperators(MapWork mapWork, TableScanOperator tableScanOperator,
+        boolean isTezOrSpark, VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException {
 
-      LOG.info("Vectorizing MapWork...");
-      mapWork.setVectorMode(true);
-      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      MapWorkVectorizationNodeProcessor vnp =
-          new MapWorkVectorizationNodeProcessor(mapWork, isTezOrSpark, vectorTaskColumnInfo);
-      addMapWorkRules(opRules, vnp);
-      Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
-      GraphWalker ogw = new PreOrderOnceWalker(disp);
-      // iterator the mapper operator tree
-      ArrayList<Node> topNodes = new ArrayList<Node>();
-      topNodes.addAll(mapWork.getAliasToWork().values());
-      HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
-      ogw.startWalking(topNodes, nodeOutput);
-
-      for (Node topNode : topNodes) {
-        if (topNode instanceof TableScanOperator) {
-          ((TableScanOperator) topNode).getConf().setVectorized(true);
+      LOG.info("Validating and vectorizing MapWork...");
+
+      // Set "global" member indicating where to store "not vectorized" information if necessary.
+      currentBaseWork = mapWork;
+
+      try {
+        validateAndVectorizeMapOperators(tableScanOperator, isTezOrSpark, vectorTaskColumnInfo);
+      } catch (VectorizerCannotVectorizeException e) {
+
+        // The "not vectorized" information has been stored in the MapWork vertex.
+        return false;
+      }
+
+      vectorTaskColumnInfo.setNeededVirtualColumnList(
+          new ArrayList<VirtualColumn>(neededVirtualColumnSet));
+
+      /*
+       * The scratch column information was collected by the task VectorizationContext.  Go get it.
+       */
+      VectorizationContext vContext =
+          ((VectorizationContextRegion) tableScanOperator).getOutputVectorizationContext();
+
+      vectorTaskColumnInfo.setScratchTypeNameArray(
+          vContext.getScratchColumnTypeNames());
+      vectorTaskColumnInfo.setScratchdataTypePhysicalVariationsArray(
+          vContext.getScratchDataTypePhysicalVariations());
+
+      return true;
+    }
+
+    private void validateAndVectorizeMapOperators(TableScanOperator tableScanOperator,
+        boolean isTezOrSpark, VectorTaskColumnInfo vectorTaskColumnInfo)
+            throws VectorizerCannotVectorizeException {
+
+      Operator<? extends OperatorDesc> dummyVectorOperator =
+          validateAndVectorizeOperatorTree(tableScanOperator, false, isTezOrSpark, vectorTaskColumnInfo);
+
+      // Fixup parent and child relations.
+      List<Operator<? extends OperatorDesc>> vectorChildren = dummyVectorOperator.getChildOperators();
+      tableScanOperator.setChildOperators(vectorChildren);
+
+      final int vectorChildCount = vectorChildren.size();
+      for (int i = 0; i < vectorChildCount; i++) {
+
+        Operator<? extends OperatorDesc> vectorChild = vectorChildren.get(i);
+
+        // Replace any occurrence of dummyVectorOperator with our TableScanOperator.
+        List<Operator<? extends OperatorDesc>> vectorChildParents = vectorChild.getParentOperators();
+        final int vectorChildParentCount = vectorChildParents.size();
+        for (int p = 0; p < vectorChildParentCount; p++) {
+          Operator<? extends OperatorDesc> vectorChildParent = vectorChildParents.get(p);
+          if (vectorChildParent == dummyVectorOperator) {
+            vectorChildParents.set(p, tableScanOperator);
+          }
         }
       }
 
-      vectorTaskColumnInfo.setScratchTypeNameArray(vnp.getVectorScratchColumnTypeNames());
+      // And, finally, save the VectorizationContext.
+      tableScanOperator.setTaskVectorizationContext(
+          ((VectorizationOperator) dummyVectorOperator).getInputVectorizationContext());
 
-      vectorTaskColumnInfo.transferToBaseWork(mapWork);
+      // Modify TableScanOperator in-place so it knows to operate vectorized.
+      vectorizeTableScanOperatorInPlace(tableScanOperator, vectorTaskColumnInfo);
+    }
 
-      if (LOG.isDebugEnabled()) {
-        debugDisplayAllMaps(mapWork);
+    /*
+     * We are "committing" this vertex to be vectorized.
+     */
+    private void vectorizeTableScanOperatorInPlace(TableScanOperator tableScanOperator,
+        VectorTaskColumnInfo vectorTaskColumnInfo) {
+
+      TableScanDesc tableScanDesc = (TableScanDesc) tableScanOperator.getConf();
+      VectorTableScanDesc vectorTableScanDesc = new VectorTableScanDesc();
+      tableScanDesc.setVectorDesc(vectorTableScanDesc);
+
+      VectorizationContext vContext =
+          ((VectorizationContextRegion) tableScanOperator).getOutputVectorizationContext();
+      List<Integer> projectedColumns = vContext.getProjectedColumns();
+      vectorTableScanDesc.setProjectedColumns(
+          ArrayUtils.toPrimitive(projectedColumns.toArray(new Integer[0])));
+      List<String> allColumnNameList = vectorTaskColumnInfo.allColumnNames;
+      List<TypeInfo> allTypeInfoList = vectorTaskColumnInfo.allTypeInfos;
+      List<DataTypePhysicalVariation> allDataTypePhysicalVariationList = vectorTaskColumnInfo.allDataTypePhysicalVariations;
+      final int projectedColumnCount = projectedColumns.size();
+      String[] projectedDataColumnNames = new String[projectedColumnCount];
+      TypeInfo[] projectedDataColumnTypeInfos = new TypeInfo[projectedColumnCount];
+      DataTypePhysicalVariation[] projectedDataColumnDataTypePhysicalVariation =
+          new DataTypePhysicalVariation[projectedColumnCount];
+      for (int i = 0; i < projectedColumnCount; i++) {
+        final int projectedColumnNum = projectedColumns.get(i);
+        projectedDataColumnNames[i] = allColumnNameList.get(projectedColumnNum);
+        projectedDataColumnTypeInfos[i] = allTypeInfoList.get(projectedColumnNum);
+        projectedDataColumnDataTypePhysicalVariation[i] = allDataTypePhysicalVariationList.get(projectedColumnNum);
+      }
+      vectorTableScanDesc.setProjectedColumnNames(projectedDataColumnNames);
+      vectorTableScanDesc.setProjectedColumnTypeInfos(projectedDataColumnTypeInfos);
+      vectorTableScanDesc.setProjectedColumnDataTypePhysicalVariations(projectedDataColumnDataTypePhysicalVariation);
+
+      tableScanOperator.getConf().setVectorized(true);
+
+      List<Operator<? extends OperatorDesc>> children = tableScanOperator.getChildOperators();
+      while (children.size() > 0) {
+        children = dosetVectorDesc(children);
+      }
+    }
+
+    private List<Operator<? extends OperatorDesc>> dosetVectorDesc(
+        List<Operator<? extends OperatorDesc>> children) {
+
+      List<Operator<? extends OperatorDesc>> newChildren =
+          new ArrayList<Operator<? extends OperatorDesc>>();
+
+      for (Operator<? extends OperatorDesc> child : children) {
+
+        // Get the vector description from the operator.
+        VectorDesc vectorDesc = ((VectorizationOperator) child).getVectorDesc();
+
+        // Save the vector description for the EXPLAIN.
+        AbstractOperatorDesc desc = (AbstractOperatorDesc) child.getConf();
+        desc.setVectorDesc(vectorDesc);
+
+        List<Operator<? extends OperatorDesc>> childChildren = child.getChildOperators();
+        if (childChildren != null) {
+          newChildren.addAll(childChildren);
+        }
       }
 
-      return;
+      return newChildren;
     }
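
vectorizeTableScanOperatorInPlace drives this method level by level: process one generation of operators, collect their children, and repeat until no children remain. A generic sketch of that loop, reusing the hypothetical Node class from the earlier sketch; visit() stands in for copying the VectorDesc onto each operator's descriptor.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class LevelWalk {
  // Visit each generation in turn, then move on to its children.
  static void walkLevels(List<Node> roots, Consumer<Node> visit) {
    List<Node> level = roots;
    while (!level.isEmpty()) {
      List<Node> next = new ArrayList<>();
      for (Node n : level) {
        visit.accept(n);
        next.addAll(n.children);
      }
      level = next;
    }
  }
}
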
 
     private void setReduceWorkExplainConditions(ReduceWork reduceWork) {
@@ -1282,25 +1784,105 @@ public class Vectorizer implements PhysicalPlanResolver {
       reduceWork.setVectorizedVertexNum(++vectorizedVertexNum);
       reduceWork.setVectorizedTestingReducerBatchSize(vectorizedTestingReducerBatchSize);
 
-      boolean ret;
+      if (!validateAndVectorizeReduceWork(reduceWork, vectorTaskColumnInfo)) {
+        if (currentBaseWork.getVectorizationEnabled()) {
+          VectorizerReason notVectorizedReason  = currentBaseWork.getNotVectorizedReason();
+          if (notVectorizedReason == null) {
+            LOG.info("Cannot vectorize: unknown");
+          } else {
+            LOG.info("Cannot vectorize: " + notVectorizedReason.toString());
+          }
+        }
+      }
+    }
+
+    private boolean validateAndVectorizeReduceWork(ReduceWork reduceWork,
+        VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException {
+
+      Operator<? extends OperatorDesc> reducer = reduceWork.getReducer();
+
+      // Validate input to ReduceWork.
+      if (!getOnlyStructObjectInspectors(reduceWork, vectorTaskColumnInfo)) {
+        return false;
+      }
+
+      //--------------------------------------------------------------------------------------------
+
+      /*
+       * Validate and vectorize the Reduce operator tree.
+       */
+      if (!validateAndVectorizeReduceOperators(reduceWork, vectorTaskColumnInfo)) {
+        return false;
+      }
+
+      //--------------------------------------------------------------------------------------------
+
+      vectorTaskColumnInfo.transferToBaseWork(reduceWork);
+
+      reduceWork.setVectorMode(true);
+
+      if (LOG.isDebugEnabled()) {
+        debugDisplayVertexInfo(reduceWork);
+      }
+
+      return true;
+    }
+
+    private boolean validateAndVectorizeReduceOperators(ReduceWork reduceWork,
+        VectorTaskColumnInfo vectorTaskColumnInfo)
+            throws SemanticException {
+
+      LOG.info("Validating and vectorizing ReduceWork...");
+
+      Operator<? extends OperatorDesc> newVectorReducer;
       try {
-        ret = validateReduceWork(reduceWork, vectorTaskColumnInfo);
-      } catch (Exception e) {
-        String issue = "exception: " + VectorizationContext.getStackTraceAsSingleLine(e);
-        setNodeIssue(issue);
-        ret = false;
+        newVectorReducer =
+            validateAndVectorizeReduceOperators(reduceWork.getReducer(), vectorTaskColumnInfo);
+      } catch (VectorizerCannotVectorizeException e) {
+
+        // The "not vectorized" information has been stored in the MapWork vertex.
+        return false;
       }
-      if (ret) {
-        vectorizeReduceWork(reduceWork, vectorTaskColumnInfo);
-      } else if (currentBaseWork.getVectorizationEnabled()) {
-        VectorizerReason notVectorizedReason  = currentBaseWork.getNotVectorizedReason();
-        if (notVectorizedReason == null) {
-          LOG.info("Cannot vectorize: unknown");
-        } else {
-          LOG.info("Cannot vectorize: " + notVectorizedReason.toString());
-        }
-        clearReduceWorkVectorDescs(reduceWork);
+
+      /*
+       * The scratch column information was collected by the task VectorizationContext.  Go get it.
+       */
+      VectorizationContext vContext =
+          ((VectorizationOperator) newVectorReducer).getInputVectorizationContext();
+
+      vectorTaskColumnInfo.setScratchTypeNameArray(
+          vContext.getScratchColumnTypeNames());
+      vectorTaskColumnInfo.setScratchdataTypePhysicalVariationsArray(
+          vContext.getScratchDataTypePhysicalVariations());
+
+      // Replace the reducer with our fully vectorized reduce operator tree.
+      reduceWork.setReducer(newVectorReducer);
+
+      return true;
+    }
+
+    private Operator<? extends OperatorDesc> validateAndVectorizeReduceOperators(
+        Operator<? extends OperatorDesc> reducerOperator,
+        VectorTaskColumnInfo vectorTaskColumnInfo)
+            throws VectorizerCannotVectorizeException {
+
+      Operator<? extends OperatorDesc> dummyOperator = new DummyOperator();
+      dummyOperator.getChildOperators().add(reducerOperator);
+
+      Operator<? extends OperatorDesc> dummyVectorOperator =
+          validateAndVectorizeOperatorTree(dummyOperator, true, true, vectorTaskColumnInfo);
+
+      Operator<? extends OperatorDesc> newVectorReducer =
+          dummyVectorOperator.getChildOperators().get(0);
+
+      List<Operator<? extends OperatorDesc>> children =
+          new ArrayList<Operator<? extends OperatorDesc>>();
+      children.add(newVectorReducer);
+      while (children.size() > 0) {
+        children = dosetVectorDesc(children);
       }
+
+      return newVectorReducer;
     }
 
     private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork,
@@ -1375,466 +1957,41 @@ public class Vectorizer implements PhysicalPlanResolver {
 
       vectorTaskColumnInfo.setReduceColumnSortOrder(columnSortOrder);
       vectorTaskColumnInfo.setReduceColumnNullOrder(columnNullOrder);
-      
+
       return true;
     }
-
-    private void addReduceWorkRules(Map<Rule, NodeProcessor> opRules, NodeProcessor np) {
-      opRules.put(new RuleRegExp("R1", GroupByOperator.getOperatorName() + ".*"), np);
-      opRules.put(new RuleRegExp("R2", SelectOperator.getOperatorName() + ".*"), np);
-    }
-
-    private boolean validateReduceWork(ReduceWork reduceWork,
-        VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException {
-
-      LOG.info("Validating ReduceWork...");
-
-      // Validate input to ReduceWork.
-      if (!getOnlyStructObjectInspectors(reduceWork, vectorTaskColumnInfo)) {
-        return false;
-      }
-      // Now check the reduce operator tree.
-      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      ReduceWorkValidationNodeProcessor vnp =
-          new ReduceWorkValidationNodeProcessor(vectorTaskColumnInfo);
-      addReduceWorkRules(opRules, vnp);
-      Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
-      GraphWalker ogw = new DefaultGraphWalker(disp);
-      // iterator the reduce operator tree
-      ArrayList<Node> topNodes = new ArrayList<Node>();
-      topNodes.add(reduceWork.getReducer());
-      HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
-      ogw.startWalking(topNodes, nodeOutput);
-      for (Node n : nodeOutput.keySet()) {
-        if (nodeOutput.get(n) != null) {
-          if (!((Boolean)nodeOutput.get(n)).booleanValue()) {
-            return false;
-          }
-        }
-      }
-      vectorTaskColumnInfo.setNonVectorizedOps(vnp.getNonVectorizedOps());
-      return true;
-    }
-
-    private void vectorizeReduceWork(ReduceWork reduceWork,
-        VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException {
-
-      LOG.info("Vectorizing ReduceWork...");
-      reduceWork.setVectorMode(true);
-
-      // For some reason, the DefaultGraphWalker does not descend down from the reducer Operator as
-      // expected.  We need to descend down, otherwise it breaks our algorithm that determines
-      // VectorizationContext...  Do we use PreOrderWalker instead of DefaultGraphWalker.
-      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      ReduceWorkVectorizationNodeProcessor vnp =
-              new ReduceWorkVectorizationNodeProcessor(vectorTaskColumnInfo);
-      addReduceWorkRules(opRules, vnp);
-      Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
-      GraphWalker ogw = new PreOrderWalker(disp);
-      // iterator the reduce operator tree
-      ArrayList<Node> topNodes = new ArrayList<Node>();
-      topNodes.add(reduceWork.getReducer());
-      LOG.info("vectorizeReduceWork reducer Operator: " +
-              reduceWork.getReducer().getName() + "...");
-      HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
-      ogw.startWalking(topNodes, nodeOutput);
-
-      // Necessary since we are vectorizing the root operator in reduce.
-      reduceWork.setReducer(vnp.getRootVectorOp());
-
-      vectorTaskColumnInfo.setScratchTypeNameArray(vnp.getVectorScratchColumnTypeNames());
-
-      vectorTaskColumnInfo.transferToBaseWork(reduceWork);
-
-      if (LOG.isDebugEnabled()) {
-        debugDisplayAllMaps(reduceWork);
-      }
-    }
-
-    class ClearVectorDescsNodeProcessor implements NodeProcessor {
-
-      public ClearVectorDescsNodeProcessor() {
-      }
-
-      @Override
-      public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-          Object... nodeOutputs) throws SemanticException {
-        for (Node n : stack) {
-          Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
-
-          OperatorDesc desc = op.getConf();
-          if (desc instanceof AbstractOperatorDesc) {
-            AbstractOperatorDesc abstractDesc = (AbstractOperatorDesc) desc;
-            abstractDesc.setVectorDesc(null);
-          }
-        }
-        return null;
-      }
-    }
-
-    private void clearMapWorkVectorDescs(MapWork mapWork) throws SemanticException {
-      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      ClearVectorDescsNodeProcessor vnp = new ClearVectorDescsNodeProcessor();
-      addMapWorkRules(opRules, vnp);
-      Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
-      GraphWalker ogw = new DefaultGraphWalker(disp);
-      ArrayList<Node> topNodes = new ArrayList<Node>();
-      topNodes.addAll(mapWork.getAliasToWork().values());
-      ogw.startWalking(topNodes, null);
-    }
-
-    private void clearReduceWorkVectorDescs(ReduceWork reduceWork) throws SemanticException {
-      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      ClearVectorDescsNodeProcessor vnp = new ClearVectorDescsNodeProcessor();
-      addReduceWorkRules(opRules, vnp);
-      Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
-      GraphWalker ogw = new DefaultGraphWalker(disp);
-      ArrayList<Node> topNodes = new ArrayList<Node>();
-      topNodes.add(reduceWork.getReducer());
-      ogw.startWalking(topNodes, null);
-    }
-  }
-
-  class MapWorkValidationNodeProcessor implements NodeProcessor {
-
-    private final MapWork mapWork;
-    private final boolean isTezOrSpark;
-
-    // Children of Vectorized GROUPBY that outputs rows instead of vectorized row batches.
-    protected final Set<Operator<? extends OperatorDesc>> nonVectorizedOps =
-        new HashSet<Operator<? extends OperatorDesc>>();
-
-    public Set<Operator<? extends OperatorDesc>> getNonVectorizedOps() {
-      return nonVectorizedOps;
-    }
-
-    public MapWorkValidationNodeProcessor(MapWork mapWork, boolean isTezOrSpark) {
-      this.mapWork = mapWork;
-      this.isTezOrSpark = isTezOrSpark;
-    }
-
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-      for (Node n : stack) {
-        Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
-        if (nonVectorizedOps.contains(op)) {
-          return new Boolean(true);
-        }
-        boolean ret;
-        currentOperator = op;
-        try {
-          ret = validateMapWorkOperator(op, mapWork, isTezOrSpark);
-        } catch (Exception e) {
-          String oneLineStackTrace = VectorizationContext.getStackTraceAsSingleLine(e);
-          LOG.info(oneLineStackTrace);
-          throw new SemanticException(e);
-        }
-        if (!ret) {
-          return new Boolean(false);
-        }
-        // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
-        // vectorize the operators below it.
-        if (isVectorizedGroupByThatOutputsRows(op)) {
-          addOperatorChildrenToSet(op, nonVectorizedOps);
-          return new Boolean(true);
-        }
-      }
-      return new Boolean(true);
-    }
-  }
-
-  class ReduceWorkValidationNodeProcessor implements NodeProcessor {
-
-    private final VectorTaskColumnInfo vectorTaskColumnInfo;
-    private final TypeInfo[] reducerBatchTypeInfos;
-
-    public ReduceWorkValidationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo) {
-      this.vectorTaskColumnInfo = vectorTaskColumnInfo;
-      reducerBatchTypeInfos =  vectorTaskColumnInfo.allTypeInfos.toArray(new TypeInfo[0]);
-    }
-
-    // Children of Vectorized GROUPBY that outputs rows instead of vectorized row batches.
-    protected final Set<Operator<? extends OperatorDesc>> nonVectorizedOps =
-        new HashSet<Operator<? extends OperatorDesc>>();
-
-    public Set<Operator<? extends OperatorDesc>> getNonVectorizeOps() {
-      return nonVectorizedOps;
-    }
-
-    public Set<Operator<? extends OperatorDesc>> getNonVectorizedOps() {
-      return nonVectorizedOps;
-    }
-
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-      for (Node n : stack) {
-        Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
-        if (nonVectorizedOps.contains(op)) {
-          return new Boolean(true);
-        }
-        currentOperator = op;
-        boolean ret = validateReduceWorkOperator(op, reducerBatchTypeInfos);
-        if (!ret) {
-          return new Boolean(false);
-        }
-        // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
-        // vectorize the operators below it.
-        if (isVectorizedGroupByThatOutputsRows(op)) {
-          addOperatorChildrenToSet(op, nonVectorizedOps);
-          return new Boolean(true);
-        }
-      }
-      return new Boolean(true);
-    }
-  }
-
-  // This class has common code used by both MapWorkVectorizationNodeProcessor and
-  // ReduceWorkVectorizationNodeProcessor.
-  class VectorizationNodeProcessor implements NodeProcessor {
-
-    // The vectorization context for the Map or Reduce task.
-    protected VectorizationContext taskVectorizationContext;
-
-    protected final VectorTaskColumnInfo vectorTaskColumnInfo;
-    protected final Set<Operator<? extends OperatorDesc>> nonVectorizedOps;
-
-    VectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo,
-        Set<Operator<? extends OperatorDesc>> nonVectorizedOps) {
-      this.vectorTaskColumnInfo = vectorTaskColumnInfo;
-      this.nonVectorizedOps = nonVectorizedOps;
-    }
-
-    public String[] getVectorScratchColumnTypeNames() {
-      return taskVectorizationContext.getScratchColumnTypeNames();
-    }
-
-    protected final Set<Operator<? extends OperatorDesc>> opsDone =
-        new HashSet<Operator<? extends OperatorDesc>>();
-
-    protected final Map<Operator<? extends OperatorDesc>, Operator<? extends OperatorDesc>> opToVectorOpMap =
-        new HashMap<Operator<? extends OperatorDesc>, Operator<? extends OperatorDesc>>();
-
-    public VectorizationContext walkStackToFindVectorizationContext(Stack<Node> stack,
-            Operator<? extends OperatorDesc> op) throws SemanticException {
-      VectorizationContext vContext = null;
-      if (stack.size() <= 1) {
-        throw new SemanticException(
-            String.format("Expected operator stack for operator %s to have at least 2 operators",
-                  op.getName()));
-      }
-      // Walk down the stack of operators until we find one willing to give us a context.
-      // At the bottom will be the root operator, which is guaranteed to have a context.
-      int i = stack.size() - 2;
-      while (vContext == null) {
-        if (i < 0) {
-          return null;
-        }
-        Operator<? extends OperatorDesc> opParent = (Operator<? extends OperatorDesc>) stack.get(i);
-        Operator<? extends OperatorDesc> vectorOpParent = opToVectorOpMap.get(opParent);
-        if (vectorOpParent != null) {
-          if (vectorOpParent instanceof VectorizationContextRegion) {
-            VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOpParent;
-            vContext = vcRegion.getOuputVectorizationContext();
-            LOG.info("walkStackToFindVectorizationContext " + vectorOpParent.getName() + " has new vectorization context " + vContext.toString());
-          } else {
-            LOG.info("walkStackToFindVectorizationContext " + vectorOpParent.getName() + " does not have new vectorization context");
-          }
-        } else {
-          LOG.info("walkStackToFindVectorizationContext " + opParent.getName() + " is not vectorized");
-        }
-        --i;
-      }
-      return vContext;
-    }
-
-    public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op,
-            VectorizationContext vContext, boolean isTezOrSpark) throws SemanticException {
-      Operator<? extends OperatorDesc> vectorOp = op;
-      try {
-        if (!opsDone.contains(op)) {
-          vectorOp = vectorizeOperator(op, vContext, isTezOrSpark, vectorTaskColumnInfo);
-          opsDone.add(op);
-          if (vectorOp != op) {
-            opToVectorOpMap.put(op, vectorOp);
-            opsDone.add(vectorOp);
-          }
-        }
-      } catch (HiveException e) {
-        throw new SemanticException(e);
-      }
-      return vectorOp;
-    }
-
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-      throw new SemanticException("Must be overridden");
-    }
-  }
-
-  class MapWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
-
-    private final VectorTaskColumnInfo vectorTaskColumnInfo;
-    private final boolean isTezOrSpark;
-
-    public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTezOrSpark,
-        VectorTaskColumnInfo vectorTaskColumnInfo) {
-      super(vectorTaskColumnInfo, vectorTaskColumnInfo.getNonVectorizedOps());
-      this.vectorTaskColumnInfo = vectorTaskColumnInfo;
-      this.isTezOrSpark = isTezOrSpark;
-    }
-
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-
-      Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
-      if (nonVectorizedOps.contains(op)) {
-        return null;
-      }
-
-      VectorizationContext vContext = null;
-
-      currentOperator = op;
-      if (op instanceof TableScanOperator) {
-        if (taskVectorizationContext == null) {
-          taskVectorizationContext = getVectorizationContext(op.getName(), vectorTaskColumnInfo);
-          if (LOG.isInfoEnabled()) {
-            LOG.info("MapWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum + " mapColumnNames " + vectorTaskColumnInfo.allColumnNames.toString());
-            LOG.info("MapWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum + " mapTypeInfos " + vectorTaskColumnInfo.allTypeInfos.toString());
-          }
-        }
-        vContext = taskVectorizationContext;
-      } else {
-        LOG.debug("MapWorkVectorizationNodeProcessor process going to walk the operator stack to get vectorization context for " + op.getName());
-        vContext = walkStackToFindVectorizationContext(stack, op);
-        if (vContext == null) {
-          // No operator has "pushed" a new context -- so use the task vectorization context.
-          vContext = taskVectorizationContext;
-        }
-      }
-
-      assert vContext != null;
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("MapWorkVectorizationNodeProcessor process operator " + op.getName()
-            + " using vectorization context" + vContext.toString());
-      }
-
-      Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, isTezOrSpark);
-
-      if (LOG.isDebugEnabled()) {
-        if (vectorOp instanceof VectorizationContextRegion) {
-          VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
-          VectorizationContext vNewContext = vcRegion.getOuputVectorizationContext();
-          LOG.debug("Vectorized MapWork operator " + vectorOp.getName() + " added vectorization context " + vNewContext.toString());
-        }
-      }
-
-      return null;
-    }
-  }
-
-  class ReduceWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
-
-    private final VectorTaskColumnInfo vectorTaskColumnInfo;
-
-
-    private Operator<? extends OperatorDesc> rootVectorOp;
-
-    public Operator<? extends OperatorDesc> getRootVectorOp() {
-      return rootVectorOp;
-    }
-
-    public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo) {
-
-      super(vectorTaskColumnInfo, vectorTaskColumnInfo.getNonVectorizedOps());
-      this.vectorTaskColumnInfo =  vectorTaskColumnInfo;
-      rootVectorOp = null;
-    }
-
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-
-      Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
-      if (nonVectorizedOps.contains(op)) {
-        return null;
-      }
-
-      VectorizationContext vContext = null;
-
-      boolean saveRootVectorOp = false;
-
-      currentOperator = op;
-      if (op.getParentOperators().size() == 0) {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("ReduceWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum + " reduceColumnNames " + vectorTaskColumnInfo.allColumnNames.toString());
-          LOG.info("ReduceWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum + " reduceTypeInfos " + vectorTaskColumnInfo.allTypeInfos.toString());
-        }
-        vContext = new VectorizationContext("__Reduce_Shuffle__", vectorTaskColumnInfo.allColumnNames, hiveConf);
-        taskVectorizationContext = vContext;
-
-        saveRootVectorOp = true;
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Vectorized ReduceWork reduce shuffle vectorization context " + vContext.toString());
-        }
-      } else {
-        LOG.info("ReduceWorkVectorizationNodeProcessor process going to walk the operator stack to get vectorization context for " + op.getName());
-        vContext = walkStackToFindVectorizationContext(stack, op);
-        if (vContext == null) {
-          // If we didn't find a context among the operators, assume the top -- reduce shuffle's
-          // vectorization context.
-          vContext = taskVectorizationContext;
-        }
-      }
-
-      assert vContext != null;
-      LOG.info("ReduceWorkVectorizationNodeProcessor process operator " + op.getName() + " using vectorization context" + vContext.toString());
-
-      Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, true);
-
-      if (LOG.isDebugEnabled()) {
-        if (vectorOp instanceof VectorizationContextRegion) {
-          VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
-          VectorizationContext vNewContext = vcRegion.getOuputVectorizationContext();
-          LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() + " added vectorization context " + vNewContext.toString());
-        }
-      }
-      if (saveRootVectorOp && op != vectorOp) {
-        rootVectorOp = vectorOp;
-      }
-
-      return null;
-    }
-  }
-
-  private static class ValidatorVectorizationContext extends VectorizationContext {
-    private ValidatorVectorizationContext(HiveConf hiveConf) {
-      super("No Name", hiveConf);
-    }
-
-    @Override
-    public int getInputColumnIndex(String name) {
-      return 0;
-    }
-
-    @Override
-    protected int getInputColumnIndex(ExprNodeColumnDesc colExpr) {
-      return 0;
-    }
-  }
+  }
 
   @Override
   public PhysicalContext resolve(PhysicalContext physicalContext) throws SemanticException {
 
     hiveConf = physicalContext.getConf();
 
-    boolean vectorPath = HiveConf.getBoolVar(hiveConf,
+    String vectorizationEnabledOverrideString =
+        HiveConf.getVar(hiveConf,
+            HiveConf.ConfVars.HIVE_TEST_VECTORIZATION_ENABLED_OVERRIDE);
+    vectorizationEnabledOverride =
+        VectorizationEnabledOverride.nameMap.get(vectorizationEnabledOverrideString);
+
+    isVectorizationEnabled = HiveConf.getBoolVar(hiveConf,
         HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
-    if (!vectorPath) {
+
+    final boolean weCanAttemptVectorization;
+    switch (vectorizationEnabledOverride) {
+    case NONE:
+      weCanAttemptVectorization = isVectorizationEnabled;
+      break;
+    case DISABLE:
+      weCanAttemptVectorization = false;
+      break;
+    case ENABLE:
+      weCanAttemptVectorization = true;
+      break;
+    default:
+      throw new RuntimeException("Unexpected vectorization enabled override " +
+          vectorizationEnabledOverride);
+    }
+    if (!weCanAttemptVectorization) {
       LOG.info("Vectorization is disabled");
       return physicalContext;
     }
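
For orientation, here is a minimal, hedged sketch (not part of this patch) of how the enable flag consulted above can be set and read programmatically; it assumes only the standard HiveConf set/get accessors, and HiveConf is already imported in this file:

    // Hypothetical illustration only.
    HiveConf conf = new HiveConf();
    // Turn vectorization on for this configuration.
    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
    // Read it back the same way this resolver does.
    boolean enabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);

The test-only override is read analogously as a string via HiveConf.getVar and mapped through VectorizationEnabledOverride.nameMap, as the code above shows.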
@@ -1884,6 +2041,25 @@ public class Vectorizer implements PhysicalPlanResolver {
         HiveConf.getIntVar(hiveConf,
             HiveConf.ConfVars.HIVE_VECTORIZATION_TESTING_REDUCER_BATCH_SIZE);
 
+    vectorizedInputFormatSupportEnabled =
+        HiveConf.getVar(hiveConf,
+            HiveConf.ConfVars.HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED);
+    String[] supportEnabledStrings = vectorizedInputFormatSupportEnabled.toLowerCase().split(",");
+    vectorizedInputFormatSupportEnabledSet = new TreeSet<Support>();
+    for (String supportEnabledString : supportEnabledStrings) {
+      Support support = Support.nameToSupportMap.get(supportEnabledString);
+
+      // Known?
+      if (support != null) {
+        vectorizedInputFormatSupportEnabledSet.add(support);
+      }
+    }
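
As a small, hedged illustration of how the parsed set above might be consulted later (the DECIMAL_64 member of Support is assumed here; it is not shown in this hunk):

    // Check whether a particular kind of support was enabled via the comma-separated property.
    boolean decimal64Supported =
        vectorizedInputFormatSupportEnabledSet.contains(Support.DECIMAL_64);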
+
+    isLlapIoEnabled =
+        HiveConf.getBoolVar(hiveConf,
+            HiveConf.ConfVars.LLAP_IO_ENABLED,
+            LlapProxy.isDaemon());
+
     isSchemaEvolution =
         HiveConf.getBoolVar(hiveConf,
             HiveConf.ConfVars.HIVE_SCHEMA_EVOLUTION);
@@ -1924,129 +2100,6 @@ public class Vectorizer implements PhysicalPlanResolver {
     }
   }
 
-  boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op, MapWork mWork, boolean isTezOrSpark) {
-    boolean ret;
-    switch (op.getType()) {
-      case MAPJOIN:
-        if (op instanceof MapJoinOperator) {
-          ret = validateMapJoinOperator((MapJoinOperator) op);
-        } else if (op instanceof SMBMapJoinOperator) {
-          ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op);
-        } else {
-          setOperatorNotSupported(op);
-          ret = false;
-        }
-        break;
-      case GROUPBY:
-        ret = validateGroupByOperator((GroupByOperator) op, false, isTezOrSpark);
-        break;
-      case FILTER:
-        ret = validateFilterOperator((FilterOperator) op);
-        break;
-      case SELECT:
-        ret = validateSelectOperator((SelectOperator) op);
-        break;
-      case REDUCESINK:
-        ret = validateReduceSinkOperator((ReduceSinkOperator) op);
-        break;
-      case TABLESCAN:
-        ret = validateTableScanOperator((TableScanOperator) op, mWork);
-        break;
-      case FILESINK:
-      case LIMIT:
-      case EVENT:
-      case SPARKPRUNINGSINK:
-        ret = true;
-        break;
-      case HASHTABLESINK:
-        ret = op instanceof SparkHashTableSinkOperator &&
-            validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op);
-        break;
-      default:
-        setOperatorNotSupported(op);
-        ret = false;
-        break;
-    }
-    return ret;
-  }
-
-  boolean validateReduceWorkOperator(Operator<? extends OperatorDesc> op,
-      TypeInfo[] reducerBatchTypeInfos) {
-    boolean ret;
-    switch (op.getType()) {
-      case MAPJOIN:
-        // Does MAPJOIN actually get planned in Reduce?
-        if (op instanceof MapJoinOperator) {
-          ret = validateMapJoinOperator((MapJoinOperator) op);
-        } else if (op instanceof SMBMapJoinOperator) {
-          ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op);
-        } else {
-          setOperatorNotSupported(op);
-          ret = false;
-        }
-        break;
-      case GROUPBY:
-        if (HiveConf.getBoolVar(hiveConf,
-                    HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_GROUPBY_ENABLED)) {
-          ret = validateGroupByOperator((GroupByOperator) op, true, true);
-        } else {
-          setNodeIssue("Operator " + op.getType() + " not enabled (" + HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_GROUPBY_ENABLED.name() + "=true IS false)");
-          ret = false;
-        }
-        break;
-      case FILTER:
-        ret = validateFilterOperator((FilterOperator) op);
-        break;
-      case SELECT:
-        ret = validateSelectOperator((SelectOperator) op);
-        break;
-      case REDUCESINK:
-        ret = validateReduceSinkOperator((ReduceSinkOperator) op);
-        break;
-      case FILESINK:
-        ret = validateFileSinkOperator((FileSinkOperator) op);
-        break;
-      case LIMIT:
-      case EVENT:
-      case SPARKPRUNINGSINK:
-        ret = true;
-        break;
-      case HASHTABLESINK:
-        ret = op instanceof SparkHashTableSinkOperator &&
-            validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op);
-        break;
-      case PTF:
-        // PTF needs the TypeInfo of the reducer batch.
-        ret = validatePTFOperator((PTFOperator) op, reducerBatchTypeInfos);
-        break;
-      default:
-        setOperatorNotSupported(op);
-        ret = false;
-        break;
-    }
-    return ret;
-  }
-
-  private void addOperatorChildrenToSet(Operator<? extends OperatorDesc> op,
-      Set<Operator<? extends OperatorDesc>> nonVectorizedOps) {
-    for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
-      if (!nonVectorizedOps.contains(childOp)) {
-        nonVectorizedOps.add(childOp);
-        addOperatorChildrenToSet(childOp, nonVectorizedOps);
-      }
-    }
-  }
-
-  // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
-  // vectorize the operators below it.
-  private Boolean isVectorizedGroupByThatOutputsRows(Operator<? extends OperatorDesc> op)
-      throws SemanticException {
-    if (op.getType().equals(OperatorType.GROUPBY)) {
-      GroupByDesc desc = (GroupByDesc) op.getConf();
-      return !((VectorGroupByDesc) desc.getVectorDesc()).isVectorOutput();
-    }
-    return false;
-  }
 
   private boolean validateSMBMapJoinOperator(SMBMapJoinOperator op) {
     SMBJoinDesc desc = op.getConf();
@@ -2134,7 +2187,10 @@ public class Vectorizer implements PhysicalPlanResolver {
         desc, "Predicate", VectorExpressionDescriptor.Mode.FILTER, /* allowComplex */ true);
   }
 
-  private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTezOrSpark) {
+
+  private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce,
+      boolean isTezOrSpark, VectorGroupByDesc vectorGroupByDesc) {
+
     GroupByDesc desc = op.getConf();
 
     if (desc.getMode() != GroupByDesc.Mode.HASH && desc.isDistinct()) {
@@ -2252,26 +2308,16 @@ public class Vectorizer implements PhysicalPlanResolver {
       return false;
     }
 
-    Pair<Boolean,Boolean> retPair =
-        validateAggregationDescs(desc.getAggregators(), desc.getMode(), hasKeys);
-    if (!retPair.left) {
+    if (!validateAggregationDescs(desc.getAggregators(), desc.getMode(), hasKeys)) {
       return false;
     }
 
-    // If all the aggregation outputs are primitive, we can output VectorizedRowBatch.
-    // Otherwise, the rest of the operator tree will be in row mode.
-    VectorGroupByDesc vectorDesc = new VectorGroupByDesc();
-    desc.setVectorDesc(vectorDesc);
+    vectorGroupByDesc.setProcessingMode(processingMode);
 
-    vectorDesc.setVectorOutput(retPair.right);
+    vectorGroupByDesc.setIsVectorizationComplexTypesEnabled(isVectorizationComplexTypesEnabled);
+    vectorGroupByDesc.setIsVectorizationGroupByComplexTypesEnabled(isVectorizationGroupByComplexTypesEnabled);
 
-    vectorDesc.setProcessingMode(processingMode);
-
-    vectorDesc.setIsVectorizationComplexTypesEnabled(isVectorizationComplexTypesEnabled);
-    vectorDesc.setIsVectorizationGroupByComplexTypesEnabled(isVectorizationGroupByComplexTypesEnabled);
-
-    LOG.info("Vector GROUP BY operator will use processing mode " + processingMode.name() +
-        ", isVectorOutput " + vectorDesc.isVectorOutput());
+    LOG.info("Vector GROUP BY operator will use processing mode " + processingMode.name());
 
     return true;
   }
@@ -2308,7 +2354,9 @@ public class Vectorizer implements PhysicalPlanResolver {
     return false;
   }
 
-  private boolean validatePTFOperator(PTFOperator op, TypeInfo[] reducerBatchTypeInfos) {
+  private boolean validatePTFOperator(PTFOperator op, VectorizationContext vContext,
+      VectorPTFDesc vectorPTFDesc)
+      throws HiveException {
 
     if (!isPtfVectorizationEnabled) {
       setNodeIssue("Vectorization of PTF is not enabled (" +
@@ -2342,15 +2390,13 @@ public class Vectorizer implements PhysicalPlanResolver {
     // We use this information for validation.  Later when creating the vector operator
     // we create an additional object VectorPTFInfo.
 
-    VectorPTFDesc vectorPTFDesc = null;
     try {
-      vectorPTFDesc = createVectorPTFDesc(
-          op, ptfDesc, reducerBatchTypeInfos, vectorizedPTFMaxMemoryBufferingBatchCount);
+      createVectorPTFDesc(
+          op, ptfDesc, vContext, vectorPTFDesc, vectorizedPTFMaxMemoryBufferingBatchCount);
     } catch (HiveException e) {
       setOperatorIssue("exception: " + VectorizationContext.getStackTraceAsSingleLine(e));
       return false;
     }
-    ptfDesc.setVectorDesc(vectorPTFDesc);
 
     // Output columns ok?
     String[] outputColumnNames = vectorPTFDesc.getOutputColumnNames();
@@ -2469,19 +2515,15 @@ public class Vectorizer implements PhysicalPlanResolver {
     return true;
   }
 
-  private Pair<Boolean,Boolean> validateAggregationDescs(List<AggregationDesc> descs,
+  private boolean validateAggregationDescs(List<AggregationDesc> descs,
       GroupByDesc.Mode groupByMode, boolean hasKeys) {
-    boolean outputIsPrimitive = true;
+
     for (AggregationDesc d : descs) {
-      Pair<Boolean,Boolean>  retPair = validateAggregationDesc(d, groupByMode, hasKeys);
-      if (!retPair.left) {
-        return retPair;
-      }
-      if (!retPair.right) {
-        outputIsPrimitive = false;
+      if (!validateAggregationDesc(d, groupByMode, hasKeys)) {
+        return false;
       }
     }
-    return new Pair<Boolean, Boolean>(true, outputIsPrimitive);
+    return true;
   }
 
   private boolean validateExprNodeDescRecursive(ExprNodeDesc desc, String expressionTitle,
@@ -2597,26 +2639,7 @@ public class Vectorizer implements PhysicalPlanResolver {
 
   boolean validateExprNodeDesc(ExprNodeDesc desc, String expressionTitle,
       VectorExpressionDescriptor.Mode mode, boolean allowComplex) {
-    if (!validateExprNodeDescRecursive(desc, expressionTitle, mode, allowComplex)) {
-      return false;
-    }
-    try {
-      VectorizationContext vc = new ValidatorVectorizationContext(hiveConf);
-      if (vc.getVectorExpression(desc, mode) == null) {
-        // TODO: this cannot happen - VectorizationContext throws in such cases.
-        setExpressionIssue(expressionTitle, "getVectorExpression returned null");
-        return false;
-      }
-    } catch (Exception e) {
-      if (e instanceof HiveException) {
-        setExpressionIssue(expressionTitle, e.getMessage());
-      } else {
-        String issue = "exception: " + VectorizationContext.getStackTraceAsSingleLine(e);
-        setExpressionIssue(expressionTitle, issue);
-      }
-      return false;
-    }
-    return true;
+    return validateExprNodeDescRecursive(desc, expressionTitle, mode, allowComplex);
   }
 
   private boolean validateGenericUdf(ExprNodeGenericFuncDesc genericUDFExpr) {
@@ -2636,86 +2659,29 @@ public class Vectorizer implements PhysicalPlanResolver {
     return true;
   }
 
-  public static Category aggregationOutputCategory(VectorAggregateExpression vectorAggrExpr) {
-    ObjectInspector outputObjInspector = vectorAggrExpr.getOutputObjectInspector();
-    return outputObjInspector.getCategory();
-  }
-
-  private Pair<Boolean,Boolean> validateAggregationDesc(AggregationDesc aggDesc, GroupByDesc.Mode groupByMode,
+  private boolean validateAggregationDesc(AggregationDesc aggDesc, GroupByDesc.Mode groupByMode,
       boolean hasKeys) {
 
     String udfName = aggDesc.getGenericUDAFName().toLowerCase();
     if (!supportedAggregationUdfs.contains(udfName)) {
       setExpressionIssue("Aggregation Function", "UDF " + udfName + " not supported");
-      return new Pair<Boolean,Boolean>(false, false);
+      return false;
     }
     /*
     // The planner seems to pull this one out.
-    if (aggDesc.getDistinct()) {
-      setExpressionIssue("Aggregation Function", "DISTINCT not supported");
-      return new Pair<Boolean,Boolean>(false, false);
-    }
-    */
-
-    ArrayList<ExprNodeDesc> parameters = aggDesc.getParameters();
-
-    if (parameters != null && !validateExprNodeDesc(parameters, "Aggregation Function UDF " + udfName + " parameter")) {
-      return new Pair<Boolean,Boolean>(false, false);
-    }
-
-    // See if we can vectorize the aggregation.
-    VectorizationContext vc = new ValidatorVectorizationContext(hiveConf);
-    VectorAggregateExpression vectorAggrExpr;
-    try {
-        vectorAggrExpr = vc.getAggregatorExpression(aggDesc);
-    } catch (Exception e) {
-      // We should have already attempted to vectorize in validateAggregationDesc.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Vectorization of aggregation should have succeeded ", e);
-      }
-      setExpressionIssue("Aggregation Function", "Vectorization of aggreation should have succeeded " + e);
-      return new Pair<Boolean,Boolean>(false, false);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Aggregation " + aggDesc.getExprString() + " --> " +
-          " vector expression " + vectorAggrExpr.toString());
-    }
-
-    boolean canVectorizeComplexType =
-        (isVectorizationComplexTypesEnabled && isVectorizationGroupByComplexTypesEnabled);
-
-    boolean isVectorOutput;
-    if (canVectorizeComplexType) {
-      isVectorOutput = true;
-    } else {
-
-      // Do complex input type checking...
-      boolean inputIsPrimitive;
-      if (parameters == null || parameters.size() == 0) {
-        inputIsPrimitive = true;   // Pretend for COUNT(*)
-      } else {
-
-        // Multi-input should have been eliminated earlier.
-        // Preconditions.checkState(parameters.size() == 1);
-
-        final Category inputCategory = parameters.get(0).getTypeInfo().getCategory();
-        inputIsPrimitive = (inputCategory == Category.PRIMITIVE);
-      }
+    if (aggDesc.getDistinct()) {
+      setExpressionIssue("Aggregation Function", "DISTINCT not supported");
+      return new Pair<Boolean,Boolean>(false, false);
+    }
+    */
 
-      if (!inputIsPrimitive) {
-        setOperatorIssue("Cannot vectorize GROUP BY with aggregation complex type inputs in " +
-            aggDesc.getExprString() + " since " +
-            GroupByDesc.getComplexTypeWithGroupByEnabledCondition(
-                isVectorizationComplexTypesEnabled, isVectorizationGroupByComplexTypesEnabled));
-        return new Pair<Boolean,Boolean>(false, false);
-      }
+    ArrayList<ExprNodeDesc> parameters = aggDesc.getParameters();
 
-      // Now, look at the output.  If the output is complex, we switch to row-mode for all child
-      // operators...
-      isVectorOutput = (aggregationOutputCategory(vectorAggrExpr) == Category.PRIMITIVE);
+    if (parameters != null && !validateExprNodeDesc(parameters, "Aggregation Function UDF " + udfName + " parameter")) {
+      return false;
     }
 
-    return new Pair<Boolean,Boolean>(true, isVectorOutput);
+    return true;
   }
 
   public static boolean validateDataType(String type, VectorExpressionDescriptor.Mode mode,
@@ -2769,7 +2735,12 @@ public class Vectorizer implements PhysicalPlanResolver {
       VectorTaskColumnInfo vectorTaskColumnInfo) {
 
     VectorizationContext vContext =
-        new VectorizationContext(contextName, vectorTaskColumnInfo.allColumnNames, hiveConf);
+        new VectorizationContext(
+            contextName,
+            vectorTaskColumnInfo.allColumnNames,
+            vectorTaskColumnInfo.allTypeInfos,
+            vectorTaskColumnInfo.allDataTypePhysicalVariations,
+            hiveConf);
 
     return vContext;
   }
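
For readers tracking the Decimal64 plumbing, a minimal, hedged sketch of how the per-column lists passed to the new constructor line up positionally; the column name, precision/scale, and variation below are illustration values only, not taken from this patch (uses java.util.Arrays/List, TypeInfoFactory from serde2.typeinfo, and the classes imported at the top of this file):

    // One entry per column, in the same order, in each of the three lists.
    List<String> columnNames = Arrays.asList("amount");
    List<TypeInfo> typeInfos =
        Arrays.asList((TypeInfo) TypeInfoFactory.getDecimalTypeInfo(10, 2));
    List<DataTypePhysicalVariation> variations =
        Arrays.asList(DataTypePhysicalVariation.DECIMAL_64);
    VectorizationContext example =
        new VectorizationContext("example", columnNames, typeInfos, variations, hiveConf);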
@@ -2831,12 +2802,12 @@ public class Vectorizer implements PhysicalPlanResolver {
   }
 
   Operator<? extends OperatorDesc> specializeMapJoinOperator(Operator<? extends OperatorDesc> op,
-        VectorizationContext vContext, MapJoinDesc desc, VectorMapJoinInfo vectorMapJoinInfo)
+        VectorizationContext vContext, MapJoinDesc desc, VectorMapJoinDesc vectorDesc)
             throws HiveException {
     Operator<? extends OperatorDesc> vectorOp = null;
     Class<? extends Operator<?>> opClass = null;
 
-    VectorMapJoinDesc vectorDesc = (VectorMapJoinDesc) desc.getVectorDesc();
+    VectorMapJoinInfo vectorMapJoinInfo = vectorDesc.getVectorMapJoinInfo();
 


<TRUNCATED>
