hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject [46/50] [abbrv] hive git commit: HIVE-16065: Vectorization: Wrong Key/Value information used by Vectorizer (Matt McCline, reviewed by Jason Dere)
Date Tue, 28 Mar 2017 21:10:03 GMT
HIVE-16065: Vectorization: Wrong Key/Value information used by Vectorizer (Matt McCline, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5d63605a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5d63605a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5d63605a

Branch: refs/heads/branch-2.2
Commit: 5d63605a1236cb0bf569c0817ef425740dca49ef
Parents: d2f64ce
Author: Matt McCline <mmccline@hortonworks.com>
Authored: Thu Mar 2 18:40:24 2017 -0800
Committer: Owen O'Malley <omalley@apache.org>
Committed: Tue Mar 28 14:02:48 2017 -0700

----------------------------------------------------------------------
 .../hive/ql/exec/tez/ReduceRecordProcessor.java |  2 +-
 .../hive/ql/exec/tez/ReduceRecordSource.java    |  8 ++-
 .../hive/ql/optimizer/physical/Vectorizer.java  | 74 ++++++++++++++------
 .../apache/hadoop/hive/ql/plan/BaseWork.java    | 10 +++
 .../apache/hadoop/hive/ql/plan/ReduceWork.java  | 31 --------
 5 files changed, 70 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5d63605a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 2d06545..3fb9fb1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -300,7 +300,7 @@ public class ReduceRecordProcessor  extends RecordProcessor{
     boolean vectorizedRecordSource = (tag == bigTablePosition) && redWork.getVectorMode();
     sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc,
         valueTableDesc, reader, tag == bigTablePosition, (byte) tag,
-        redWork.getVectorizedRowBatchCtx());
+        redWork.getVectorizedRowBatchCtx(), redWork.getVectorizedVertexNum());
     ois[tag] = sources[tag].getObjectInspector();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/5d63605a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index 7e41b7a..342e1ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -120,11 +120,14 @@ public class ReduceRecordSource implements RecordSource {
 
   private final GroupIterator groupIterator = new GroupIterator();
 
+  private long vectorizedVertexNum;
+
   void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc,
       TableDesc valueTableDesc, Reader reader, boolean handleGroupKey, byte tag,
-      VectorizedRowBatchCtx batchContext)
+      VectorizedRowBatchCtx batchContext, long vectorizedVertexNum)
       throws Exception {
 
+    this.vectorizedVertexNum = vectorizedVertexNum;
     ObjectInspector keyObjectInspector;
 
     this.reducer = reducer;
@@ -471,7 +474,8 @@ public class ReduceRecordSource implements RecordSource {
             + StringUtils.stringifyException(e2) + " ]";
       }
       throw new HiveException("Hive Runtime Error while processing vector batch (tag="
-          + tag + ") " + rowString, e);
+          + tag + ") (vectorizedVertexNum " + vectorizedVertexNum + ") " +
+          rowString, e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/5d63605a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 864b783..a30923f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -168,6 +168,7 @@ import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.NullStructSerDe;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -182,6 +183,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.base.Preconditions;
 
@@ -242,6 +244,8 @@ public class Vectorizer implements PhysicalPlanResolver {
 
   HiveVectorAdaptorUsageMode hiveVectorAdaptorUsageMode;
 
+  private long vectorizedVertexNum = -1;
+
   public Vectorizer() {
 
     /*
@@ -486,6 +490,7 @@ public class Vectorizer implements PhysicalPlanResolver {
 
     private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException
{
       VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
+      mapWork.setVectorizedVertexNum(++vectorizedVertexNum);
       boolean ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTez);
       if (ret) {
         vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTez);
@@ -935,6 +940,7 @@ public class Vectorizer implements PhysicalPlanResolver {
 
     private void convertReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException
{
       VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
+      reduceWork.setVectorizedVertexNum(++vectorizedVertexNum);
       boolean ret = validateReduceWork(reduceWork, vectorTaskColumnInfo, isTez);
       if (ret) {
         vectorizeReduceWork(reduceWork, vectorTaskColumnInfo, isTez);
@@ -947,36 +953,56 @@ public class Vectorizer implements PhysicalPlanResolver {
       ArrayList<String> reduceColumnNames = new ArrayList<String>();
       ArrayList<TypeInfo> reduceTypeInfos = new ArrayList<TypeInfo>();
 
+      if (reduceWork.getNeedsTagging()) {
+        LOG.info("Tagging not supported");
+        return false;
+      }
+
       try {
-        // Check key ObjectInspector.
-        ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector();
-        if (keyObjectInspector == null || !(keyObjectInspector instanceof StructObjectInspector))
{
-          return false;
+        TableDesc keyTableDesc = reduceWork.getKeyDesc();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Using reduce tag " + reduceWork.getTag());
         }
-        StructObjectInspector keyStructObjectInspector = (StructObjectInspector)keyObjectInspector;
-        List<? extends StructField> keyFields = keyStructObjectInspector.getAllStructFieldRefs();
-
-        // Tez doesn't use tagging...
-        if (reduceWork.getNeedsTagging()) {
+        TableDesc valueTableDesc = reduceWork.getTagToValueDesc().get(reduceWork.getTag());
+
+        Deserializer keyDeserializer =
+            ReflectionUtils.newInstance(
+                keyTableDesc.getDeserializerClass(), null);
+        SerDeUtils.initializeSerDe(keyDeserializer, null, keyTableDesc.getProperties(), null);
+        ObjectInspector keyObjectInspector = keyDeserializer.getObjectInspector();
+        if (keyObjectInspector == null) {
+          LOG.info("Key object inspector null");
           return false;
         }
-
-        // Check value ObjectInspector.
-        ObjectInspector valueObjectInspector = reduceWork.getValueObjectInspector();
-        if (valueObjectInspector == null ||
-                !(valueObjectInspector instanceof StructObjectInspector)) {
+        if (!(keyObjectInspector instanceof StructObjectInspector)) {
+          LOG.info("Key object inspector not StructObjectInspector");
           return false;
         }
-        StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector;
-        List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs();
+        StructObjectInspector keyStructObjectInspector = (StructObjectInspector) keyObjectInspector;
+        List<? extends StructField> keyFields = keyStructObjectInspector.getAllStructFieldRefs();
 
         for (StructField field: keyFields) {
           reduceColumnNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
           reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
         }
-        for (StructField field: valueFields) {
-          reduceColumnNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
-          reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
+
+        Deserializer valueDeserializer =
+            ReflectionUtils.newInstance(
+                valueTableDesc.getDeserializerClass(), null);
+        SerDeUtils.initializeSerDe(valueDeserializer, null, valueTableDesc.getProperties(),
null);
+        ObjectInspector valueObjectInspector = valueDeserializer.getObjectInspector();
+        if (valueObjectInspector != null) {
+          if (!(valueObjectInspector instanceof StructObjectInspector)) {
+            LOG.info("Value object inspector not StructObjectInspector");
+            return false;
+          }
+          StructObjectInspector valueStructObjectInspector = (StructObjectInspector) valueObjectInspector;
+          List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs();
+
+          for (StructField field: valueFields) {
+            reduceColumnNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+            reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
+          }
         }
       } catch (Exception e) {
         throw new SemanticException(e);
@@ -1254,6 +1280,10 @@ public class Vectorizer implements PhysicalPlanResolver {
       if (op instanceof TableScanOperator) {
         if (taskVectorizationContext == null) {
           taskVectorizationContext = getVectorizationContext(op.getName(), vectorTaskColumnInfo);
+          if (LOG.isInfoEnabled()) {
+            LOG.info("MapWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum
+ " mapColumnNames " + vectorTaskColumnInfo.allColumnNames.toString());
+            LOG.info("MapWorkVectorizationNodeProcessor process vectorizedVertexNum " + vectorizedVertexNum
+ " mapTypeInfos " + vectorTaskColumnInfo.allTypeInfos.toString());
+          }
         }
         vContext = taskVectorizationContext;
       } else {
@@ -1320,8 +1350,10 @@ public class Vectorizer implements PhysicalPlanResolver {
       boolean saveRootVectorOp = false;
 
       if (op.getParentOperators().size() == 0) {
-        LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + vectorTaskColumnInfo.allColumnNames.toString());
-
+        if (LOG.isInfoEnabled()) {
+          LOG.info("ReduceWorkVectorizationNodeProcessor process vectorizedVertexNum " +
vectorizedVertexNum + " reduceColumnNames " + vectorTaskColumnInfo.allColumnNames.toString());
+          LOG.info("ReduceWorkVectorizationNodeProcessor process vectorizedVertexNum " +
vectorizedVertexNum + " reduceTypeInfos " + vectorTaskColumnInfo.allTypeInfos.toString());
+        }
         vContext = new VectorizationContext("__Reduce_Shuffle__", vectorTaskColumnInfo.allColumnNames,
hiveConf);
         taskVectorizationContext = vContext;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/5d63605a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index 8c341fc..990da12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -73,6 +73,8 @@ public abstract class BaseWork extends AbstractOperatorDesc {
 
   protected boolean useVectorizedInputFileFormat;
 
+  protected long vectorizedVertexNum;
+
   protected boolean llapMode = false;
   protected boolean uberMode = false;
 
@@ -167,6 +169,14 @@ public abstract class BaseWork extends AbstractOperatorDesc {
     return returnSet;
   }
 
+  public void setVectorizedVertexNum(long vectorizedVertexNum) {
+    this.vectorizedVertexNum = vectorizedVertexNum;
+  }
+
+  public long getVectorizedVertexNum() {
+    return vectorizedVertexNum;
+  }
+
   // -----------------------------------------------------------------------------------------------
 
   /*

http://git-wip-us.apache.org/repos/asf/hive/blob/5d63605a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index 72fc4ca..1910ebd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -103,37 +103,6 @@ public class ReduceWork extends BaseWork {
      return keyDesc;
   }
 
-  private ObjectInspector getObjectInspector(TableDesc desc) {
-    ObjectInspector objectInspector;
-    try {
-      Deserializer deserializer = ReflectionUtil.newInstance(desc
-                .getDeserializerClass(), null);
-      SerDeUtils.initializeSerDe(deserializer, null, desc.getProperties(), null);
-      objectInspector = deserializer.getObjectInspector();
-    } catch (Exception e) {
-      return null;
-    }
-    return objectInspector;
-  }
-
-  public ObjectInspector getKeyObjectInspector() {
-    if (keyObjectInspector == null) {
-      keyObjectInspector = getObjectInspector(keyDesc);
-    }
-    return keyObjectInspector;
-  }
-
-  // Only works when not tagging.
-  public ObjectInspector getValueObjectInspector() {
-    if (needsTagging) {
-      return null;
-    }
-    if (valueObjectInspector == null) {
-      valueObjectInspector = getObjectInspector(tagToValueDesc.get(tag));
-    }
-    return valueObjectInspector;
-  }
-
   public List<TableDesc> getTagToValueDesc() {
     return tagToValueDesc;
   }


Mime
View raw message