hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject hive git commit: HIVE-15279 : map join dummy operators are not set up correctly in certain cases with merge join (Sergey Shelukhin, reviewed by Gunther Hagleitner)
Date Wed, 07 Dec 2016 20:15:11 GMT
Repository: hive
Updated Branches:
  refs/heads/master 8804a7b89 -> d60802d6a


HIVE-15279 : map join dummy operators are not set up correctly in certain cases with merge
join (Sergey Shelukhin, reviewed by Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d60802d6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d60802d6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d60802d6

Branch: refs/heads/master
Commit: d60802d6a0b211dd005219cf2303fdd5e9eb6764
Parents: 8804a7b
Author: Sergey Shelukhin <sershe@apache.org>
Authored: Wed Dec 7 12:09:21 2016 -0800
Committer: Sergey Shelukhin <sershe@apache.org>
Committed: Wed Dec 7 12:09:21 2016 -0800

----------------------------------------------------------------------
 .../hive/ql/exec/HashTableDummyOperator.java      | 10 ++++++++++
 .../hive/ql/exec/tez/ReduceRecordProcessor.java   | 18 +++++++++++++++---
 .../hive/ql/optimizer/ReduceSinkMapJoinProc.java  |  3 ++-
 .../apache/hadoop/hive/ql/parse/GenTezUtils.java  |  1 +
 .../apache/hadoop/hive/ql/parse/GenTezWork.java   |  4 ++++
 5 files changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d60802d6/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
index 0aab7a8..2075d9b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
@@ -77,4 +77,14 @@ public class HashTableDummyOperator extends Operator<HashTableDummyDesc>
impleme
     return OperatorType.HASHTABLEDUMMY;
   }
 
+  @Override
+  public boolean equals(Object obj) {
+    return super.equals(obj) || (obj instanceof HashTableDummyOperator)
+        && (((HashTableDummyOperator)obj).operatorId == operatorId);
+  }
+  
+  @Override
+  public int hashCode() {
+    return operatorId.hashCode();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d60802d6/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index cf3c8ab..e4c13fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -29,7 +30,6 @@ import java.util.concurrent.Callable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -41,7 +41,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -123,9 +122,23 @@ public class ReduceRecordProcessor  extends RecordProcessor{
 
     connectOps.clear();
     ReduceWork redWork = reduceWork;
+    l4j.info("Main work is " + reduceWork.getName());
+    List<HashTableDummyOperator> workOps = reduceWork.getDummyOps();
+    HashSet<HashTableDummyOperator> dummyOps = workOps == null ? null : new HashSet<>(workOps);
     tagToReducerMap.put(redWork.getTag(), redWork);
     if (mergeWorkList != null) {
       for (BaseWork mergeWork : mergeWorkList) {
+        if (l4j.isDebugEnabled()) {
+          l4j.debug("Additional work " + mergeWork.getName());
+        }
+        workOps = mergeWork.getDummyOps();
+        if (workOps != null) {
+          if (dummyOps == null) {
+            dummyOps = new HashSet<>(workOps);
+          } else {
+            dummyOps.addAll(workOps);
+          }
+        }
         ReduceWork mergeReduceWork = (ReduceWork) mergeWork;
         reducer = mergeReduceWork.getReducer();
         // Check immediately after reducer is assigned, in cae the abort came in during
@@ -193,7 +206,6 @@ public class ReduceRecordProcessor  extends RecordProcessor{
       // Initialization isn't finished until all parents of all operators
       // are initialized. For broadcast joins that means initializing the
       // dummy parent operators as well.
-      List<HashTableDummyOperator> dummyOps = redWork.getDummyOps();
       if (dummyOps != null) {
         for (HashTableDummyOperator dummyOp : dummyOps) {
           // TODO HIVE-14042. Propagating abort to dummyOps.

http://git-wip-us.apache.org/repos/asf/hive/blob/d60802d6/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
index 00afc18..3a6baca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
@@ -370,6 +370,7 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
     // at task startup
     if (mapJoinWork != null) {
       for (BaseWork myWork: mapJoinWork) {
+        LOG.debug("adding dummy op to work " + myWork.getName() + " from MJ work: " + dummyOp);
         myWork.addDummyOp(dummyOp);
       }
     }
@@ -382,4 +383,4 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
 
     return true;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/d60802d6/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index fd80e6c..e2363eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -348,6 +348,7 @@ public class GenTezUtils {
         operators.addAll(current.getChildOperators());
       }
     }
+    LOG.debug("Setting dummy ops for work " + work.getName() + ": " + dummyOps);
     work.setDummyOps(dummyOps);
     work.replaceRoots(replacementMap);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/d60802d6/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 461ba37..2b96e51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -269,7 +269,11 @@ public class GenTezWork implements NodeProcessor {
         if (context.linkOpWithWorkMap.containsKey(mj)) {
           Map<BaseWork,TezEdgeProperty> linkWorkMap = context.linkOpWithWorkMap.get(mj);
           if (linkWorkMap != null) {
+             // Note: it's not quite clear why this is done inside this if. Seems like it
should be on the top level.
             if (context.linkChildOpWithDummyOp.containsKey(mj)) {
+               if (LOG.isDebugEnabled()) {
+                LOG.debug("Adding dummy ops to work: " + work.getName() + ": " + context.linkChildOpWithDummyOp.get(mj));
+               }
               for (Operator<?> dummy: context.linkChildOpWithDummyOp.get(mj)) {
                 work.addDummyOp((HashTableDummyOperator) dummy);
               }


Mime
View raw message