hive-commits mailing list archives

From mmccl...@apache.org
Subject [46/51] [partial] hive git commit: HIVE-17433: Vectorization: Support Decimal64 in Hive Query Engine (Matt McCline, reviewed by Teddy Choi)
Date Sun, 29 Oct 2017 20:40:25 GMT
http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationDesc.java
new file mode 100644
index 0000000..910ac80
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationDesc.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFVariance;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hive.common.util.AnnotationUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * VectorAggregationDesc.
+ *
+ * Mode is GenericUDAFEvaluator.Mode.
+ *
+ * It enumerates the different modes for an aggregate UDAF (User Defined Aggregation Function).
+ *
+ *    (Notice that these names are a subset of GroupByDesc.Mode...)
+ *
+ *        PARTIAL1       Original data            --> Partial aggregation data
+ *
+ *        PARTIAL2       Partial aggregation data --> Partial aggregation data
+ *
+ *        FINAL          Partial aggregation data --> Full aggregation data
+ *
+ *        COMPLETE       Original data            --> Full aggregation data
+ *
+ *
+ * SIMPLEST CASE --> The data type/semantics of original data, partial aggregation
+ *     data, and full aggregation data ARE THE SAME.  E.g. MIN, MAX, SUM.  The different
+ *     modes can be handled by one aggregation class.
+ *
+ *     This case has a null for the Mode.
+ *
+ * FOR OTHERS --> The data type/semantics of partial aggregation data and full aggregation data
+ *    ARE THE SAME but different than original data.  This results in 2 aggregation classes:
+ *
+ *       1) A class that takes original rows and outputs partial/full aggregation
+ *          (PARTIAL1/COMPLETE)
+ *
+ *         and
+ *
+ *       2) A class that takes partial aggregation and produces full aggregation
+ *          (PARTIAL2/FINAL).
+ *
+ *    E.g. COUNT(*) and COUNT(column)
+ *
+ * OTHERWISE FULL --> The data type/semantics of partial aggregation data is different than
+ *    original data and full aggregation data.
+ *
+ *    E.g. AVG uses a STRUCT with count and sum for partial aggregation data.  It divides
+ *    sum by count to produce the average for final aggregation.
+ *
+ */
+public class VectorAggregationDesc implements java.io.Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private final AggregationDesc aggrDesc;
+
+  private final TypeInfo inputTypeInfo;
+  private final ColumnVector.Type inputColVectorType;
+  private final VectorExpression inputExpression;
+
+  private final TypeInfo outputTypeInfo;
+  private final ColumnVector.Type outputColVectorType;
+  private final DataTypePhysicalVariation outputDataTypePhysicalVariation;
+
+  private final Class<? extends VectorAggregateExpression> vecAggrClass;
+
+  private GenericUDAFEvaluator evaluator;
+
+  public VectorAggregationDesc(AggregationDesc aggrDesc, GenericUDAFEvaluator evaluator,
+      TypeInfo inputTypeInfo, ColumnVector.Type inputColVectorType,
+      VectorExpression inputExpression, TypeInfo outputTypeInfo,
+      ColumnVector.Type outputColVectorType,
+      Class<? extends VectorAggregateExpression> vecAggrClass) {
+
+    this.aggrDesc = aggrDesc;
+    this.evaluator = evaluator;
+
+    this.inputTypeInfo = inputTypeInfo;
+    this.inputColVectorType = inputColVectorType;
+    this.inputExpression = inputExpression;
+
+    this.outputTypeInfo = outputTypeInfo;
+    this.outputColVectorType = outputColVectorType;
+    outputDataTypePhysicalVariation =
+        (outputColVectorType == ColumnVector.Type.DECIMAL_64 ?
+            DataTypePhysicalVariation.DECIMAL_64 : DataTypePhysicalVariation.NONE);
+
+    this.vecAggrClass = vecAggrClass;
+  }
+
+  public AggregationDesc getAggrDesc() {
+    return aggrDesc;
+  }
+
+  public TypeInfo getInputTypeInfo() {
+    return inputTypeInfo;
+  }
+
+  public ColumnVector.Type getInputColVectorType() {
+    return inputColVectorType;
+  }
+
+  public VectorExpression getInputExpression() {
+    return inputExpression;
+  }
+
+  public TypeInfo getOutputTypeInfo() {
+    return outputTypeInfo;
+  }
+
+  public ColumnVector.Type getOutputColVectorType() {
+    return outputColVectorType;
+  }
+
+  public DataTypePhysicalVariation getOutputDataTypePhysicalVariation() {
+    return outputDataTypePhysicalVariation;
+  }
+
+  public GenericUDAFEvaluator getEvaluator() {
+    return evaluator;
+  }
+
+  public Class<? extends VectorAggregateExpression> getVecAggrClass() {
+    return vecAggrClass;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(vecAggrClass.getSimpleName());
+    if (inputExpression != null) {
+      sb.append("(");
+      sb.append(inputExpression.toString());
+      sb.append(") -> ");
+    } else {
+      sb.append("(*) -> ");
+    }
+    sb.append(outputTypeInfo.toString());
+    if (outputDataTypePhysicalVariation != null &&
+        outputDataTypePhysicalVariation != DataTypePhysicalVariation.NONE) {
+      sb.append("/");
+      sb.append(outputDataTypePhysicalVariation);
+    }
+    String aggregationName = aggrDesc.getGenericUDAFName();
+    if (GenericUDAFVariance.isVarianceFamilyName(aggregationName)) {
+      sb.append(" aggregation: ");
+      sb.append(aggregationName);
+    }
+    return sb.toString();
+  }
+}
\ No newline at end of file

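[Editor's note] The class javadoc above distinguishes aggregates whose partial and full forms share one implementation (MIN, MAX, SUM) from those that need two (COUNT). A minimal, dependency-free sketch of that split, assuming nothing beyond the javadoc; all names here are illustrative, not Hive's API:

    import java.util.stream.LongStream;

    public class ModeSketch {
      // SIMPLEST CASE: partial and full aggregation share one code path.
      static long maxPartial(long[] rows)     { return LongStream.of(rows).max().orElse(Long.MIN_VALUE); }
      static long maxMerge(long[] partials)   { return maxPartial(partials); }

      // FOR OTHERS: COUNT maps rows to a count (PARTIAL1/COMPLETE) but merges
      // partial counts by summing them (PARTIAL2/FINAL) -- two distinct operations.
      static long countPartial(long[] rows)   { return rows.length; }
      static long countMerge(long[] partials) { return LongStream.of(partials).sum(); }

      public static void main(String[] args) {
        long[] a = {3, 9, 4}, b = {7, 1};
        System.out.println(maxMerge(new long[] {maxPartial(a), maxPartial(b)}));       // 9
        System.out.println(countMerge(new long[] {countPartial(a), countPartial(b)})); // 5
      }
    }
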
http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
index 2c433f7..e367243 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorAppMasterEventDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.Writable;
@@ -33,11 +35,13 @@ import com.google.common.annotations.VisibleForTesting;
 /**
  * App Master Event operator implementation.
  **/
-public class VectorAppMasterEventOperator extends AppMasterEventOperator {
+public class VectorAppMasterEventOperator extends AppMasterEventOperator
+    implements VectorizationOperator {
 
   private static final long serialVersionUID = 1L;
 
   private VectorizationContext vContext;
+  private VectorAppMasterEventDesc vectorDesc;
 
   // The above members are initialized by the constructor and must not be
   // transient.
@@ -50,10 +54,12 @@ public class VectorAppMasterEventOperator extends AppMasterEventOperator {
   protected transient Object[] singleRow;
 
   public VectorAppMasterEventOperator(
-      CompilationOpContext ctx, VectorizationContext vContext, OperatorDesc conf) {
+      CompilationOpContext ctx, OperatorDesc conf, VectorizationContext vContext,
+      VectorDesc vectorDesc) {
     super(ctx);
     this.conf = (AppMasterEventDesc) conf;
     this.vContext = vContext;
+    this.vectorDesc = (VectorAppMasterEventDesc) vectorDesc;
   }
 
   /** Kryo ctor. */
@@ -133,4 +139,14 @@ public class VectorAppMasterEventOperator extends AppMasterEventOperator {
     forward(data, rowInspector, true);
   }
 
+  @Override
+  public VectorizationContext getInputVectorizationContext() {
+    return vContext;
+  }
+
+  @Override
+  public VectorDesc getVectorDesc() {
+    return vectorDesc;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java
index f02a300..0a15bcb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java
@@ -819,8 +819,16 @@ public class VectorAssignRow {
               VectorizedBatchUtil.setNullColIsNullValue(columnVector, batchIndex);
               return;
             }
-            ((DecimalColumnVector) columnVector).set(
-                batchIndex, hiveDecimal);
+            if (columnVector instanceof Decimal64ColumnVector) {
+              Decimal64ColumnVector dec64ColVector = (Decimal64ColumnVector) columnVector;
+              dec64ColVector.set(batchIndex, hiveDecimal);
+              if (dec64ColVector.isNull[batchIndex]) {
+                return;
+              }
+            } else {
+              ((DecimalColumnVector) columnVector).set(
+                  batchIndex, hiveDecimal);
+            }
           }
           break;
         case INTERVAL_YEAR_MONTH:

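[Editor's note] The new branch above re-checks isNull after set(), which implies Decimal64ColumnVector.set() can mark the slot null when the decimal does not fit a scaled 64-bit long. A dependency-free sketch of that representation and its overflow rule; the names and the 18-digit bound are assumptions for illustration, not Hive's API:

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public class Decimal64Sketch {
      static final long MAX_DECIMAL64 = 999_999_999_999_999_999L; // assumed 18-digit cap

      /** Returns the scaled long, or null when the value cannot be represented. */
      static Long toDecimal64(BigDecimal value, int scale) {
        BigDecimal scaled = value.setScale(scale, RoundingMode.HALF_UP);
        long unscaled;
        try {
          unscaled = scaled.unscaledValue().longValueExact();
        } catch (ArithmeticException overflow) {
          return null; // mirrors the isNull[batchIndex] marking above
        }
        return Math.abs(unscaled) <= MAX_DECIMAL64 ? unscaled : null;
      }

      public static void main(String[] args) {
        System.out.println(toDecimal64(new BigDecimal("12.345"), 3)); // 12345
        System.out.println(toDecimal64(new BigDecimal("1e30"), 3));   // null: overflow
      }
    }
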
http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java
index 7ac4f07..b7d3b6d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java
@@ -105,6 +105,7 @@ public class VectorColumnSetInfo {
 
     switch (columnVectorType) {
     case LONG:
+    case DECIMAL_64:
       longIndices[addLongIndex] = addKeyIndex;
       columnTypeSpecificIndices[addKeyIndex] = addLongIndex++;
       break;

http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
index 3826182..2cc80e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.VectorPartitionConversion;
@@ -86,6 +87,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
   private T deserializeRead;
 
   private TypeInfo[] sourceTypeInfos;
+  protected DataTypePhysicalVariation[] dataTypePhysicalVariations;
 
   private byte[] inputBytes;
 
@@ -97,6 +99,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
     this();
     this.deserializeRead = deserializeRead;
     sourceTypeInfos = deserializeRead.typeInfos();
+    dataTypePhysicalVariations = deserializeRead.getDataTypePhysicalVariations();
   }
 
   // Not public since we must have the deserialize read object.
@@ -110,6 +113,8 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
     private PrimitiveCategory primitiveCategory;
                   //The data type primitive category of the column being deserialized.
 
+    private DataTypePhysicalVariation dataTypePhysicalVariation;
+
     private int maxLength;
                   // For the CHAR and VARCHAR data types, the maximum character length of
                   // the column.  Otherwise, 0.
@@ -130,9 +135,11 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
 
     private ObjectInspector objectInspector;
 
-    public Field(PrimitiveCategory primitiveCategory, int maxLength) {
+    public Field(PrimitiveCategory primitiveCategory, DataTypePhysicalVariation dataTypePhysicalVariation,
+        int maxLength) {
       this.category = Category.PRIMITIVE;
       this.primitiveCategory = primitiveCategory;
+      this.dataTypePhysicalVariation = dataTypePhysicalVariation;
       this.maxLength = maxLength;
       this.isConvert = false;
       this.conversionWritable = null;
@@ -145,6 +152,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
       this.category = category;
       this.objectInspector = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(typeInfo);
       this.primitiveCategory = null;
+      this.dataTypePhysicalVariation = null;
       this.maxLength = 0;
       this.isConvert = false;
       this.conversionWritable = null;
@@ -159,6 +167,10 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
       return primitiveCategory;
     }
 
+    public DataTypePhysicalVariation getDataTypePhysicalVariation() {
+      return dataTypePhysicalVariation;
+    }
+
     public int getMaxLength() {
       return maxLength;
     }
@@ -220,7 +232,8 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
     topLevelFields = new Field[count];
   }
 
-  private Field allocatePrimitiveField(TypeInfo sourceTypeInfo) {
+  private Field allocatePrimitiveField(TypeInfo sourceTypeInfo,
+      DataTypePhysicalVariation dataTypePhysicalVariation) {
     final PrimitiveTypeInfo sourcePrimitiveTypeInfo = (PrimitiveTypeInfo) sourceTypeInfo;
     final PrimitiveCategory sourcePrimitiveCategory = sourcePrimitiveTypeInfo.getPrimitiveCategory();
     final int maxLength;
@@ -236,7 +249,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
       maxLength = 0;
       break;
     }
-    return new Field(sourcePrimitiveCategory, maxLength);
+    return new Field(sourcePrimitiveCategory, dataTypePhysicalVariation, maxLength);
   }
 
   private Field allocateComplexField(TypeInfo sourceTypeInfo) {
@@ -247,7 +260,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
         final ListTypeInfo listTypeInfo = (ListTypeInfo) sourceTypeInfo;
         final ListComplexTypeHelper listHelper =
             new ListComplexTypeHelper(
-                allocateField(listTypeInfo.getListElementTypeInfo()));
+                allocateField(listTypeInfo.getListElementTypeInfo(), DataTypePhysicalVariation.NONE));
         return new Field(category, listHelper, sourceTypeInfo);
       }
     case MAP:
@@ -255,8 +268,8 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
         final MapTypeInfo mapTypeInfo = (MapTypeInfo) sourceTypeInfo;
         final MapComplexTypeHelper mapHelper =
             new MapComplexTypeHelper(
-                allocateField(mapTypeInfo.getMapKeyTypeInfo()),
-                allocateField(mapTypeInfo.getMapValueTypeInfo()));
+                allocateField(mapTypeInfo.getMapKeyTypeInfo(), DataTypePhysicalVariation.NONE),
+                allocateField(mapTypeInfo.getMapValueTypeInfo(), DataTypePhysicalVariation.NONE));
         return new Field(category, mapHelper, sourceTypeInfo);
       }
     case STRUCT:
@@ -266,7 +279,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
         final int count = fieldTypeInfoList.size();
         final Field[] fields = new Field[count];
         for (int i = 0; i < count; i++) {
-          fields[i] = allocateField(fieldTypeInfoList.get(i));
+          fields[i] = allocateField(fieldTypeInfoList.get(i), DataTypePhysicalVariation.NONE);
         }
         final StructComplexTypeHelper structHelper =
             new StructComplexTypeHelper(fields);
@@ -279,7 +292,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
         final int count = fieldTypeInfoList.size();
         final Field[] fields = new Field[count];
         for (int i = 0; i < count; i++) {
-          fields[i] = allocateField(fieldTypeInfoList.get(i));
+          fields[i] = allocateField(fieldTypeInfoList.get(i), DataTypePhysicalVariation.NONE);
         }
         final UnionComplexTypeHelper unionHelper =
             new UnionComplexTypeHelper(fields);
@@ -290,10 +303,10 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
     }
   }
 
-  private Field allocateField(TypeInfo sourceTypeInfo) {
+  private Field allocateField(TypeInfo sourceTypeInfo, DataTypePhysicalVariation dataTypePhysicalVariation) {
     switch (sourceTypeInfo.getCategory()) {
     case PRIMITIVE:
-      return allocatePrimitiveField(sourceTypeInfo);
+      return allocatePrimitiveField(sourceTypeInfo, dataTypePhysicalVariation);
     case LIST:
     case MAP:
     case STRUCT:
@@ -307,11 +320,12 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
   /*
   * Initialize one column's source deserialization information.
    */
-  private void initTopLevelField(int logicalColumnIndex, int projectionColumnNum, TypeInfo sourceTypeInfo) {
+  private void initTopLevelField(int logicalColumnIndex, int projectionColumnNum,
+      TypeInfo sourceTypeInfo, DataTypePhysicalVariation dataTypePhysicalVariation) {
 
     projectionColumnNums[logicalColumnIndex] = projectionColumnNum;
 
-    topLevelFields[logicalColumnIndex] = allocateField(sourceTypeInfo);
+    topLevelFields[logicalColumnIndex] = allocateField(sourceTypeInfo, dataTypePhysicalVariation);
   }
 
   /*
@@ -339,7 +353,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
 
     for (int i = 0; i < count; i++) {
       int outputColumn = outputColumns[i];
-      initTopLevelField(i, outputColumn, sourceTypeInfos[i]);
+      initTopLevelField(i, outputColumn, sourceTypeInfos[i], dataTypePhysicalVariations[i]);
     }
   }
 
@@ -353,7 +367,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
 
     for (int i = 0; i < count; i++) {
       int outputColumn = outputColumns.get(i);
-      initTopLevelField(i, outputColumn, sourceTypeInfos[i]);
+      initTopLevelField(i, outputColumn, sourceTypeInfos[i], dataTypePhysicalVariations[i]);
     }
   }
 
@@ -367,7 +381,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
 
     for (int i = 0; i < count; i++) {
       int outputColumn = startColumn + i;
-      initTopLevelField(i, outputColumn, sourceTypeInfos[i]);
+      initTopLevelField(i, outputColumn, sourceTypeInfos[i], dataTypePhysicalVariations[i]);
     }
   }
 
@@ -393,7 +407,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
 
       } else {
 
-        initTopLevelField(i, i, sourceTypeInfos[i]);
+        initTopLevelField(i, i, sourceTypeInfos[i], dataTypePhysicalVariations[i]);
         includedIndices[includedCount++] = i;
       }
     }
@@ -452,12 +466,12 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
           if (VectorPartitionConversion.isImplicitVectorColumnConversion(sourceTypeInfo, targetTypeInfo)) {
 
             // Do implicit conversion from source type to target type.
-            initTopLevelField(i, i, sourceTypeInfo);
+            initTopLevelField(i, i, sourceTypeInfo, dataTypePhysicalVariations[i]);
 
           } else {
 
             // Do formal conversion...
-            initTopLevelField(i, i, sourceTypeInfo);
+            initTopLevelField(i, i, sourceTypeInfo, dataTypePhysicalVariations[i]);
 
             // UNDONE: No for List and Map; Yes for Struct and Union when field count different...
             addTopLevelConversion(i);
@@ -467,7 +481,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
         } else {
 
           // No conversion.
-          initTopLevelField(i, i, sourceTypeInfo);
+          initTopLevelField(i, i, sourceTypeInfo, dataTypePhysicalVariations[i]);
 
         }
 
@@ -642,9 +656,13 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
       }
       break;
     case DECIMAL:
-      // The DecimalColumnVector set method will quickly copy the deserialized decimal writable fields.
-      ((DecimalColumnVector) colVector).set(
-          batchIndex, deserializeRead.currentHiveDecimalWritable);
+      if (field.getDataTypePhysicalVariation() == DataTypePhysicalVariation.DECIMAL_64) {
+        ((Decimal64ColumnVector) colVector).vector[batchIndex] = deserializeRead.currentDecimal64;
+      } else {
+        // The DecimalColumnVector set method will quickly copy the deserialized decimal writable fields.
+        ((DecimalColumnVector) colVector).set(
+            batchIndex, deserializeRead.currentHiveDecimalWritable);
+      }
       break;
     case INTERVAL_YEAR_MONTH:
       ((LongColumnVector) colVector).vector[batchIndex] =

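[Editor's note] A small sketch, with illustrative names only, of the read-side dispatch the hunk above adds: a column flagged DECIMAL_64 receives one raw long, while the general path copies a full decimal object.

    import java.math.BigDecimal;

    public class DeserializeDispatchSketch {
      enum PhysicalVariation { NONE, DECIMAL_64 }

      // Stand-ins for Decimal64ColumnVector.vector and DecimalColumnVector.
      long[] longVector = new long[1024];
      BigDecimal[] decimalVector = new BigDecimal[1024];

      void setDecimal(PhysicalVariation pv, int batchIndex, long raw64, BigDecimal full) {
        if (pv == PhysicalVariation.DECIMAL_64) {
          longVector[batchIndex] = raw64;     // fast path: plain long store
        } else {
          decimalVector[batchIndex] = full;   // general path: object copy
        }
      }

      public static void main(String[] args) {
        DeserializeDispatchSketch s = new DeserializeDispatchSketch();
        s.setDecimal(PhysicalVariation.DECIMAL_64, 0, 12345L, null);
        System.out.println(s.longVector[0]); // 12345
      }
    }
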
http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
index a5bdbef..f7e3ff3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
@@ -77,6 +77,8 @@ public class VectorExpressionDescriptor {
     INTERVAL_DAY_TIME       (0x200),
     BINARY                  (0x400),
     STRUCT                  (0x800),
+    DECIMAL_64              (0x1000),
+    INT_DECIMAL_64_FAMILY   (INT_FAMILY.value | DECIMAL_64.value),
     DATETIME_FAMILY         (DATE.value | TIMESTAMP.value),
     INTERVAL_FAMILY         (INTERVAL_YEAR_MONTH.value | INTERVAL_DAY_TIME.value),
     INT_INTERVAL_YEAR_MONTH     (INT_FAMILY.value | INTERVAL_YEAR_MONTH.value),

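[Editor's note] For context, the ArgumentType values above are bit flags: a family such as INT_DECIMAL_64_FAMILY is the OR of its members, and a single AND tests membership. A minimal sketch with stand-in constants (the real INT_FAMILY is itself a multi-bit mask; the accepts helper is illustrative):

    public class ArgTypeMaskSketch {
      static final int INT_FAMILY = 0x001;   // stand-in for the real INT_FAMILY mask
      static final int DECIMAL_64 = 0x1000;
      static final int INT_DECIMAL_64_FAMILY = INT_FAMILY | DECIMAL_64;

      static boolean accepts(int family, int argType) {
        return (family & argType) != 0;
      }

      public static void main(String[] args) {
        System.out.println(accepts(INT_DECIMAL_64_FAMILY, DECIMAL_64));         // true
        System.out.println(accepts(INT_DECIMAL_64_FAMILY, 0x400 /* BINARY */)); // false
      }
    }
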
http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
index fba17a8..8f4b9ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
@@ -345,9 +345,15 @@ public class VectorExtractRow {
             return primitiveWritable;
           }
         case DECIMAL:
-          // The HiveDecimalWritable set method will quickly copy the deserialized decimal writable fields.
-          ((HiveDecimalWritable) primitiveWritable).set(
-              ((DecimalColumnVector) colVector).vector[adjustedIndex]);
+          if (colVector instanceof Decimal64ColumnVector) {
+            Decimal64ColumnVector dec64ColVector = (Decimal64ColumnVector) colVector;
+            ((HiveDecimalWritable) primitiveWritable).deserialize64(
+                dec64ColVector.vector[adjustedIndex], dec64ColVector.scale);
+          } else {
+            // The HiveDecimalWritable set method will quickly copy the deserialized decimal writable fields.
+            ((HiveDecimalWritable) primitiveWritable).set(
+                ((DecimalColumnVector) colVector).vector[adjustedIndex]);
+          }
           return primitiveWritable;
         case INTERVAL_YEAR_MONTH:
           ((HiveIntervalYearMonthWritable) primitiveWritable).set(

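[Editor's note] deserialize64 above rebuilds a full decimal from the column's long value and its scale. Assuming the usual unscaled-value-plus-scale encoding, a dependency-free equivalent using java.math:

    import java.math.BigDecimal;

    public class Deserialize64Sketch {
      static BigDecimal fromDecimal64(long unscaled, int scale) {
        return BigDecimal.valueOf(unscaled, scale); // 12345 at scale 3 -> 12.345
      }

      public static void main(String[] args) {
        System.out.println(fromDecimal64(12345L, 3)); // 12.345
      }
    }
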
http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
index ff88b85..aba8f4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -31,11 +33,13 @@ import com.google.common.annotations.VisibleForTesting;
 /**
  * File Sink operator implementation.
  **/
-public class VectorFileSinkOperator extends FileSinkOperator {
+public class VectorFileSinkOperator extends FileSinkOperator
+    implements VectorizationOperator {
 
   private static final long serialVersionUID = 1L;
 
   private VectorizationContext vContext;
+  private VectorFileSinkDesc vectorDesc;
 
   // The above members are initialized by the constructor and must not be
   // transient.
@@ -47,11 +51,12 @@ public class VectorFileSinkOperator extends FileSinkOperator {
 
   protected transient Object[] singleRow;
 
-  public VectorFileSinkOperator(CompilationOpContext ctx,
-      VectorizationContext vContext, OperatorDesc conf) {
+  public VectorFileSinkOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) {
     this(ctx);
     this.conf = (FileSinkDesc) conf;
     this.vContext = vContext;
+    this.vectorDesc = (VectorFileSinkDesc) vectorDesc;
   }
 
   /** Kryo ctor. */
@@ -65,6 +70,11 @@ public class VectorFileSinkOperator extends FileSinkOperator {
   }
 
   @Override
+  public VectorizationContext getInputVectorizationContext() {
+    return vContext;
+  }
+
+  @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
 
     // We need a input object inspector that is for the row we will extract out of the
@@ -102,4 +112,9 @@ public class VectorFileSinkOperator extends FileSinkOperator {
       }
     }
   }
+
+  @Override
+  public VectorDesc getVectorDesc() {
+    return vectorDesc;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
index fdd5aab..becf4c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
 import org.apache.hadoop.hive.ql.plan.VectorFilterDesc;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -35,11 +36,15 @@ import com.google.common.annotations.VisibleForTesting;
 /**
  * Filter operator implementation.
  **/
-public class VectorFilterOperator extends FilterOperator {
+public class VectorFilterOperator extends FilterOperator
+    implements VectorizationOperator {
 
   private static final long serialVersionUID = 1L;
 
-  private VectorExpression conditionEvaluator = null;
+  private VectorizationContext vContext;
+  private VectorFilterDesc vectorDesc;
+
+  private VectorExpression predicateExpression = null;
 
   // Temporary selected vector
   private transient int[] temporarySelected;
@@ -48,11 +53,14 @@ public class VectorFilterOperator extends FilterOperator {
   // and 0 if condition needs to be computed.
   transient private int filterMode = 0;
 
-  public VectorFilterOperator(CompilationOpContext ctx,
-      VectorizationContext vContext, OperatorDesc conf) throws HiveException {
+  public VectorFilterOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc)
+          throws HiveException {
     this(ctx);
     this.conf = (FilterDesc) conf;
-    conditionEvaluator = ((VectorFilterDesc) this.conf.getVectorDesc()).getPredicateExpression();
+    this.vContext = vContext;
+    this.vectorDesc = (VectorFilterDesc) vectorDesc;
+    predicateExpression = this.vectorDesc.getPredicateExpression();
   }
 
   /** Kryo ctor. */
@@ -65,20 +73,25 @@ public class VectorFilterOperator extends FilterOperator {
     super(ctx);
   }
 
+  @Override
+  public VectorizationContext getInputVectorizationContext() {
+    return vContext;
+  }
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
+    VectorExpression.doTransientInit(predicateExpression);
     try {
       heartbeatInterval = HiveConf.getIntVar(hconf,
           HiveConf.ConfVars.HIVESENDHEARTBEAT);
 
-      conditionEvaluator.init(hconf);
+      predicateExpression.init(hconf);
     } catch (Throwable e) {
       throw new HiveException(e);
     }
-    if (conditionEvaluator instanceof ConstantVectorExpression) {
-      ConstantVectorExpression cve = (ConstantVectorExpression) this.conditionEvaluator;
+    if (predicateExpression instanceof ConstantVectorExpression) {
+      ConstantVectorExpression cve = (ConstantVectorExpression) this.predicateExpression;
       if (cve.getLongValue() == 1) {
         filterMode = 1;
       } else {
@@ -90,7 +103,7 @@ public class VectorFilterOperator extends FilterOperator {
   }
 
   public void setFilterCondition(VectorExpression expr) {
-    this.conditionEvaluator = expr;
+    this.predicateExpression = expr;
   }
 
   @Override
@@ -109,7 +122,7 @@ public class VectorFilterOperator extends FilterOperator {
     //Evaluate the predicate expression
     switch (filterMode) {
       case 0:
-        conditionEvaluator.evaluate(vrg);
+        predicateExpression.evaluate(vrg);
         break;
       case -1:
         // All will be filtered out
@@ -133,11 +146,12 @@ public class VectorFilterOperator extends FilterOperator {
     return "FIL";
   }
 
-  public VectorExpression getConditionEvaluator() {
-    return conditionEvaluator;
+  public VectorExpression getPredicateExpression() {
+    return predicateExpression;
   }
 
-  public void setConditionEvaluator(VectorExpression conditionEvaluator) {
-    this.conditionEvaluator = conditionEvaluator;
+  @Override
+  public VectorDesc getVectorDesc() {
+    return vectorDesc;
   }
 }

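[Editor's note] Beyond the rename, the operator keeps its constant-folding shortcut: when the predicate is a ConstantVectorExpression, filterMode is set once in initializeOp and per-batch evaluation is skipped. A minimal sketch of that three-way decision with plain-Java stand-ins, not Hive's classes:

    public class FilterModeSketch {
      /** 1 = pass everything, -1 = drop everything, 0 = evaluate per batch. */
      static int filterMode(Long constantValue) {
        if (constantValue == null) return 0;  // not a constant: evaluate each batch
        return constantValue == 1L ? 1 : -1;  // constant true / constant false
      }

      public static void main(String[] args) {
        System.out.println(filterMode(1L));   // 1: forward all rows
        System.out.println(filterMode(0L));   // -1: filter all rows
        System.out.println(filterMode(null)); // 0: run the predicate each batch
      }
    }
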
http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index d81cd26..90145e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.vector;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.lang.ref.SoftReference;
+import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -30,6 +31,7 @@ import java.util.Map;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
@@ -41,15 +43,20 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
 import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,12 +71,13 @@ import com.google.common.base.Preconditions;
  * stores the aggregate operators' intermediate states. Emits row mode output.
  *
  */
-public class VectorGroupByOperator extends Operator<GroupByDesc> implements
-    VectorizationContextRegion {
+public class VectorGroupByOperator extends Operator<GroupByDesc>
+    implements VectorizationOperator, VectorizationContextRegion {
 
   private static final Logger LOG = LoggerFactory.getLogger(
       VectorGroupByOperator.class.getName());
 
+  private VectorizationContext vContext;
   private VectorGroupByDesc vectorDesc;
 
   /**
@@ -77,7 +85,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
    * the algorithm of how to compute the aggregation. state is kept in the
    * aggregation buffers and is our responsibility to match the proper state for each key.
    */
-  private VectorAggregateExpression[] aggregators;
+  private VectorAggregationDesc[] vecAggrDescs;
 
   /**
    * Key vector expressions.
@@ -85,7 +93,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
   private VectorExpression[] keyExpressions;
   private int outputKeyLength;
 
-  private boolean isVectorOutput;
+  private TypeInfo[] outputTypeInfos;
+  private DataTypePhysicalVariation[] outputDataTypePhysicalVariations;
 
   // Create a new outgoing vectorization context because column name map will change.
   private VectorizationContext vOutContext = null;
@@ -94,8 +103,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
   // transient.
   //---------------------------------------------------------------------------
 
-  private transient VectorExpressionWriter[] keyOutputWriters;
-
+  private transient VectorAggregateExpression[] aggregators;
   /**
    * The aggregation buffers to use for the current batch.
    */
@@ -112,8 +120,6 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
   private transient VectorizedRowBatch outputBatch;
   private transient VectorizedRowBatchCtx vrbCtx;
 
-  private transient VectorAssignRow vectorAssignRow;
-
   /*
    * Grouping sets members.
    */
@@ -865,18 +871,42 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
 
   private static final long serialVersionUID = 1L;
 
-  public VectorGroupByOperator(CompilationOpContext ctx,
-      VectorizationContext vContext, OperatorDesc conf) throws HiveException {
+  public VectorGroupByOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException {
     this(ctx);
     GroupByDesc desc = (GroupByDesc) conf;
     this.conf = desc;
-    vectorDesc = (VectorGroupByDesc) desc.getVectorDesc();
-    keyExpressions = vectorDesc.getKeyExpressions();
-    aggregators = vectorDesc.getAggregators();
-    isVectorOutput = vectorDesc.isVectorOutput();
+    this.vContext = vContext;
+    this.vectorDesc = (VectorGroupByDesc) vectorDesc;
+    keyExpressions = this.vectorDesc.getKeyExpressions();
+    vecAggrDescs = this.vectorDesc.getVecAggrDescs();
+
+    // The grouping id, which is the last of the key columns, should be pruned.
+    // See ColumnPrunerGroupByProc.
+    outputKeyLength =
+        this.conf.pruneGroupingSetId() ? keyExpressions.length - 1 : keyExpressions.length;
+
+    final int aggregationCount = vecAggrDescs.length;
+    final int outputCount = outputKeyLength + aggregationCount;
+
+    outputTypeInfos = new TypeInfo[outputCount];
+    outputDataTypePhysicalVariations = new DataTypePhysicalVariation[outputCount];
+    for (int i = 0; i < outputKeyLength; i++) {
+      VectorExpression keyExpression = keyExpressions[i];
+      outputTypeInfos[i] = keyExpression.getOutputTypeInfo();
+      outputDataTypePhysicalVariations[i] = keyExpression.getOutputDataTypePhysicalVariation();
+    }
+    for (int i = 0; i < aggregationCount; i++) {
+      VectorAggregationDesc vecAggrDesc = vecAggrDescs[i];
+      outputTypeInfos[i + outputKeyLength] = vecAggrDesc.getOutputTypeInfo();
+      outputDataTypePhysicalVariations[i + outputKeyLength] =
+          vecAggrDesc.getOutputDataTypePhysicalVariation();
+    }
 
     vOutContext = new VectorizationContext(getName(), desc.getOutputColumnNames(),
         /* vContextEnvironment */ vContext);
+    vOutContext.setInitialTypeInfos(Arrays.asList(outputTypeInfos));
+    vOutContext.setInitialDataTypePhysicalVariations(Arrays.asList(outputDataTypePhysicalVariations));
   }
 
   /** Kryo ctor. */
@@ -889,6 +919,11 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
     super(ctx);
   }
 
+  @Override
+  public VectorizationContext getInputVectorizationContext() {
+    return vContext;
+  }
+
   private void setupGroupingSets() {
 
     groupingSetsPresent = conf.isGroupingSetsPresent();
@@ -936,6 +971,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
+    VectorExpression.doTransientInit(keyExpressions);
 
     List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
 
@@ -943,23 +979,43 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
     try {
 
       List<String> outputFieldNames = conf.getOutputColumnNames();
-
-      // grouping id should be pruned, which is the last of key columns
-      // see ColumnPrunerGroupByProc
-      outputKeyLength =
-          conf.pruneGroupingSetId() ? keyExpressions.length - 1 : keyExpressions.length;
-
-      keyOutputWriters = new VectorExpressionWriter[outputKeyLength];
+      final int outputCount = outputFieldNames.size();
 
       for(int i = 0; i < outputKeyLength; ++i) {
-        keyOutputWriters[i] = VectorExpressionWriterFactory.
+        VectorExpressionWriter vew = VectorExpressionWriterFactory.
             genVectorExpressionWritable(keysDesc.get(i));
-        objectInspectors.add(keyOutputWriters[i].getObjectInspector());
+        ObjectInspector oi = vew.getObjectInspector();
+        objectInspectors.add(oi);
       }
 
-      for (int i = 0; i < aggregators.length; ++i) {
-        aggregators[i].init(conf.getAggregators().get(i));
-        ObjectInspector objInsp = aggregators[i].getOutputObjectInspector();
+      final int aggregateCount = vecAggrDescs.length;
+      aggregators = new VectorAggregateExpression[aggregateCount];
+      for (int i = 0; i < aggregateCount; ++i) {
+        VectorAggregationDesc vecAggrDesc = vecAggrDescs[i];
+
+        Class<? extends VectorAggregateExpression> vecAggrClass = vecAggrDesc.getVecAggrClass();
+
+        Constructor<? extends VectorAggregateExpression> ctor = null;
+        try {
+          ctor = vecAggrClass.getConstructor(VectorAggregationDesc.class);
+        } catch (Exception e) {
+          throw new HiveException("Constructor " + vecAggrClass.getSimpleName() +
+              "(VectorAggregationDesc) not available", e);
+        }
+        VectorAggregateExpression vecAggrExpr = null;
+        try {
+          vecAggrExpr = ctor.newInstance(vecAggrDesc);
+        } catch (Exception e) {
+          throw new HiveException("Failed to create " + vecAggrClass.getSimpleName() +
+              "(VectorAggregationDesc) object", e);
+        }
+        VectorExpression.doTransientInit(vecAggrExpr.getInputExpression());
+        aggregators[i] = vecAggrExpr;
+
+        ObjectInspector objInsp =
+            TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(
+                vecAggrDesc.getOutputTypeInfo());
         Preconditions.checkState(objInsp != null);
         objectInspectors.add(objInsp);
       }
@@ -968,16 +1024,21 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
       aggregationBatchInfo = new VectorAggregationBufferBatch();
       aggregationBatchInfo.compileAggregationBatchInfo(aggregators);
 
-      LOG.info("VectorGroupByOperator is vector output {}", isVectorOutput);
       outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
           outputFieldNames, objectInspectors);
-      if (isVectorOutput) {
-        vrbCtx = new VectorizedRowBatchCtx();
-        vrbCtx.init((StructObjectInspector) outputObjInspector, vOutContext.getScratchColumnTypeNames());
-        outputBatch = vrbCtx.createVectorizedRowBatch();
-        vectorAssignRow = new VectorAssignRow();
-        vectorAssignRow.init((StructObjectInspector) outputObjInspector, vOutContext.getProjectedColumns());
-      }
+
+      vrbCtx = new VectorizedRowBatchCtx(
+          outputFieldNames.toArray(new String[0]),
+          outputTypeInfos,
+          outputDataTypePhysicalVariations,
+          /* dataColumnNums */ null,
+          /* partitionColumnCount */ 0,
+          /* virtualColumnCount */ 0,
+          /* neededVirtualColumns */ null,
+          vOutContext.getScratchColumnTypeNames(),
+          vOutContext.getScratchDataTypePhysicalVariations());
+
+      outputBatch = vrbCtx.createVectorizedRowBatch();
 
     } catch (HiveException he) {
       throw he;
@@ -1064,31 +1125,21 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
    */
   private void writeSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg)
       throws HiveException {
-    int fi = 0;
-    if (!isVectorOutput) {
-      // Output row.
-      for (int i = 0; i < outputKeyLength; ++i) {
-        forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue (
-            kw, i, keyOutputWriters[i]);
-      }
-      for (int i = 0; i < aggregators.length; ++i) {
-        forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i));
-      }
-      forward(forwardCache, outputObjInspector, false);
-    } else {
-      // Output keys and aggregates into the output batch.
-      for (int i = 0; i < outputKeyLength; ++i) {
-        vectorAssignRow.assignRowColumn(outputBatch, outputBatch.size, fi++,
-                keyWrappersBatch.getWritableKeyValue (kw, i, keyOutputWriters[i]));
-      }
-      for (int i = 0; i < aggregators.length; ++i) {
-        vectorAssignRow.assignRowColumn(outputBatch, outputBatch.size, fi++,
-                aggregators[i].evaluateOutput(agg.getAggregationBuffer(i)));
-      }
-      ++outputBatch.size;
-      if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
-        flushOutput();
-      }
+
+    int colNum = 0;
+    final int batchIndex = outputBatch.size;
+
+    // Output keys and aggregates into the output batch.
+    for (int i = 0; i < outputKeyLength; ++i) {
+      keyWrappersBatch.assignRowColumn(outputBatch, batchIndex, colNum++, kw);
+    }
+    for (int i = 0; i < aggregators.length; ++i) {
+      aggregators[i].assignRowColumn(outputBatch, batchIndex, colNum++,
+          agg.getAggregationBuffer(i));
+    }
+    ++outputBatch.size;
+    if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
+      flushOutput();
     }
   }
 
@@ -1101,10 +1152,12 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
    */
   private void writeGroupRow(VectorAggregationBufferRow agg, DataOutputBuffer buffer)
       throws HiveException {
-    int fi = outputKeyLength;   // Start after group keys.
+    int colNum = outputKeyLength;   // Start after group keys.
+    final int batchIndex = outputBatch.size;
+
     for (int i = 0; i < aggregators.length; ++i) {
-      vectorAssignRow.assignRowColumn(outputBatch, outputBatch.size, fi++,
-              aggregators[i].evaluateOutput(agg.getAggregationBuffer(i)));
+      aggregators[i].assignRowColumn(outputBatch, batchIndex, colNum++,
+          agg.getAggregationBuffer(i));
     }
     ++outputBatch.size;
     if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
@@ -1121,7 +1174,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
   @Override
   public void closeOp(boolean aborted) throws HiveException {
     processingMode.close(aborted);
-    if (!aborted && isVectorOutput && outputBatch.size > 0) {
+    if (!aborted && outputBatch.size > 0) {
       flushOutput();
     }
   }
@@ -1143,7 +1196,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
   }
 
   @Override
-  public VectorizationContext getOuputVectorizationContext() {
+  public VectorizationContext getOutputVectorizationContext() {
     return vOutContext;
   }
 
@@ -1161,4 +1214,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
     return "GBY";
   }
 
+  @Override
+  public VectorDesc getVectorDesc() {
+    return vectorDesc;
+  }
 }

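[Editor's note] The initializeOp hunk above now instantiates each aggregate class through a reflected one-argument constructor, failing fast when the expected (VectorAggregationDesc) constructor is missing. A runnable illustration of that pattern; SumLike and Desc are hypothetical names, not Hive classes:

    import java.lang.reflect.Constructor;

    public class ReflectiveCtorSketch {
      static class Desc { final String name; Desc(String n) { name = n; } }
      static class SumLike {
        final Desc desc;
        public SumLike(Desc desc) { this.desc = desc; }
      }

      public static void main(String[] args) throws Exception {
        Class<SumLike> cls = SumLike.class;
        // getConstructor throws NoSuchMethodException if the shape is wrong.
        Constructor<SumLike> ctor = cls.getConstructor(Desc.class);
        SumLike instance = ctor.newInstance(new Desc("sum"));
        System.out.println(instance.desc.name); // sum
      }
    }
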
http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
index 64706ad..13a929b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
@@ -45,12 +45,12 @@ public class VectorGroupKeyHelper extends VectorColumnSetInfo {
 
     // Inspect the output type of each key expression.  And, remember the output columns.
     outputColumnNums = new int[keyCount];
-    for(int i=0; i < keyCount; ++i) {
-      String typeName = VectorizationContext.mapTypeNameSynonyms(keyExpressions[i].getOutputType());
-      TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName);
+    for(int i = 0; i < keyCount; ++i) {
+      VectorExpression keyExpression = keyExpressions[i];
+      TypeInfo typeInfo = keyExpression.getOutputTypeInfo();
       Type columnVectorType = VectorizationContext.getColumnVectorTypeFromTypeInfo(typeInfo);
       addKey(columnVectorType);
-      outputColumnNums[i] = keyExpressions[i].getOutputColumn();
+      outputColumnNums[i] = keyExpression.getOutputColumnNum();
     }
     finishAdding();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
index f00ad96..74b9c58 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
@@ -106,7 +106,7 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
     int columnIndex;
     for(int i = 0; i< longIndices.length; ++i) {
       keyIndex = longIndices[i];
-      columnIndex = keyExpressions[keyIndex].getOutputColumn();
+      columnIndex = keyExpressions[keyIndex].getOutputColumnNum();
       LongColumnVector columnVector = (LongColumnVector) batch.cols[columnIndex];
       if (columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
         assignLongNoNullsNoRepeatingNoSelection(i, batch.size, columnVector);
@@ -128,7 +128,7 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
     }
     for(int i=0;i<doubleIndices.length; ++i) {
       keyIndex = doubleIndices[i];
-      columnIndex = keyExpressions[keyIndex].getOutputColumn();
+      columnIndex = keyExpressions[keyIndex].getOutputColumnNum();
       DoubleColumnVector columnVector = (DoubleColumnVector) batch.cols[columnIndex];
       if (columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
         assignDoubleNoNullsNoRepeatingNoSelection(i, batch.size, columnVector);
@@ -150,7 +150,7 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
     }
     for(int i=0;i<stringIndices.length; ++i) {
       keyIndex = stringIndices[i];
-      columnIndex = keyExpressions[keyIndex].getOutputColumn();
+      columnIndex = keyExpressions[keyIndex].getOutputColumnNum();
       BytesColumnVector columnVector = (BytesColumnVector) batch.cols[columnIndex];
       if (columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
         assignStringNoNullsNoRepeatingNoSelection(i, batch.size, columnVector);
@@ -172,7 +172,7 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
     }
     for(int i=0;i<decimalIndices.length; ++i) {
       keyIndex = decimalIndices[i];
-      columnIndex = keyExpressions[keyIndex].getOutputColumn();
+      columnIndex = keyExpressions[keyIndex].getOutputColumnNum();
       DecimalColumnVector columnVector = (DecimalColumnVector) batch.cols[columnIndex];
       if (columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
         assignDecimalNoNullsNoRepeatingNoSelection(i, batch.size, columnVector);
@@ -194,7 +194,7 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
     }
     for(int i=0;i<timestampIndices.length; ++i) {
       keyIndex = timestampIndices[i];
-      columnIndex = keyExpressions[keyIndex].getOutputColumn();
+      columnIndex = keyExpressions[keyIndex].getOutputColumnNum();
       TimestampColumnVector columnVector = (TimestampColumnVector) batch.cols[columnIndex];
       if (columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
         assignTimestampNoNullsNoRepeatingNoSelection(i, batch.size, columnVector);
@@ -216,7 +216,7 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
     }
     for(int i=0;i<intervalDayTimeIndices.length; ++i) {
       keyIndex = intervalDayTimeIndices[i];
-      columnIndex = keyExpressions[keyIndex].getOutputColumn();
+      columnIndex = keyExpressions[keyIndex].getOutputColumnNum();
       IntervalDayTimeColumnVector columnVector = (IntervalDayTimeColumnVector) batch.cols[columnIndex];
       if (columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
         assignIntervalDayTimeNoNullsNoRepeatingNoSelection(i, batch.size, columnVector);
@@ -258,7 +258,7 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
         }
         continue;
       }
-      columnIndex = keyExpressions[keyIndex].getOutputColumn();
+      columnIndex = keyExpressions[keyIndex].getOutputColumnNum();
       LongColumnVector columnVector = (LongColumnVector) batch.cols[columnIndex];
       if (columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
         assignLongNoNullsNoRepeatingNoSelection(i, batch.size, columnVector);
@@ -287,7 +287,7 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
         }
         continue;
       }
-      columnIndex = keyExpressions[keyIndex].getOutputColumn();
+      columnIndex = keyExpressions[keyIndex].getOutputColumnNum();
       DoubleColumnVector columnVector = (DoubleColumnVector) batch.cols[columnIndex];
       if (columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
         assignDoubleNoNullsNoRepeatingNoSelection(i, batch.size, columnVector);
@@ -316,7 +316,7 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
         }
         continue;
       }
-      columnIndex = keyExpressions[keyIndex].getOutputColumn();
+      columnIndex = keyExpressions[keyIndex].getOutputColumnNum();
       BytesColumnVector columnVector = (BytesColumnVector) batch.cols[columnIndex];
       if (columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
         assignStringNoNullsNoRepeatingNoSelection(i, batch.size, columnVector);
@@ -345,7 +345,7 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
         }
         continue;
       }
-      columnIndex = keyExpressions[keyIndex].getOutputColumn();
+      columnIndex = keyExpressions[keyIndex].getOutputColumnNum();
       DecimalColumnVector columnVector = (DecimalColumnVector) batch.cols[columnIndex];
       if (columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
         assignDecimalNoNullsNoRepeatingNoSelection(i, batch.size, columnVector);
@@ -374,7 +374,7 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
         }
         continue;
       }
-      columnIndex = keyExpressions[keyIndex].getOutputColumn();
+      columnIndex = keyExpressions[keyIndex].getOutputColumnNum();
       TimestampColumnVector columnVector = (TimestampColumnVector) batch.cols[columnIndex];
       if (columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
         assignTimestampNoNullsNoRepeatingNoSelection(i, batch.size, columnVector);
@@ -403,7 +403,7 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
         }
         continue;
       }
-      columnIndex = keyExpressions[keyIndex].getOutputColumn();
+      columnIndex = keyExpressions[keyIndex].getOutputColumnNum();
       IntervalDayTimeColumnVector columnVector = (IntervalDayTimeColumnVector) batch.cols[columnIndex];
       if (columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
         assignIntervalDayTimeNoNullsNoRepeatingNoSelection(i, batch.size, columnVector);
@@ -910,9 +910,7 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
     final int size = keyExpressions.length;
     ColumnVector.Type[] columnVectorTypes = new ColumnVector.Type[size];
     for (int i = 0; i < size; i++) {
-      String typeName = VectorizationContext.mapTypeNameSynonyms(keyExpressions[i].getOutputType());
-      TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName);
-      columnVectorTypes[i] = VectorizationContext.getColumnVectorTypeFromTypeInfo(typeInfo);
+      columnVectorTypes[i] = keyExpressions[i].getOutputColumnVectorType();
     }
     return compileKeyWrapperBatch(keyExpressions, columnVectorTypes);
   }
@@ -1035,6 +1033,56 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
     kw.assignLong(columnTypeSpecificIndex, value);
   }
 
+  public void assignRowColumn(VectorizedRowBatch batch, int batchIndex, int keyIndex,
+      VectorHashKeyWrapper kw)
+    throws HiveException {
+
+    ColumnVector colVector = batch.cols[keyIndex];
+
+    if (kw.isNull(keyIndex)) {
+      colVector.noNulls = false;
+      colVector.isNull[batchIndex] = true;
+      return;
+    }
+    colVector.isNull[batchIndex] = false;
+
+    ColumnVector.Type columnVectorType = columnVectorTypes[keyIndex];
+    int columnTypeSpecificIndex = columnTypeSpecificIndices[keyIndex];
+
+    switch (columnVectorType) {
+    case LONG:
+    case DECIMAL_64:
+      ((LongColumnVector) colVector).vector[batchIndex] =
+          kw.getLongValue(columnTypeSpecificIndex);
+      break;
+    case DOUBLE:
+      ((DoubleColumnVector) colVector).vector[batchIndex] =
+          kw.getDoubleValue(columnTypeSpecificIndex);
+      break;
+    case BYTES:
+      ((BytesColumnVector) colVector).setVal(
+          batchIndex,
+          kw.getBytes(columnTypeSpecificIndex),
+          kw.getByteStart(columnTypeSpecificIndex),
+          kw.getByteLength(columnTypeSpecificIndex));
+      break;
+    case DECIMAL:
+      ((DecimalColumnVector) colVector).vector[batchIndex].set(
+          kw.getDecimal(columnTypeSpecificIndex));
+      break;
+    case TIMESTAMP:
+      ((TimestampColumnVector) colVector).set(
+          batchIndex, kw.getTimestamp(columnTypeSpecificIndex));
+      break;
+    case INTERVAL_DAY_TIME:
+      ((IntervalDayTimeColumnVector) colVector).set(
+          batchIndex, kw.getIntervalDayTime(columnTypeSpecificIndex));
+      break;
+    default:
+      throw new HiveException("Unexpected column vector type " + columnVectorType);
+    }
+  }
+
   public int getVariableSize(int batchSize) {
     int variableSize = 0;
     if ( 0 < stringIndices.length) {

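The assign loops above repeat one fast-path test per key type: noNulls && !isRepeating && !selectedInUse. Below is a minimal standalone sketch of that three-way dispatch, using a simplified stand-in for Hive's LongColumnVector (the stand-in class and the sumKeys method are illustrative assumptions, not Hive code):

public class FastPathSketch {
    // Assumption: simplified stand-in for Hive's LongColumnVector.
    static class LongColumnVector {
        long[] vector;
        boolean[] isNull;
        boolean noNulls = true;
        boolean isRepeating = false;
        LongColumnVector(int n) { vector = new long[n]; isNull = new boolean[n]; }
    }

    static long sumKeys(LongColumnVector col, int size, int[] selected, boolean selectedInUse) {
        long sum = 0;
        if (col.noNulls && !col.isRepeating && !selectedInUse) {
            // Fast path: contiguous, non-null, non-repeating values.
            for (int i = 0; i < size; i++) {
                sum += col.vector[i];
            }
        } else if (col.isRepeating) {
            // Repeating: element 0 stands for every row.
            if (col.noNulls || !col.isNull[0]) {
                sum = col.vector[0] * size;
            }
        } else {
            // General path: honor the selected map and per-row null flags.
            for (int j = 0; j < size; j++) {
                int i = selectedInUse ? selected[j] : j;
                if (col.noNulls || !col.isNull[i]) {
                    sum += col.vector[i];
                }
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        LongColumnVector col = new LongColumnVector(4);
        col.vector = new long[] {1, 2, 3, 4};
        System.out.println(sumKeys(col, 4, null, false)); // 10
    }
}
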
http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java
index b37dd05..c42be24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java
@@ -23,16 +23,21 @@ import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.LimitDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorLimitDesc;
 
 import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Limit operator implementation. Limits the number of rows to be passed on.
  **/
-public class VectorLimitOperator extends LimitOperator  {
+public class VectorLimitOperator extends LimitOperator implements VectorizationOperator {
 
   private static final long serialVersionUID = 1L;
 
+  private VectorizationContext vContext;
+  private VectorLimitDesc vectorDesc;
+
   /** Kryo ctor. */
   @VisibleForTesting
   public VectorLimitOperator() {
@@ -44,9 +49,17 @@ public class VectorLimitOperator extends LimitOperator  {
   }
 
   public VectorLimitOperator(
-      CompilationOpContext ctx, VectorizationContext vContext, OperatorDesc conf) {
+      CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) {
     this(ctx);
     this.conf = (LimitDesc) conf;
+    this.vContext = vContext;
+    this.vectorDesc = (VectorLimitDesc) vectorDesc;
+  }
+
+  @Override
+  public VectorizationContext getInputVectorizationContext() {
+    return vContext;
   }
 
   @Override
@@ -79,4 +92,9 @@ public class VectorLimitOperator extends LimitOperator  {
       currCount += batch.size;
     }
   }
+
+  @Override
+  public VectorDesc getVectorDesc() {
+    return vectorDesc;
+  }
 }

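VectorLimitOperator now implements VectorizationOperator, whose definition is not part of this hunk. Inferring from the overrides added here, the contract presumably looks like the sketch below; the stub types are assumptions standing in for the real VectorizationContext and VectorDesc from org.apache.hadoop.hive.ql.exec.vector and org.apache.hadoop.hive.ql.plan:

// Hypothetical stand-ins for the real Hive types.
interface VectorizationContextStub {}
interface VectorDescStub {}

// Assumed shape of the VectorizationOperator contract, inferred from the
// getInputVectorizationContext/getVectorDesc overrides added in this commit.
interface VectorizationOperatorSketch {
    // The vectorization context describing this operator's input columns.
    VectorizationContextStub getInputVectorizationContext();

    // The operator-specific vector descriptor (e.g. VectorLimitDesc).
    VectorDescStub getVectorDesc();
}
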
http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
index b2c8684..9eb660f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
@@ -32,10 +34,14 @@ import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.ObjectContainer;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.DataOutputBuffer;
 
 /**
@@ -44,10 +50,14 @@ import org.apache.hadoop.io.DataOutputBuffer;
  *
  * It has common variables and code for the output batch, Hybrid Grace spill batch, and more.
  */
-public class VectorMapJoinBaseOperator extends MapJoinOperator implements VectorizationContextRegion {
+public class VectorMapJoinBaseOperator extends MapJoinOperator
+    implements VectorizationOperator, VectorizationContextRegion {
 
   private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinBaseOperator.class.getName());
 
+  protected VectorizationContext vContext;
+  protected VectorMapJoinDesc vectorDesc;
+
   private static final long serialVersionUID = 1L;
 
   protected VectorizationContext vOutContext;
@@ -74,12 +84,14 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector
     super(ctx);
   }
 
-  public VectorMapJoinBaseOperator(CompilationOpContext ctx,
-      VectorizationContext vContext, OperatorDesc conf) throws HiveException {
+  public VectorMapJoinBaseOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException {
     super(ctx);
 
     MapJoinDesc desc = (MapJoinDesc) conf;
     this.conf = desc;
+    this.vContext = vContext;
+    this.vectorDesc = (VectorMapJoinDesc) vectorDesc;
 
     order = desc.getTagOrder();
     numAliases = desc.getExprs().size();
@@ -90,6 +102,118 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector
      // We are making a new output vectorized row batch.
     vOutContext = new VectorizationContext(getName(), desc.getOutputColumnNames(),
         /* vContextEnvironment */ vContext);
+    vOutContext.setInitialTypeInfos(Arrays.asList(getOutputTypeInfos(desc)));
+  }
+
+  @Override
+  public VectorizationContext getInputVectorizationContext() {
+    return vContext;
+  }
+
+  public static TypeInfo[] getOutputTypeInfos(MapJoinDesc desc) {
+
+    final byte posBigTable = (byte) desc.getPosBigTable();
+
+    List<ExprNodeDesc> keyDesc = desc.getKeys().get(posBigTable);
+    List<ExprNodeDesc> bigTableExprs = desc.getExprs().get(posBigTable);
+
+    Byte[] order = desc.getTagOrder();
+    Byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]);
+
+    final int outputColumnCount = desc.getOutputColumnNames().size();
+    TypeInfo[] outputTypeInfos = new TypeInfo[outputColumnCount];
+
+    /*
+     * Gather up big and small table output result information from the MapJoinDesc.
+     */
+    List<Integer> bigTableRetainList = desc.getRetainList().get(posBigTable);
+    final int bigTableRetainSize = bigTableRetainList.size();
+
+    int[] smallTableIndices;
+    int smallTableIndicesSize;
+    List<ExprNodeDesc> smallTableExprs = desc.getExprs().get(posSingleVectorMapJoinSmallTable);
+    if (desc.getValueIndices() != null && desc.getValueIndices().get(posSingleVectorMapJoinSmallTable) != null) {
+      smallTableIndices = desc.getValueIndices().get(posSingleVectorMapJoinSmallTable);
+      smallTableIndicesSize = smallTableIndices.length;
+    } else {
+      smallTableIndices = null;
+      smallTableIndicesSize = 0;
+    }
+
+    List<Integer> smallTableRetainList = desc.getRetainList().get(posSingleVectorMapJoinSmallTable);
+    final int smallTableRetainSize = smallTableRetainList.size();
+
+    int smallTableResultSize = 0;
+    if (smallTableIndicesSize > 0) {
+      smallTableResultSize = smallTableIndicesSize;
+    } else if (smallTableRetainSize > 0) {
+      smallTableResultSize = smallTableRetainSize;
+    }
+
+    /*
+     * Determine the big table retained mapping first so we can optimize out (with
+     * projection) copying inner join big table keys in the subsequent small table results section.
+     */
+
+    int nextOutputColumn = (order[0] == posBigTable ? 0 : smallTableResultSize);
+    for (int i = 0; i < bigTableRetainSize; i++) {
+
+      TypeInfo typeInfo = bigTableExprs.get(i).getTypeInfo();
+      outputTypeInfos[nextOutputColumn] = typeInfo;
+
+      nextOutputColumn++;
+    }
+
+    /*
+     * Now determine the small table results.
+     */
+    int firstSmallTableOutputColumn;
+    firstSmallTableOutputColumn = (order[0] == posBigTable ? bigTableRetainSize : 0);
+    int smallTableOutputCount = 0;
+    nextOutputColumn = firstSmallTableOutputColumn;
+
+    // Small table indices have more information (i.e. keys) than the retain list, so use them if they exist...
+    if (smallTableIndicesSize > 0) {
+      smallTableOutputCount = smallTableIndicesSize;
+
+      for (int i = 0; i < smallTableIndicesSize; i++) {
+        if (smallTableIndices[i] >= 0) {
+
+          // Zero and above numbers indicate a big table key is needed for
+          // small table result "area".
+
+          int keyIndex = smallTableIndices[i];
+
+          TypeInfo typeInfo = keyDesc.get(keyIndex).getTypeInfo();
+          outputTypeInfos[nextOutputColumn] = typeInfo;
+
+        } else {
+
+          // Negative numbers indicate a column to be (deserialize) read from the small table's
+          // LazyBinary value row.
+          int smallTableValueIndex = -smallTableIndices[i] - 1;
+
+          TypeInfo typeInfo = smallTableExprs.get(smallTableValueIndex).getTypeInfo();
+          outputTypeInfos[nextOutputColumn] = typeInfo;
+
+        }
+        nextOutputColumn++;
+      }
+    } else if (smallTableRetainSize > 0) {
+      smallTableOutputCount = smallTableRetainSize;
+
+      // Only small table values appear in join output result.
+
+      for (int i = 0; i < smallTableRetainSize; i++) {
+        int smallTableValueIndex = smallTableRetainList.get(i);
+
+        TypeInfo typeInfo = smallTableExprs.get(smallTableValueIndex).getTypeInfo();
+        outputTypeInfos[nextOutputColumn] = typeInfo;
+
+        nextOutputColumn++;
+      }
+    }
+    return outputTypeInfos;
   }
 
   @Override
@@ -97,7 +221,8 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector
     super.initializeOp(hconf);
 
     vrbCtx = new VectorizedRowBatchCtx();
-    vrbCtx.init((StructObjectInspector) this.outputObjInspector, vOutContext.getScratchColumnTypeNames());
+    vrbCtx.init((StructObjectInspector) this.outputObjInspector,
+        vOutContext.getScratchColumnTypeNames(), vOutContext.getScratchDataTypePhysicalVariations());
 
     outputBatch = vrbCtx.createVectorizedRowBatch();
 
@@ -182,8 +307,12 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector
   }
 
   @Override
-  public VectorizationContext getOuputVectorizationContext() {
+  public VectorizationContext getOutputVectorizationContext() {
     return vOutContext;
   }
 
+  @Override
+  public VectorDesc getVectorDesc() {
+    return vectorDesc;
+  }
 }

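getOutputTypeInfos above decodes the small-table indices with a sign convention: entries >= 0 select a big-table key column, while negative entries encode a small-table value column as -(valueIndex + 1). A self-contained sketch of the decode (the sample indices are invented for illustration):

public class SmallTableIndexSketch {
    public static void main(String[] args) {
        int[] smallTableIndices = {0, -1, -2, 1}; // hypothetical sample
        for (int idx : smallTableIndices) {
            if (idx >= 0) {
                // Zero and above: a big-table key lands in the small table result "area".
                System.out.println("big-table key column " + idx);
            } else {
                // Negative: a column read from the small table's LazyBinary value row.
                int smallTableValueIndex = -idx - 1;
                System.out.println("small-table value column " + smallTableValueIndex);
            }
        }
    }
}
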
http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index 4e05fa3..b8d7150 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.slf4j.Logger;
@@ -86,10 +87,10 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator {
   }
 
 
-  public VectorMapJoinOperator (CompilationOpContext ctx,
-      VectorizationContext vContext, OperatorDesc conf) throws HiveException {
+  public VectorMapJoinOperator (CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException {
 
-    super(ctx, vContext, conf);
+    super(ctx, conf, vContext, vectorDesc);
 
     MapJoinDesc desc = (MapJoinDesc) conf;
 
@@ -107,6 +108,10 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator {
 
   @Override
   public void initializeOp(Configuration hconf) throws HiveException {
+    VectorExpression.doTransientInit(bigTableFilterExpressions);
+    VectorExpression.doTransientInit(keyExpressions);
+    VectorExpression.doTransientInit(bigTableValueExpressions);
+
     // Use a final variable to properly parameterize the processVectorInspector closure.
     // Using a member variable in the closure will not do the right thing...
     final int parameterizePosBigTable = conf.getPosBigTable();
@@ -174,7 +179,7 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator {
           int rowIndex = inBatch.selectedInUse ? inBatch.selected[batchIndex] : batchIndex;
           return valueWriters[writerIndex].writeValue(inBatch.cols[columnIndex], rowIndex);
         }
-      }.initVectorExpr(vectorExpr.getOutputColumn(), i);
+      }.initVectorExpr(vectorExpr.getOutputColumnNum(), i);
       vectorNodeEvaluators.add(eval);
     }
     // Now replace the old evaluators with our own

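initializeOp now calls VectorExpression.doTransientInit on the filter, key, and value expressions before they are used: vector expressions travel through plan serialization, so state that cannot or should not be serialized is rebuilt once at operator initialization. A sketch of the pattern, with a simplified stand-in for VectorExpression (an assumption, not the real class):

import java.io.Serializable;

// Stand-in for VectorExpression; the real class carries far more state.
class VectorExpressionStub implements Serializable {
    private static final long serialVersionUID = 1L;

    // Non-serializable helpers get rebuilt here instead of being shipped in the plan.
    void transientInit() {
        System.out.println("rebuilt transient state");
    }

    static void doTransientInit(VectorExpressionStub[] expressions) {
        if (expressions == null) {
            return; // An operator may have no expressions of a given kind.
        }
        for (VectorExpressionStub e : expressions) {
            e.transientInit();
        }
    }

    public static void main(String[] args) {
        doTransientInit(new VectorExpressionStub[] { new VectorExpressionStub() });
        doTransientInit(null); // no-op
    }
}
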
http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java
index 26ca2b2..b8b4d8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -36,8 +37,6 @@ public class VectorMapJoinOuterFilteredOperator extends VectorMapJoinBaseOperato
 
   private static final long serialVersionUID = 1L;
 
-  private VectorizationContext vContext;
-
   // The above members are initialized by the constructor and must not be
   // transient.
   //---------------------------------------------------------------------------
@@ -59,11 +58,9 @@ public class VectorMapJoinOuterFilteredOperator extends VectorMapJoinBaseOperato
     super(ctx);
   }
 
-  public VectorMapJoinOuterFilteredOperator(CompilationOpContext ctx,
-      VectorizationContext vContext, OperatorDesc conf) throws HiveException {
-    super(ctx, vContext, conf);
-
-    this.vContext = vContext;
+  public VectorMapJoinOuterFilteredOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException {
+    super(ctx, conf, vContext, vectorDesc);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
index 26ab360..649426b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
@@ -281,6 +281,7 @@ public class VectorMapOperator extends AbstractMapOperator {
           LazySimpleDeserializeRead lazySimpleDeserializeRead =
               new LazySimpleDeserializeRead(
                   minimalDataTypeInfos,
+                  batchContext.getRowdataTypePhysicalVariations(),
                   /* useExternalBuffer */ true,
                   simpleSerdeParams);
 
@@ -512,18 +513,8 @@ public class VectorMapOperator extends AbstractMapOperator {
     partitionColumnCount = batchContext.getPartitionColumnCount();
     partitionValues = new Object[partitionColumnCount];
     virtualColumnCount = batchContext.getVirtualColumnCount();
-    rowIdentifierColumnNum = -1;
-    if (virtualColumnCount > 0) {
-      final int firstVirtualColumnNum = dataColumnCount + partitionColumnCount;
-      VirtualColumn[] neededVirtualColumns = batchContext.getNeededVirtualColumns();
-      hasRowIdentifier = (neededVirtualColumns[0] == VirtualColumn.ROWID);
-      if (hasRowIdentifier) {
-        rowIdentifierColumnNum = firstVirtualColumnNum;
-      }
-    } else {
-      hasRowIdentifier = false;
-    }
-    
+    rowIdentifierColumnNum = batchContext.findVirtualColumnNum(VirtualColumn.ROWID);
+    hasRowIdentifier = (rowIdentifierColumnNum != -1);
 
     dataColumnNums = batchContext.getDataColumnNums();
     Preconditions.checkState(dataColumnNums != null);

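The inlined ROWID scan is replaced by batchContext.findVirtualColumnNum(VirtualColumn.ROWID), added to VectorizedRowBatchCtx elsewhere in this commit. A sketch of what such a lookup plausibly does (the signature and types below are assumptions, not the real method): virtual columns occupy the batch slots after the data and partition columns, so the helper walks the needed virtual columns and returns the matching slot, or -1.

public class FindVirtualColumnSketch {
    // Assumption: simplified signature; the real helper lives on VectorizedRowBatchCtx.
    static int findVirtualColumnNum(Object[] neededVirtualColumns, Object target,
            int dataColumnCount, int partitionColumnCount) {
        // Virtual columns occupy the batch slots after data and partition columns.
        int columnNum = dataColumnCount + partitionColumnCount;
        for (Object vc : neededVirtualColumns) {
            if (vc == target) {
                return columnNum;
            }
            columnNum++;
        }
        return -1;
    }

    public static void main(String[] args) {
        Object rowId = "ROWID";
        System.out.println(findVirtualColumnNum(new Object[] {rowId}, rowId, 5, 1)); // 6
        System.out.println(findVirtualColumnNum(new Object[] {}, rowId, 5, 1));      // -1
    }
}
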
http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
index dd5e20f..60c236c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
@@ -24,15 +24,19 @@ import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 import com.google.common.annotations.VisibleForTesting;
 
-public class VectorReduceSinkOperator extends ReduceSinkOperator {
+public class VectorReduceSinkOperator extends ReduceSinkOperator
+    implements VectorizationOperator {
 
   private static final long serialVersionUID = 1L;
 
   private VectorizationContext vContext;
+  private VectorReduceSinkDesc vectorDesc;
 
   // The above members are initialized by the constructor and must not be
   // transient.
@@ -45,11 +49,13 @@ public class VectorReduceSinkOperator extends ReduceSinkOperator {
   protected transient Object[] singleRow;
 
   public VectorReduceSinkOperator(CompilationOpContext ctx,
-      VectorizationContext vContext, OperatorDesc conf) throws HiveException {
+      OperatorDesc conf, VectorizationContext vContext, VectorDesc vectorDesc)
+          throws HiveException {
     this(ctx);
     ReduceSinkDesc desc = (ReduceSinkDesc) conf;
     this.conf = desc;
     this.vContext = vContext;
+    this.vectorDesc = (VectorReduceSinkDesc) vectorDesc;
   }
 
   /** Kryo ctor. */
@@ -63,6 +69,11 @@ public class VectorReduceSinkOperator extends ReduceSinkOperator {
   }
 
   @Override
+  public VectorizationContext getInputVectorizationContext() {
+    return vContext;
+  }
+
+  @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
 
     // We need a input object inspector that is for the row we will extract out of the
@@ -105,4 +116,9 @@ public class VectorReduceSinkOperator extends ReduceSinkOperator {
       }
     }
   }
+
+  @Override
+  public VectorDesc getVectorDesc() {
+    return vectorDesc;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
index 0473f14..ef889f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
@@ -34,6 +34,9 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.VectorSMBJoinDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -49,13 +52,17 @@ import com.google.common.annotations.VisibleForTesting;
  * It accepts a vectorized batch input from the big table and iterates over the batch, calling the parent row-mode
  * implementation for each row in the batch.
  */
-public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements VectorizationContextRegion {
+public class VectorSMBMapJoinOperator extends SMBMapJoinOperator
+    implements VectorizationOperator, VectorizationContextRegion {
 
   private static final Logger LOG = LoggerFactory.getLogger(
       VectorSMBMapJoinOperator.class.getName());
 
   private static final long serialVersionUID = 1L;
 
+  private VectorizationContext vContext;
+  private VectorSMBJoinDesc vectorDesc;
+
   private VectorExpression[] bigTableValueExpressions;
 
   private VectorExpression[] bigTableFilterExpressions;
@@ -100,11 +107,13 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
     super(ctx);
   }
 
-  public VectorSMBMapJoinOperator(CompilationOpContext ctx,
-      VectorizationContext vContext, OperatorDesc conf) throws HiveException {
+  public VectorSMBMapJoinOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException {
     this(ctx);
     SMBJoinDesc desc = (SMBJoinDesc) conf;
     this.conf = desc;
+    this.vContext = vContext;
+    this.vectorDesc = (VectorSMBJoinDesc) vectorDesc;
 
     order = desc.getTagOrder();
     numAliases = desc.getExprs().size();
@@ -131,6 +140,11 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
   }
 
   @Override
+  public VectorizationContext getInputVectorizationContext() {
+    return vContext;
+  }
+
+  @Override
   protected List<Object> smbJoinComputeKeys(Object row, byte alias) throws HiveException {
     if (alias == this.posBigTable) {
 
@@ -152,6 +166,9 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
+    VectorExpression.doTransientInit(bigTableFilterExpressions);
+    VectorExpression.doTransientInit(keyExpressions);
+    VectorExpression.doTransientInit(bigTableValueExpressions);
 
     vrbCtx = new VectorizedRowBatchCtx();
     vrbCtx.init((StructObjectInspector) this.outputObjInspector, vOutContext.getScratchColumnTypeNames());
@@ -228,7 +245,7 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
           int rowIndex = inBatch.selectedInUse ? inBatch.selected[batchIndex] : batchIndex;
           return valueWriters[writerIndex].writeValue(inBatch.cols[columnIndex], rowIndex);
         }
-      }.initVectorExpr(vectorExpr.getOutputColumn(), i);
+      }.initVectorExpr(vectorExpr.getOutputColumnNum(), i);
       vectorNodeEvaluators.add(eval);
     }
     // Now replace the old evaluators with our own
@@ -312,7 +329,12 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
   }
 
   @Override
-  public VectorizationContext getOuputVectorizationContext() {
+  public VectorizationContext getOutputVectorizationContext() {
     return vOutContext;
   }
+
+  @Override
+  public VectorDesc getVectorDesc() {
+    return vectorDesc;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
index 5f1f952..d603355 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
 import org.apache.hadoop.hive.ql.plan.VectorSelectDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -41,11 +42,12 @@ import com.google.common.annotations.VisibleForTesting;
 /**
  * Select operator implementation.
  */
-public class VectorSelectOperator extends Operator<SelectDesc> implements
-    VectorizationContextRegion {
+public class VectorSelectOperator extends Operator<SelectDesc>
+    implements VectorizationOperator, VectorizationContextRegion {
 
   private static final long serialVersionUID = 1L;
 
+  private VectorizationContext vContext;
   private VectorSelectDesc vectorDesc;
 
   private VectorExpression[] vExpressions = null;
@@ -57,20 +59,24 @@ public class VectorSelectOperator extends Operator<SelectDesc> implements
   // Create a new outgoing vectorization context because column name map will change.
   private VectorizationContext vOutContext;
 
-  public VectorSelectOperator(CompilationOpContext ctx,
-      VectorizationContext vContext, OperatorDesc conf) throws HiveException {
+  public VectorSelectOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc)
+          throws HiveException {
     this(ctx);
     this.conf = (SelectDesc) conf;
-    vectorDesc = (VectorSelectDesc) this.conf.getVectorDesc();
-    vExpressions = vectorDesc.getSelectExpressions();
-    projectedOutputColumns = vectorDesc.getProjectedOutputColumns();
+    this.vContext = vContext;
+    this.vectorDesc = (VectorSelectDesc) vectorDesc;
+    vExpressions = this.vectorDesc.getSelectExpressions();
+    projectedOutputColumns = this.vectorDesc.getProjectedOutputColumns();
 
     /**
     * Create a new vectorization context to create a new projection, but the
     * same output column manager must be inherited to track the scratch columns.
+     * Some of which may be the input columns for this operator.
      */
     vOutContext = new VectorizationContext(getName(), vContext);
 
+    // NOTE: We keep the TypeInfo and dataTypePhysicalVariation arrays.
     vOutContext.resetProjectionColumns();
     List<String> outputColumnNames = this.conf.getOutputColumnNames();
     for (int i=0; i < projectedOutputColumns.length; ++i) {
@@ -90,12 +96,18 @@ public class VectorSelectOperator extends Operator<SelectDesc> implements
   }
 
   @Override
+  public VectorizationContext getInputVectorizationContext() {
+    return vContext;
+  }
+
+  @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
     // Just forward the row as is
     if (conf.isSelStarNoCompute()) {
       return;
     }
+    VectorExpression.doTransientInit(vExpressions);
 
     List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
 
@@ -166,7 +178,7 @@ public class VectorSelectOperator extends Operator<SelectDesc> implements
   }
 
   @Override
-  public VectorizationContext getOuputVectorizationContext() {
+  public VectorizationContext getOutputVectorizationContext() {
     return vOutContext;
   }
 
@@ -184,4 +196,9 @@ public class VectorSelectOperator extends Operator<SelectDesc> implements
     return "SEL";
   }
 
+  @Override
+  public VectorDesc getVectorDesc() {
+    return vectorDesc;
+  }
+
 }
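The constructor above inherits the parent context's scratch column manager, then resets the projection so only the SELECT-list columns are visible under their output names. A standalone sketch of that remap (column numbers and names invented for illustration):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ProjectionSketch {
    public static void main(String[] args) {
        int[] projectedOutputColumns = {3, 0, 5};           // physical batch columns
        List<String> outputColumnNames = List.of("c", "a", "b");

        // The child context keeps the parent's physical columns but exposes
        // only the projected ones, in SELECT-list order, under output names.
        Map<String, Integer> projection = new LinkedHashMap<>();
        for (int i = 0; i < projectedOutputColumns.length; i++) {
            projection.put(outputColumnNames.get(i), projectedOutputColumns[i]);
        }
        System.out.println(projection); // {c=3, a=0, b=5}
    }
}
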

http://git-wip-us.apache.org/repos/asf/hive/blob/e63ebccc/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java
index 211622d..70f124e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java
@@ -376,8 +376,13 @@ public final class VectorSerializeRow<T extends SerializeWrite> {
       break;
     case DECIMAL:
       {
-        final DecimalColumnVector decimalColVector = (DecimalColumnVector) colVector;
-        serializeWrite.writeHiveDecimal(decimalColVector.vector[adjustedBatchIndex], decimalColVector.scale);
+        if (colVector instanceof Decimal64ColumnVector) {
+          final Decimal64ColumnVector decimal64ColVector = (Decimal64ColumnVector) colVector;
+          serializeWrite.writeDecimal64(decimal64ColVector.vector[adjustedBatchIndex], decimal64ColVector.scale);
+        } else {
+          final DecimalColumnVector decimalColVector = (DecimalColumnVector) colVector;
+          serializeWrite.writeHiveDecimal(decimalColVector.vector[adjustedBatchIndex], decimalColVector.scale);
+        }
       }
       break;
     case INTERVAL_YEAR_MONTH:

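The new DECIMAL branch checks for Decimal64ColumnVector, which holds short decimals (precision <= 18) as unscaled longs plus a scale, so writeDecimal64 can emit the long directly instead of materializing a HiveDecimalWritable. A standalone illustration of that representation using java.math.BigDecimal:

import java.math.BigDecimal;

public class Decimal64Sketch {
    public static void main(String[] args) {
        // A Decimal64ColumnVector stores 12.34 at scale 2 as the long 1234.
        long unscaled = 1234L;
        int scale = 2;
        BigDecimal value = BigDecimal.valueOf(unscaled, scale);
        System.out.println(value); // 12.34

        // Round trip back to the unscaled long the column vector holds.
        System.out.println(value.movePointRight(scale).longValueExact()); // 1234
    }
}
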
