hive-commits mailing list archives

From gunt...@apache.org
Subject svn commit: r1527686 - in /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql: optimizer/ConvertJoinMapJoin.java optimizer/MapJoinProcessor.java parse/TezCompiler.java
Date Mon, 30 Sep 2013 17:57:21 GMT
Author: gunther
Date: Mon Sep 30 17:57:21 2013
New Revision: 1527686

URL: http://svn.apache.org/r1527686
Log:
HIVE-5271: Convert join op to a map join op in the planning phase (Vikram Dixit K via Gunther Hagleitner)

Added:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
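
For context when reading the diffs below (this note and the snippet are not part of the commit): the new ConvertJoinMapJoin rule only fires when automatic join conversion is enabled, and it sizes the hash side of the join against a byte threshold. A minimal sketch of the two HiveConf settings the added code consults, assuming a HiveConf instance is at hand:

    // Sketch only, not part of the commit: the two settings read by
    // ConvertJoinMapJoin. HIVECONVERTJOIN switches automatic conversion on or
    // off; HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD caps the bytes allowed on
    // the hash (small-table) side of the join.
    HiveConf conf = new HiveConf();
    boolean autoConvert = conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN);
    long hashSideByteLimit = conf.getLongVar(
        HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);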

Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1527686&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Mon Sep 30 17:57:21 2013
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
+import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.Statistics;
+
+/**
+ * ConvertJoinMapJoin is an optimization that replaces a common join
+ * (aka shuffle join) with a map join (aka broadcast or fragment replicate
+ * join) when possible. Map joins have restrictions on which joins can be
+ * converted (e.g.: full outer joins cannot be handled as map joins) as well
+ * as memory restrictions (one side of the join has to fit into memory).
+ */
+public class ConvertJoinMapJoin implements NodeProcessor {
+
+  static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName());
+
+  @Override
+  /*
+   * (non-Javadoc)
+   * Ideally we should not modify the tree we traverse. However, since we
+   * need to re-walk the tree whenever we modify an operator, we might as
+   * well do the modification here.
+   */
+  public Object process(Node nd, Stack<Node> stack,
+      NodeProcessorCtx procCtx, Object... nodeOutputs)
+      throws SemanticException {
+
+    OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;
+
+    if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
+      return null;
+    }
+
+    JoinOperator joinOp = (JoinOperator) nd;
+
+    Set<Integer> bigTableCandidateSet = MapJoinProcessor.
+      getBigTableCandidates(joinOp.getConf().getConds());
+
+    long maxSize = context.conf.getLongVar(
+      HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+
+    int bigTablePosition = -1;
+
+    Statistics bigInputStat = null;
+    long totalSize = 0;
+    int pos = 0;
+
+    // bigTableFound means we've encountered a table that's bigger than the
+    // max. This table is either the big table or we cannot convert.
+    boolean bigTableFound = false;
+
+    for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+
+      Statistics currInputStat = null;
+      try {
+        currInputStat = parentOp.getStatistics(context.conf);
+      } catch (HiveException e) {
+        return null;
+      }
+
+      long inputSize = currInputStat.getNumberOfBytes();
+      if ((bigInputStat == null) ||
+          ((bigInputStat != null) && 
+           (inputSize > bigInputStat.getNumberOfBytes()))) {
+
+        if (bigTableFound) {
+          // cannot convert to map join; we've already chosen a big table
+          // based on size and there's another one that's bigger.
+          return null;
+        }
+
+        if (inputSize > maxSize) {
+          if (!bigTableCandidateSet.contains(pos)) {
+            // we can't use the current table as the big table, and it's too
+            // big for the map side.
+            return null;
+          }
+
+          bigTableFound = true;
+        }
+
+        if (bigInputStat != null) {
+          // we're replacing the current big table with a new one. Need
+          // to count the current one as a map table then.
+          totalSize += bigInputStat.getNumberOfBytes();
+        }
+
+        if (totalSize > maxSize) {
+          // the sum of the small tables' sizes in this join exceeds the
+          // configured limit, hence we cannot convert.
+          return null;
+        }
+
+        if (bigTableCandidateSet.contains(pos)) {
+          bigTablePosition = pos;
+          bigInputStat = currInputStat;
+        }
+      } else {
+        totalSize += currInputStat.getNumberOfBytes();
+        if (totalSize > maxSize) {
+          // cannot hold all map tables in memory. Cannot convert.
+          return null;
+        }
+      }
+      pos++;
+    }
+
+    if (bigTablePosition == -1) {
+      // all tables have size 0. We let the shuffle join handle this case.
+      return null;
+    }
+
+    /*
+     * Once we have decided on the map join, the tree would transform from
+     *
+     *        |                   |
+     *       Join               MapJoin
+     *       / \                /   \
+     *      RS RS   --->      RS    TS (big table)
+     *      /   \            /
+     *    TS     TS         TS (small table)
+     *
+     * for tez.
+     */
+
+    // convert to a map join operator with this information
+    ParseContext parseContext = context.parseContext;
+    MapJoinOperator mapJoinOp = MapJoinProcessor.
+      convertJoinOpMapJoinOp(parseContext.getOpParseCtx(),
+      joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true, false);
+
+    Operator<? extends OperatorDesc> parentBigTableOp
+      = mapJoinOp.getParentOperators().get(bigTablePosition);
+
+    if (parentBigTableOp instanceof ReduceSinkOperator) {
+      mapJoinOp.getParentOperators().remove(bigTablePosition);
+      if (!(mapJoinOp.getParentOperators().contains(
+          parentBigTableOp.getParentOperators().get(0)))) {
+        mapJoinOp.getParentOperators().add(bigTablePosition,
+          parentBigTableOp.getParentOperators().get(0));
+      }
+      parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp);
+      for (Operator<? extends OperatorDesc> op : mapJoinOp.getParentOperators()) {
+        if (!(op.getChildOperators().contains(mapJoinOp))) {
+          op.getChildOperators().add(mapJoinOp);
+        }
+        op.getChildOperators().remove(joinOp);
+      }
+    }
+
+    return null;
+  }
+}
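
A standalone restatement of the size-based selection loop above may make the process() method easier to follow. It is not part of the commit; pickBigTablePosition and its plain long[] sizes are hypothetical stand-ins for the Statistics objects used in the real code:

    // Sketch only: pick the big (streamed) table by size. Any input larger than
    // maxSize must be the big table and at most one such input may exist; the
    // remaining (hash-side) inputs must together stay within maxSize.
    // Returns -1 when the join cannot be converted to a map join.
    static int pickBigTablePosition(long[] inputSizes,
        java.util.Set<Integer> bigTableCandidates, long maxSize) {
      int bigTablePos = -1;
      long bigTableSize = -1;
      long hashSideSize = 0;
      boolean oversizedInputSeen = false;
      for (int pos = 0; pos < inputSizes.length; pos++) {
        long size = inputSizes[pos];
        if (bigTableSize < 0 || size > bigTableSize) {
          if (oversizedInputSeen) {
            return -1;                      // a second over-threshold input: give up
          }
          if (size > maxSize) {
            if (!bigTableCandidates.contains(pos)) {
              return -1;                    // too big to hash, not allowed to stream
            }
            oversizedInputSeen = true;
          }
          if (bigTableSize >= 0) {
            hashSideSize += bigTableSize;   // previous big table moves to the hash side
          }
          if (hashSideSize > maxSize) {
            return -1;                      // hash-side inputs no longer fit in memory
          }
          if (bigTableCandidates.contains(pos)) {
            bigTablePos = pos;
            bigTableSize = size;
          }
        } else {
          hashSideSize += size;
          if (hashSideSize > maxSize) {
            return -1;                      // hash-side inputs no longer fit in memory
          }
        }
      }
      return bigTablePos;
    }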

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1527686&r1=1527685&r2=1527686&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Mon Sep 30 17:57:21 2013
@@ -373,21 +373,90 @@ public class MapJoinProcessor implements
       pos++;
     }
 
-    // get the join keys from old parent ReduceSink operators
+    // create the map-join operator
+    MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(opParseCtxMap,
+        op, joinTree, mapJoinPos, noCheckOuterJoin, validateMapJoinTree);
+
+
+    // remove old parents
     for (pos = 0; pos < newParentOps.size(); pos++) {
-      ReduceSinkOperator oldPar = (ReduceSinkOperator) oldReduceSinkParentOps.get(pos);
-      ReduceSinkDesc rsconf = oldPar.getConf();
+      newParentOps.get(pos).removeChild(oldReduceSinkParentOps.get(pos));
+      newParentOps.get(pos).getChildOperators().add(mapJoinOp);
+    }
+
+
+    mapJoinOp.getParentOperators().removeAll(oldReduceSinkParentOps);
+    mapJoinOp.setParentOperators(newParentOps);
+
+
+    // change the children of the original join operator to point to the map
+    // join operator
+
+    return mapJoinOp;
+  }
+
+  public static MapJoinOperator convertJoinOpMapJoinOp(
+      LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
+      JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin,
+      boolean validateMapJoinTree)
+      throws SemanticException {
+
+    JoinDesc desc = op.getConf();
+    JoinCondDesc[] condns = desc.getConds();
+    Byte[] tagOrder = desc.getTagOrder();
+
+    // outer join cannot be performed on a table which is being cached
+    if (!noCheckOuterJoin) {
+      if (checkMapJoin(mapJoinPos, condns) < 0) {
+        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+      }
+    }
+
+    Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
+
+    // Walk over all the sources (which are guaranteed to be reduce sink
+    // operators).
+    // The join outputs a concatenation of all the inputs.
+    QBJoinTree leftSrc = joinTree.getJoinSrc();
+    List<Operator<? extends OperatorDesc>> oldReduceSinkParentOps =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+    if (leftSrc != null) {
+      // assert mapJoinPos == 0;
+      Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
+      assert parentOp.getParentOperators().size() == 1;
+      oldReduceSinkParentOps.add(parentOp);
+    }
+
+
+    byte pos = 0;
+    for (String src : joinTree.getBaseSrc()) {
+      if (src != null) {
+        Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
+        assert parentOp.getParentOperators().size() == 1;
+        Operator<? extends OperatorDesc> grandParentOp =
+          parentOp.getParentOperators().get(0);
+
+        oldReduceSinkParentOps.add(parentOp);
+      }
+      pos++;
+    }
+
+    // get the join keys from old parent ReduceSink operators
+    for (pos = 0; pos < op.getParentOperators().size(); pos++) {
+      ReduceSinkOperator parent = (ReduceSinkOperator) oldReduceSinkParentOps.get(pos);
+      ReduceSinkDesc rsconf = parent.getConf();
       List<ExprNodeDesc> keys = rsconf.getKeyCols();
       keyExprMap.put(pos, keys);
     }
 
-    // removing RS, only ExprNodeDesc is changed (key/value/filter exprs and colExprMap)
-    // others (output column-name, RR, schema) remain intact
-    Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
-    List<String> outputColumnNames = op.getConf().getOutputColumnNames();
+    List<ExprNodeDesc> keyCols = keyExprMap.get(Byte.valueOf((byte) 0));
+    StringBuilder keyOrder = new StringBuilder();
+    for (int i = 0; i < keyCols.size(); i++) {
+      keyOrder.append("+");
+    }
 
+    Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
     List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
-
     Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
     Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
     for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
@@ -411,45 +480,12 @@ public class MapJoinProcessor implements
       }
     }
 
-    Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
-    Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
-    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
-      byte srcTag = entry.getKey();
-      List<ExprNodeDesc> filter = entry.getValue();
-
-      Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
-      newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
-    }
-    desc.setFilters(filters = newFilters);
-
-    // remove old parents
-    for (pos = 0; pos < newParentOps.size(); pos++) {
-      newParentOps.get(pos).removeChild(oldReduceSinkParentOps.get(pos));
-    }
-
-    JoinCondDesc[] joinCondns = op.getConf().getConds();
-
-    Operator[] newPar = new Operator[newParentOps.size()];
-    pos = 0;
-    for (Operator<? extends OperatorDesc> o : newParentOps) {
-      newPar[pos++] = o;
-    }
-
-    List<ExprNodeDesc> keyCols = keyExprMap.get(Byte.valueOf((byte) 0));
-    StringBuilder keyOrder = new StringBuilder();
-    for (int i = 0; i < keyCols.size(); i++) {
-      keyOrder.append("+");
-    }
-
-    TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils
-        .getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
-
+    // construct valueTableDescs and valueFilteredTableDescs
     List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
     List<TableDesc> valueFiltedTableDescs = new ArrayList<TableDesc>();
-
     int[][] filterMap = desc.getFilterMap();
-    for (pos = 0; pos < newParentOps.size(); pos++) {
-      List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
+    for (pos = 0; pos < op.getParentOperators().size(); pos++) {
+      List<ExprNodeDesc> valueCols = newValueExprs.get(Byte.valueOf((byte) pos));
       int length = valueCols.size();
       List<ExprNodeDesc> valueFilteredCols = new ArrayList<ExprNodeDesc>(length);
       // deep copy expr node desc
@@ -476,6 +512,19 @@ public class MapJoinProcessor implements
       valueTableDescs.add(valueTableDesc);
       valueFiltedTableDescs.add(valueFilteredTableDesc);
     }
+
+    Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
+    Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
+    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
+      byte srcTag = entry.getKey();
+      List<ExprNodeDesc> filter = entry.getValue();
+
+      Operator<?> terminal = op.getParentOperators().get(srcTag);
+      newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
+    }
+    desc.setFilters(filters = newFilters);
+
+    // create dumpfile prefix needed to create descriptor
     String dumpFilePrefix = "";
     if( joinTree.getMapAliases() != null ) {
       for(String mapAlias : joinTree.getMapAliases()) {
@@ -485,6 +534,11 @@ public class MapJoinProcessor implements
     } else {
       dumpFilePrefix = "mapfile"+PlanUtils.getCountForMapJoinDumpFilePrefix();
     }
+
+    List<String> outputColumnNames = op.getConf().getOutputColumnNames();
+    TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils
+        .getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
+    JoinCondDesc[] joinCondns = op.getConf().getConds();
     MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs,
         valueTableDescs, valueFiltedTableDescs, outputColumnNames, mapJoinPos, joinCondns,
         filters, op.getConf().getNoOuterJoin(), dumpFilePrefix);
@@ -492,8 +546,11 @@ public class MapJoinProcessor implements
     mapJoinDescriptor.setNullSafes(desc.getNullSafes());
     mapJoinDescriptor.setFilterMap(desc.getFilterMap());
 
+    // row resolver of the original join op, used to generate the map join op
+    RowResolver outputRS = opParseCtxMap.get(op).getRowResolver();
+
     MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(
-        mapJoinDescriptor, new RowSchema(outputRS.getColumnInfos()), newPar);
+        mapJoinDescriptor, new RowSchema(outputRS.getColumnInfos()), op.getParentOperators());
 
     OpParseContext ctx = new OpParseContext(outputRS);
     opParseCtxMap.put(mapJoinOp, ctx);
@@ -501,15 +558,12 @@ public class MapJoinProcessor implements
     mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs());
     mapJoinOp.setColumnExprMap(colExprMap);
 
-    // change the children of the original join operator to point to the map
-    // join operator
     List<Operator<? extends OperatorDesc>> childOps = op.getChildOperators();
     for (Operator<? extends OperatorDesc> childOp : childOps) {
       childOp.replaceParent(op, mapJoinOp);
     }
 
     mapJoinOp.setChildOperators(childOps);
-    mapJoinOp.setParentOperators(newParentOps);
     op.setChildOperators(null);
     op.setParentOperators(null);
 
@@ -519,6 +573,7 @@ public class MapJoinProcessor implements
     }
 
     return mapJoinOp;
+
   }
 
   /**

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1527686&r1=1527685&r2=1527686&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Mon Sep 30 17:57:21 2013
@@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
@@ -79,6 +81,8 @@ public class TezCompiler extends TaskCom
     opRules.put(new RuleRegExp(new String("Set parallelism - ReduceSink"),
         ReduceSinkOperator.getOperatorName() + "%"),
         new SetReducerParallelism());
+    opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
+        JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
 
     // if this is an explain statement add rule to generate statistics for
     // the whole tree.
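
The hunk above only registers the new rule; how registered rules are applied is outside this diff. As a hedged sketch of the usual pattern in Hive's optimizer (classes from org.apache.hadoop.hive.ql.lib; the concrete walker and the construction of procCtx and parseContext in TezCompiler are not shown in this hunk and are assumed here), the opRules map is handed to a rule dispatcher and driven over the operator DAG by a graph walker:

    // Sketch only: dispatch registered NodeProcessors such as ConvertJoinMapJoin
    // over the operator tree. procCtx stands for the OptimizeTezProcContext that
    // each processor receives in process(); parseContext for the current
    // ParseContext.
    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
    opRules.put(new RuleRegExp("Convert Join to Map-join",
        JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());

    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
    GraphWalker walker = new DefaultGraphWalker(disp);

    List<Node> topNodes = new ArrayList<Node>(parseContext.getTopOps().values());
    walker.startWalking(topNodes, null);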


