hive-commits mailing list archives

From na...@apache.org
Subject svn commit: r1447593 [2/8] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ data/files/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/phy...
Date Tue, 19 Feb 2013 05:17:54 GMT
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java?rev=1447593&r1=1447592&r2=1447593&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java Tue Feb 19 05:17:52 2013
@@ -17,33 +17,15 @@
  */
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -53,18 +35,8 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
-import org.apache.hadoop.hive.ql.parse.QB;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer;
-import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 /**
  * this transformation does bucket map join optimization.
@@ -81,22 +53,13 @@ public class BucketMapJoinOptimizer impl
   public ParseContext transform(ParseContext pctx) throws SemanticException {
 
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    BucketMapjoinOptProcCtx bucketMapJoinOptimizeCtx =
-        new BucketMapjoinOptProcCtx(pctx.getConf());
+    BucketJoinProcCtx bucketMapJoinOptimizeCtx =
+        new BucketJoinProcCtx(pctx.getConf());
 
     // process map joins with no reducers pattern
     opRules.put(new RuleRegExp("R1",
         MapJoinOperator.getOperatorName() + "%"),
         getBucketMapjoinProc(pctx));
-    opRules.put(new RuleRegExp("R2",
-        ReduceSinkOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName()),
-        getBucketMapjoinRejectProc(pctx));
-    opRules.put(new RuleRegExp(new String("R3"),
-        UnionOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"),
-        getBucketMapjoinRejectProc(pctx));
-    opRules.put(new RuleRegExp(new String("R4"),
-        MapJoinOperator.getOperatorName() + "%.*" + MapJoinOperator.getOperatorName() + "%"),
-        getBucketMapjoinRejectProc(pctx));
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
@@ -105,29 +68,15 @@ public class BucketMapJoinOptimizer impl
     GraphWalker ogw = new DefaultGraphWalker(disp);
 
     // Create a list of topop nodes
-    ArrayList<Node> topNodes = new ArrayList<Node>();
+    List<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pctx.getTopOps().values());
     ogw.startWalking(topNodes, null);
 
     return pctx;
   }
 
-  private NodeProcessor getBucketMapjoinRejectProc(ParseContext pctx) {
-    return new NodeProcessor() {
-      @Override
-      public Object process(Node nd, Stack<Node> stack,
-          NodeProcessorCtx procCtx, Object... nodeOutputs)
-          throws SemanticException {
-        MapJoinOperator mapJoinOp = (MapJoinOperator) nd;
-        BucketMapjoinOptProcCtx context = (BucketMapjoinOptProcCtx) procCtx;
-        context.listOfRejectedMapjoins.add(mapJoinOp);
-        return null;
-      }
-    };
-  }
-
   private NodeProcessor getBucketMapjoinProc(ParseContext pctx) {
-    return new BucketMapjoinOptProc(pctx);
+    return new BucketMapjoinProc(pctx);
   }
 
   private NodeProcessor getDefaultProc() {
@@ -140,394 +89,4 @@ public class BucketMapJoinOptimizer impl
       }
     };
   }
-
-  class BucketMapjoinOptProc extends AbstractBucketJoinProc implements NodeProcessor {
-
-    protected ParseContext pGraphContext;
-
-    public BucketMapjoinOptProc(ParseContext pGraphContext) {
-      super();
-      this.pGraphContext = pGraphContext;
-    }
-
-    private boolean convertBucketMapJoin(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-      MapJoinOperator mapJoinOp = (MapJoinOperator) nd;
-      BucketMapjoinOptProcCtx context = (BucketMapjoinOptProcCtx) procCtx;
-      HiveConf conf = context.getConf();
-
-      if (context.getListOfRejectedMapjoins().contains(mapJoinOp)) {
-        return false;
-      }
-
-      QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext().get(mapJoinOp);
-      if (joinCxt == null) {
-        return false;
-      }
-
-      List<String> joinAliases = new ArrayList<String>();
-      String[] srcs = joinCxt.getBaseSrc();
-      String[] left = joinCxt.getLeftAliases();
-      List<String> mapAlias = joinCxt.getMapAliases();
-      String baseBigAlias = null;
-
-      for (String s : left) {
-        if (s != null) {
-          String subQueryAlias = QB.getAppendedAliasFromId(joinCxt.getId(), s);
-          if (!joinAliases.contains(subQueryAlias)) {
-            joinAliases.add(subQueryAlias);
-            if(!mapAlias.contains(s)) {
-              baseBigAlias = subQueryAlias;
-            }
-          }
-        }
-      }
-
-      for (String s : srcs) {
-        if (s != null) {
-          String subQueryAlias = QB.getAppendedAliasFromId(joinCxt.getId(), s);
-          if (!joinAliases.contains(subQueryAlias)) {
-            joinAliases.add(subQueryAlias);
-            if(!mapAlias.contains(s)) {
-              baseBigAlias = subQueryAlias;
-            }
-          }
-        }
-      }
-
-      MapJoinDesc mjDesc = mapJoinOp.getConf();
-      LinkedHashMap<String, List<Integer>> aliasToPartitionBucketNumberMapping =
-          new LinkedHashMap<String, List<Integer>>();
-      LinkedHashMap<String, List<List<String>>> aliasToPartitionBucketFileNamesMapping =
-          new LinkedHashMap<String, List<List<String>>>();
-
-      Map<String, Operator<? extends OperatorDesc>> topOps =
-          this.pGraphContext.getTopOps();
-      Map<TableScanOperator, Table> topToTable = this.pGraphContext.getTopToTable();
-
-      // (partition to bucket file names) and (partition to bucket number) for
-      // the big table;
-      LinkedHashMap<Partition, List<String>> bigTblPartsToBucketFileNames = new LinkedHashMap<Partition, List<String>>();
-      LinkedHashMap<Partition, Integer> bigTblPartsToBucketNumber = new LinkedHashMap<Partition, Integer>();
-
-      Integer[] orders = null; // accessing order of join cols to bucket cols, should be same
-      boolean bigTablePartitioned = true;
-      for (int index = 0; index < joinAliases.size(); index++) {
-        String alias = joinAliases.get(index);
-        Operator<? extends OperatorDesc> topOp = joinCxt.getAliasToOpInfo().get(alias);
-        if (topOp == null) {
-          return false;
-        }
-        List<String> keys = toColumns(mjDesc.getKeys().get((byte) index));
-        if (keys == null || keys.isEmpty()) {
-          return false;
-        }
-        int oldKeySize = keys.size();
-        TableScanOperator tso = TableAccessAnalyzer.genRootTableScan(topOp, keys);
-        if (tso == null) {
-          return false;
-        }
-
-        // For nested sub-queries, the alias mapping is not maintained in QB currently.
-        if (topOps.containsValue(tso)) {
-          for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpEntry : topOps.entrySet()) {
-            if (topOpEntry.getValue() == tso) {
-              String newAlias = topOpEntry.getKey();
-              joinAliases.set(index, newAlias);
-              if (baseBigAlias.equals(alias)) {
-                baseBigAlias = newAlias;
-              }
-              alias = newAlias;
-              break;
-            }
-          }
-        }
-        else {
-          // Ideally, this should never happen, and this should be an assert.
-          return false;
-        }
-
-        // The join keys cannot be transformed in the sub-query currently.
-        // TableAccessAnalyzer.genRootTableScan will only return the base table scan
-        // if the join keys are constants or a column. Even a simple cast of the join keys
-        // will result in a null table scan operator. In case of constant join keys, they would
-        // be removed, and the size before and after the genRootTableScan will be different.
-        if (keys.size() != oldKeySize) {
-          return false;
-        }
-        if (orders == null) {
-          orders = new Integer[keys.size()];
-        }
-
-        Table tbl = topToTable.get(tso);
-        if (tbl.isPartitioned()) {
-          PrunedPartitionList prunedParts;
-          try {
-            prunedParts = pGraphContext.getOpToPartList().get(tso);
-            if (prunedParts == null) {
-              prunedParts = PartitionPruner.prune(tbl, pGraphContext.getOpToPartPruner().get(tso),
-                  pGraphContext.getConf(), alias,
-                  pGraphContext.getPrunedPartitions());
-              pGraphContext.getOpToPartList().put(tso, prunedParts);
-            }
-          } catch (HiveException e) {
-            // Has to use full name to make sure it does not conflict with
-            // org.apache.commons.lang.StringUtils
-            LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
-            throw new SemanticException(e.getMessage(), e);
-          }
-          List<Partition> partitions = prunedParts.getNotDeniedPartns();
-          // construct a mapping of (Partition->bucket file names) and (Partition -> bucket number)
-          if (partitions.isEmpty()) {
-            if (!alias.equals(baseBigAlias)) {
-              aliasToPartitionBucketNumberMapping.put(alias, Arrays.<Integer> asList());
-              aliasToPartitionBucketFileNamesMapping.put(alias, new ArrayList<List<String>>());
-            }
-          } else {
-            List<Integer> buckets = new ArrayList<Integer>();
-            List<List<String>> files = new ArrayList<List<String>>();
-            for (Partition p : partitions) {
-              if (!checkBucketColumns(p.getBucketCols(), keys, orders)) {
-                return false;
-              }
-              List<String> fileNames = getOnePartitionBucketFileNames(p.getDataLocation());
-              // The number of files for the table should be same as number of buckets.
-              int bucketCount = p.getBucketCount();
-              if (fileNames.size() != bucketCount) {
-                String msg = "The number of buckets for table " +
-                    tbl.getTableName() + " partition " + p.getName() + " is " +
-                    p.getBucketCount() + ", whereas the number of files is " + fileNames.size();
-                throw new SemanticException(
-                    ErrorMsg.BUCKETED_TABLE_METADATA_INCORRECT.getMsg(msg));
-              }
-              if (alias.equals(baseBigAlias)) {
-                bigTblPartsToBucketFileNames.put(p, fileNames);
-                bigTblPartsToBucketNumber.put(p, bucketCount);
-              } else {
-                files.add(fileNames);
-                buckets.add(bucketCount);
-              }
-            }
-            if (!alias.equals(baseBigAlias)) {
-              aliasToPartitionBucketNumberMapping.put(alias, buckets);
-              aliasToPartitionBucketFileNamesMapping.put(alias, files);
-            }
-          }
-        } else {
-          if (!checkBucketColumns(tbl.getBucketCols(), keys, orders)) {
-            return false;
-          }
-          List<String> fileNames = getOnePartitionBucketFileNames(tbl.getDataLocation());
-          Integer num = new Integer(tbl.getNumBuckets());
-          // The number of files for the table should be same as number of buckets.
-          if (fileNames.size() != num) {
-            String msg = "The number of buckets for table " +
-                tbl.getTableName() + " is " + tbl.getNumBuckets() +
-                ", whereas the number of files is " + fileNames.size();
-            throw new SemanticException(
-                ErrorMsg.BUCKETED_TABLE_METADATA_INCORRECT.getMsg(msg));
-          }
-          if (alias.equals(baseBigAlias)) {
-            bigTblPartsToBucketFileNames.put(null, fileNames);
-            bigTblPartsToBucketNumber.put(null, tbl.getNumBuckets());
-            bigTablePartitioned = false;
-          } else {
-            aliasToPartitionBucketNumberMapping.put(alias, Arrays.asList(num));
-            aliasToPartitionBucketFileNamesMapping.put(alias, Arrays.asList(fileNames));
-          }
-        }
-      }
-
-      // All tables or partitions are bucketed, and their bucket number is
-      // stored in 'bucketNumbers', we need to check if the number of buckets in
-      // the big table can be divided by no of buckets in small tables.
-      for (Integer bucketNumber : bigTblPartsToBucketNumber.values()) {
-        if (!checkBucketNumberAgainstBigTable(aliasToPartitionBucketNumberMapping, bucketNumber)) {
-          return false;
-        }
-      }
-
-      MapJoinDesc desc = mapJoinOp.getConf();
-
-      Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
-          new LinkedHashMap<String, Map<String, List<String>>>();
-
-      // sort bucket names for the big table
-      for (List<String> partBucketNames : bigTblPartsToBucketFileNames.values()) {
-        Collections.sort(partBucketNames);
-      }
-
-      // go through all small tables and get the mapping from bucket file name
-      // in the big table to bucket file names in small tables.
-      for (int j = 0; j < joinAliases.size(); j++) {
-        String alias = joinAliases.get(j);
-        if (alias.equals(baseBigAlias)) {
-          continue;
-        }
-        for (List<String> names : aliasToPartitionBucketFileNamesMapping.get(alias)) {
-          Collections.sort(names);
-        }
-        List<Integer> smallTblBucketNums = aliasToPartitionBucketNumberMapping.get(alias);
-        List<List<String>> smallTblFilesList = aliasToPartitionBucketFileNamesMapping.get(alias);
-
-        Map<String, List<String>> mapping = new LinkedHashMap<String, List<String>>();
-        aliasBucketFileNameMapping.put(alias, mapping);
-
-        // for each bucket file in big table, get the corresponding bucket file
-        // name in the small table.
-        // more than 1 partition in the big table, do the mapping for each partition
-        Iterator<Entry<Partition, List<String>>> bigTblPartToBucketNames =
-            bigTblPartsToBucketFileNames.entrySet().iterator();
-        Iterator<Entry<Partition, Integer>> bigTblPartToBucketNum = bigTblPartsToBucketNumber
-            .entrySet().iterator();
-        while (bigTblPartToBucketNames.hasNext()) {
-          assert bigTblPartToBucketNum.hasNext();
-          int bigTblBucketNum = bigTblPartToBucketNum.next().getValue();
-          List<String> bigTblBucketNameList = bigTblPartToBucketNames.next().getValue();
-          fillMapping(smallTblBucketNums, smallTblFilesList,
-              mapping, bigTblBucketNum, bigTblBucketNameList, desc.getBigTableBucketNumMapping());
-        }
-      }
-      desc.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
-      desc.setBigTableAlias(baseBigAlias);
-      if (bigTablePartitioned) {
-        desc.setBigTablePartSpecToFileMapping(convert(bigTblPartsToBucketFileNames));
-      }
-      // successfully convert to bucket map join
-      desc.setBucketMapJoin(true);
-
-      return true;
-    }
-
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-
-      boolean convert = convertBucketMapJoin(nd, stack, procCtx, nodeOutputs);
-      BucketMapjoinOptProcCtx context = (BucketMapjoinOptProcCtx) procCtx;
-      HiveConf conf = context.getConf();
-
-      // Throw an error if the user asked for bucketed mapjoin to be enforced and
-      // bucketed mapjoin cannot be performed
-      if (!convert && conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETMAPJOIN)) {
-        throw new SemanticException(ErrorMsg.BUCKET_MAPJOIN_NOT_POSSIBLE.getMsg());
-      }
-
-      return null;
-    }
-
-    // convert partition to partition spec string
-    private Map<String, List<String>> convert(Map<Partition, List<String>> mapping) {
-      Map<String, List<String>> converted = new HashMap<String, List<String>>();
-      for (Map.Entry<Partition, List<String>> entry : mapping.entrySet()) {
-        converted.put(entry.getKey().getName(), entry.getValue());
-      }
-      return converted;
-    }
-
-    // called for each partition of big table and populates mapping for each file in the partition
-    private void fillMapping(
-        List<Integer> smallTblBucketNums,
-        List<List<String>> smallTblFilesList,
-        Map<String, List<String>> mapping,
-        int bigTblBucketNum, List<String> bigTblBucketNameList,
-        Map<String, Integer> bucketFileNameMapping) {
-
-      for (int bindex = 0; bindex < bigTblBucketNameList.size(); bindex++) {
-        ArrayList<String> resultFileNames = new ArrayList<String>();
-        for (int sindex = 0; sindex < smallTblBucketNums.size(); sindex++) {
-          int smallTblBucketNum = smallTblBucketNums.get(sindex);
-          List<String> smallTblFileNames = smallTblFilesList.get(sindex);
-          if (bigTblBucketNum >= smallTblBucketNum) {
-            // if the big table has more buckets than the current small table,
-            // use "MOD" to get small table bucket names. For example, if the big
-            // table has 4 buckets and the small table has 2 buckets, then the
-            // mapping should be 0->0, 1->1, 2->0, 3->1.
-            int toAddSmallIndex = bindex % smallTblBucketNum;
-            resultFileNames.add(smallTblFileNames.get(toAddSmallIndex));
-          } else {
-            int jump = smallTblBucketNum / bigTblBucketNum;
-            for (int i = bindex; i < smallTblFileNames.size(); i = i + jump) {
-              resultFileNames.add(smallTblFileNames.get(i));
-            }
-          }
-        }
-        String inputBigTBLBucket = bigTblBucketNameList.get(bindex);
-        mapping.put(inputBigTBLBucket, resultFileNames);
-        bucketFileNameMapping.put(inputBigTBLBucket, bindex);
-      }
-    }
-
-    private boolean checkBucketNumberAgainstBigTable(
-        Map<String, List<Integer>> aliasToBucketNumber, int bucketNumberInPart) {
-      for (List<Integer> bucketNums : aliasToBucketNumber.values()) {
-        for (int nxt : bucketNums) {
-          boolean ok = (nxt >= bucketNumberInPart) ? nxt % bucketNumberInPart == 0
-              : bucketNumberInPart % nxt == 0;
-          if (!ok) {
-            return false;
-          }
-        }
-      }
-      return true;
-    }
-
-    private List<String> getOnePartitionBucketFileNames(URI location)
-        throws SemanticException {
-      List<String> fileNames = new ArrayList<String>();
-      try {
-        FileSystem fs = FileSystem.get(location, this.pGraphContext.getConf());
-        FileStatus[] files = fs.listStatus(new Path(location.toString()));
-        if (files != null) {
-          for (FileStatus file : files) {
-            fileNames.add(file.getPath().toString());
-          }
-        }
-      } catch (IOException e) {
-        throw new SemanticException(e);
-      }
-      return fileNames;
-    }
-
-    private boolean checkBucketColumns(List<String> bucketColumns, List<String> keys,
-        Integer[] orders) {
-      if (keys == null || bucketColumns == null || bucketColumns.isEmpty()) {
-        return false;
-      }
-      for (int i = 0; i < keys.size(); i++) {
-        int index = bucketColumns.indexOf(keys.get(i));
-        if (orders[i] != null && orders[i] != index) {
-          return false;
-        }
-        orders[i] = index;
-      }
-      // Check if the join columns contains all bucket columns.
-      // If a table is bucketized on column B, but the join key is A and B,
-      // it is easy to see joining on different buckets yield empty results.
-      return keys.containsAll(bucketColumns);
-    }
-  }
-
-  class BucketMapjoinOptProcCtx implements NodeProcessorCtx {
-    private final HiveConf conf;
-
-    // we only convert map joins that follows a root table scan in the same
-    // mapper. That means there is no reducer between the root table scan and
-    // mapjoin.
-    Set<MapJoinOperator> listOfRejectedMapjoins = new HashSet<MapJoinOperator>();
-
-    public BucketMapjoinOptProcCtx(HiveConf conf) {
-      this.conf = conf;
-    }
-
-    public HiveConf getConf() {
-      return conf;
-    }
-
-    public Set<MapJoinOperator> getListOfRejectedMapjoins() {
-      return listOfRejectedMapjoins;
-    }
-  }
 }
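For reference, the two rules described in the removed comments above (small-table bucket counts must divide, or be divisible by, the big-table bucket count, and each big-table bucket maps to small-table bucket files either by modulo or by a fixed stride) boil down to the following standalone sketch. The class and method names are hypothetical and only mirror the logic of checkBucketNumberAgainstBigTable and fillMapping; this is not part of the patch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BucketMapJoinSketch {

  // Mirrors checkBucketNumberAgainstBigTable: every small-table bucket count
  // must divide, or be divisible by, the big-table bucket count.
  static boolean bucketCountsCompatible(int bigBuckets, List<Integer> smallBucketCounts) {
    for (int small : smallBucketCounts) {
      boolean ok = (small >= bigBuckets) ? small % bigBuckets == 0 : bigBuckets % small == 0;
      if (!ok) {
        return false;
      }
    }
    return true;
  }

  // Mirrors the mapping rule in fillMapping: big-table bucket 'bindex' reads bucket
  // (bindex % smallBuckets) when the big table has more buckets (4 big / 2 small:
  // 0->0, 1->1, 2->0, 3->1), and every (smallBuckets / bigBuckets)-th bucket starting
  // at bindex otherwise (2 big / 4 small: 0 -> {0, 2}, 1 -> {1, 3}).
  static List<Integer> smallBucketsFor(int bindex, int bigBuckets, int smallBuckets) {
    List<Integer> result = new ArrayList<Integer>();
    if (bigBuckets >= smallBuckets) {
      result.add(bindex % smallBuckets);
    } else {
      int jump = smallBuckets / bigBuckets;
      for (int i = bindex; i < smallBuckets; i += jump) {
        result.add(i);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(bucketCountsCompatible(4, Arrays.asList(2, 8)));  // true
    System.out.println(bucketCountsCompatible(4, Arrays.asList(3)));     // false
    System.out.println(smallBucketsFor(3, 4, 2));                        // [1]
    System.out.println(smallBucketsFor(1, 2, 4));                        // [1, 3]
  }
}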

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java?rev=1447593&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java Tue Feb 19 05:17:52 2013
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.Stack;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+public class BucketMapjoinProc extends AbstractBucketJoinProc implements NodeProcessor {
+  public BucketMapjoinProc(ParseContext pGraphContext) {
+    super(pGraphContext);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+      Object... nodeOutputs) throws SemanticException {
+    BucketJoinProcCtx context = (BucketJoinProcCtx) procCtx;
+    MapJoinOperator mapJoinOperator = (MapJoinOperator) nd;
+
+    // can the current mapjoin be converted to a bucketed mapjoin
+    boolean convert = canConvertMapJoinToBucketMapJoin(
+        mapJoinOperator, pGraphContext, context);
+    HiveConf conf = context.getConf();
+
+    // Throw an error if the user asked for bucketed mapjoin to be enforced and
+    // bucketed mapjoin cannot be performed
+    if (!convert && conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETMAPJOIN)) {
+      throw new SemanticException(ErrorMsg.BUCKET_MAPJOIN_NOT_POSSIBLE.getMsg());
+    }
+
+    if (convert) {
+      // convert the mapjoin to a bucketized mapjoin
+      convertMapJoinToBucketMapJoin(mapJoinOperator, context);
+    }
+
+    return null;
+  }
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LeftmostBigTableSelectorForAutoSMJ.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LeftmostBigTableSelectorForAutoSMJ.java?rev=1447593&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LeftmostBigTableSelectorForAutoSMJ.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LeftmostBigTableSelectorForAutoSMJ.java Tue Feb 19 05:17:52 2013
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+
+/*
+ * This is a pluggable policy to choose the candidate map-join table for converting a join to a
+ * sort merge join. The leftmost table is chosen as the join table.
+ */
+public class LeftmostBigTableSelectorForAutoSMJ implements BigTableSelectorForAutoSMJ {
+  public int getBigTablePosition(ParseContext parseContext, JoinOperator joinOp) {
+    return 0;
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1447593&r1=1447592&r2=1447593&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Tue Feb 19 05:17:52 2013
@@ -74,12 +74,22 @@ public class Optimizer {
     }
     transformations.add(new SamplePruner());
     transformations.add(new MapJoinProcessor());
+    boolean bucketMapJoinOptimizer = false;
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) {
       transformations.add(new BucketMapJoinOptimizer());
-      if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
-        transformations.add(new SortedMergeBucketMapJoinOptimizer());
+      bucketMapJoinOptimizer = true;
+    }
+
+    // If hive.optimize.bucketmapjoin.sortedmerge is set, add both
+    // BucketMapJoinOptimizer and SortedMergeBucketMapJoinOptimizer
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
+      if (!bucketMapJoinOptimizer) {
+        // No need to add BucketMapJoinOptimizer twice
+        transformations.add(new BucketMapJoinOptimizer());
       }
+      transformations.add(new SortedMergeBucketMapJoinOptimizer());
     }
+
     transformations.add(new UnionProcessor());
     transformations.add(new JoinReorder());
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
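As a side note, the interaction of the two flags can be seen in a minimal standalone sketch of the new Optimizer.transform() logic, with strings standing in for the Transform objects; the sketch assumes a plain HiveConf on the classpath and is not part of the patch.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;

public class OptimizerFlagSketch {
  public static void main(String[] args) {
    HiveConf hiveConf = new HiveConf();
    // Enable only the sorted-merge flag; the bucket map join pass must still run,
    // but it should be added to the transformation list only once.
    hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN, false);
    hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN, true);

    List<String> transformations = new ArrayList<String>();
    boolean bucketMapJoinOptimizer = false;
    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) {
      transformations.add("BucketMapJoinOptimizer");
      bucketMapJoinOptimizer = true;
    }
    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
      if (!bucketMapJoinOptimizer) {
        // No need to add BucketMapJoinOptimizer twice
        transformations.add("BucketMapJoinOptimizer");
      }
      transformations.add("SortedMergeBucketMapJoinOptimizer");
    }
    System.out.println(transformations);
    // [BucketMapJoinOptimizer, SortedMergeBucketMapJoinOptimizer]
  }
}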

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java?rev=1447593&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java Tue Feb 19 05:17:52 2013
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+/*
+ * This is a pluggable policy to choose the candidate map-join table for converting a join to a
+ * sort merge join. The largest table is chosen based on the size of the tables.
+ */
+public abstract class SizeBasedBigTableSelectorForAutoSMJ {
+  protected void getListTopOps(
+    Operator<? extends OperatorDesc> op, List<TableScanOperator> topOps) {
+    if ((op.getParentOperators() == null) ||
+        (op.getParentOperators().isEmpty())) {
+      return;
+    }
+
+    for (Operator<? extends OperatorDesc> parentOp : op.getParentOperators()) {
+      if (parentOp instanceof TableScanOperator) {
+        topOps.add((TableScanOperator)parentOp);
+      }
+      else {
+        getListTopOps(parentOp, topOps);
+      }
+    }
+  }
+
+  private long getSize(HiveConf conf, String size, Path path) {
+    // If the size is present in the metastore, use it
+    if (size != null) {
+      try {
+        return Long.valueOf(size);
+      } catch (NumberFormatException e) {
+        return -1;
+      }
+    }
+
+    try {
+      FileSystem fs = path.getFileSystem(conf);
+      return fs.getContentSummary(path).getLength();
+    } catch (Exception e) {
+      return -1;
+    }
+  }
+
+  protected long getSize(HiveConf conf, Table table) {
+    Path path = table.getPath();
+    String size = table.getProperty("totalSize");
+    return getSize(conf, size, path);
+  }
+
+  protected long getSize(HiveConf conf, Partition partition) {
+    Path path = partition.getPartitionPath();
+    String size = partition.getParameters().get("totalSize");
+
+    return getSize(conf, size, path);
+  }
+}
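The abstract class above only supplies the size lookups (metastore totalSize first, file-system length as a fallback); a concrete selector still has to rank the table scans feeding the join. A hypothetical subclass along the following lines illustrates the intended use of getListTopOps and getSize; it ignores partitioned tables for brevity, assumes the BigTableSelectorForAutoSMJ interface shown in LeftmostBigTableSelectorForAutoSMJ above, and is not the selector actually added by this patch.

package org.apache.hadoop.hive.ql.optimizer;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ParseContext;

public class LargestTableSelectorSketch extends SizeBasedBigTableSelectorForAutoSMJ
    implements BigTableSelectorForAutoSMJ {

  public int getBigTablePosition(ParseContext parseContext, JoinOperator joinOp) {
    // Collect the table scans feeding this join, in position order.
    List<TableScanOperator> topOps = new ArrayList<TableScanOperator>();
    getListTopOps(joinOp, topOps);

    // Pick the position whose (unpartitioned) table is largest; unknown sizes (-1) lose.
    int bigTablePosition = 0;
    long maxSize = -1;
    for (int pos = 0; pos < topOps.size(); pos++) {
      Table table = parseContext.getTopToTable().get(topOps.get(pos));
      if (table == null || table.isPartitioned()) {
        continue;
      }
      long size = getSize(parseContext.getConf(), table);
      if (size > maxSize) {
        maxSize = size;
        bigTablePosition = pos;
      }
    }
    return bigTablePosition;
  }
}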

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortBucketJoinProcCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortBucketJoinProcCtx.java?rev=1447593&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortBucketJoinProcCtx.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortBucketJoinProcCtx.java Tue Feb 19 05:17:52 2013
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+
+
+public class SortBucketJoinProcCtx extends BucketJoinProcCtx {
+  private String[] srcs;
+  private int bigTablePosition;
+  private Map<Byte, List<ExprNodeDesc>> keyExprMap;
+
+  public SortBucketJoinProcCtx(HiveConf conf) {
+    super(conf);
+  }
+
+  public String[] getSrcs() {
+    return srcs;
+  }
+
+  public void setSrcs(String[] srcs) {
+    this.srcs = srcs;
+  }
+
+  public int getBigTablePosition() {
+    return bigTablePosition;
+  }
+
+  public void setBigTablePosition(int bigTablePosition) {
+    this.bigTablePosition = bigTablePosition;
+  }
+
+  public Map<Byte, List<ExprNodeDesc>> getKeyExprMap() {
+    return keyExprMap;
+  }
+
+  public void setKeyExprMap(Map<Byte, List<ExprNodeDesc>> keyExprMap) {
+    this.keyExprMap = keyExprMap;
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java?rev=1447593&r1=1447592&r2=1447593&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java Tue Feb 19 05:17:52 2013
@@ -19,22 +19,17 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Order;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -44,18 +39,9 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
-import org.apache.hadoop.hive.ql.parse.QB;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 
 //try to replace a bucket map join with a sorted merge map join
 public class SortedMergeBucketMapJoinOptimizer implements Transform {
@@ -66,8 +52,38 @@ public class SortedMergeBucketMapJoinOpt
   public SortedMergeBucketMapJoinOptimizer() {
   }
 
+  private void getListOfRejectedJoins(
+    ParseContext pctx, SortBucketJoinProcCtx smbJoinContext)
+    throws SemanticException {
+
+    // Go through all joins - the path between the table scan and the join
+    // should only contain select and filter operators.
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1", JoinOperator.getOperatorName() + "%"),
+      getCheckCandidateJoin());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, smbJoinContext);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+
+    // Create a list of topop nodes
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+  }
+
   @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
+    HiveConf conf = pctx.getConf();
+    SortBucketJoinProcCtx smbJoinContext =
+      new SortBucketJoinProcCtx(conf);
+
+    // Get a list of joins which cannot be converted to a sort merge join
+    // Only select and filter operators are allowed between the table scan and
+    // join currently. More operators can be added - the method supportAutomaticSortMergeJoin
+    // dictates which operator is allowed
+    getListOfRejectedJoins(pctx, smbJoinContext);
 
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     // go through all map joins and find out all which have enabled bucket map
@@ -76,7 +92,15 @@ public class SortedMergeBucketMapJoinOpt
         getSortedMergeBucketMapjoinProc(pctx));
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
-    Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, null);
+
+    // There is no need for the user to specify mapjoin for it to be
+    // converted to sort-merge join
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN)) {
+      opRules.put(new RuleRegExp("R2", "JOIN%"),
+        getSortedMergeJoinProc(pctx));
+    }
+
+    Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, smbJoinContext);
     GraphWalker ogw = new DefaultGraphWalker(disp);
 
     // Create a list of topop nodes
@@ -91,6 +115,10 @@ public class SortedMergeBucketMapJoinOpt
     return new SortedMergeBucketMapjoinProc(pctx);
   }
 
+  private NodeProcessor getSortedMergeJoinProc(ParseContext pctx) {
+    return new SortedMergeJoinProc(pctx);
+  }
+
   private NodeProcessor getDefaultProc() {
     return new NodeProcessor() {
       @Override
@@ -102,318 +130,34 @@ public class SortedMergeBucketMapJoinOpt
     };
   }
 
-  class SortedMergeBucketMapjoinProc extends AbstractBucketJoinProc implements NodeProcessor {
-    private ParseContext pGraphContext;
-
-    public SortedMergeBucketMapjoinProc(ParseContext pctx) {
-      this.pGraphContext = pctx;
-    }
-
-    public SortedMergeBucketMapjoinProc() {
-    }
-
-    // Return true or false based on whether the mapjoin was converted successfully to
-    // a sort-merge map join operator.
-    private boolean convertSMBJoin(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-      if (nd instanceof SMBMapJoinOperator) {
-        return false;
-      }
-      MapJoinOperator mapJoinOp = (MapJoinOperator) nd;
-      if (mapJoinOp.getConf().getAliasBucketFileNameMapping() == null
-          || mapJoinOp.getConf().getAliasBucketFileNameMapping().size() == 0) {
-        return false;
-      }
-
-      boolean tableSorted = true;
-      QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext()
-          .get(mapJoinOp);
-      if (joinCxt == null) {
-        return false;
-      }
-      String[] srcs = joinCxt.getBaseSrc();
-      for (int srcPos = 0; srcPos < srcs.length; srcPos++) {
-        srcs[srcPos] = QB.getAppendedAliasFromId(joinCxt.getId(), srcs[srcPos]);
-      }
-
-      // All the tables/partitions columns should be sorted in the same order
-      // For example, if tables A and B are being joined on columns c1, c2 and c3
-      // which are the sorted and bucketed columns. The join would work, as long
-      // c1, c2 and c3 are sorted in the same order.
-      List<Order> sortColumnsFirstTable = new ArrayList<Order>();
-
-      for (int pos = 0; pos < srcs.length; pos++) {
-        tableSorted = tableSorted
-            && isTableSorted(this.pGraphContext,
-                             mapJoinOp,
-                             joinCxt,
-                             pos,
-                             sortColumnsFirstTable,
-                             srcs);
-      }
-      if (!tableSorted) {
-        //this is a mapjoin but not suit for a sort merge bucket map join. check outer joins
-        MapJoinProcessor.checkMapJoin(((MapJoinOperator) nd).getConf().getPosBigTable(),
-            ((MapJoinOperator) nd).getConf().getConds());
-        return false;
-      }
-      // convert a bucket map join operator to a sorted merge bucket map join
-      // operator
-      convertToSMBJoin(mapJoinOp, srcs);
-      return true;
-    }
-
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+  // check if the join operator encountered is a candidate for being converted
+  // to a sort-merge join
+  private NodeProcessor getCheckCandidateJoin() {
+    return new NodeProcessor() {
+      @Override
+      public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      boolean convert = convertSMBJoin(nd, stack, procCtx, nodeOutputs);
-      // Throw an error if the user asked for sort merge bucketed mapjoin to be enforced
-      // and sort merge bucketed mapjoin cannot be performed
-      if (!convert &&
-        pGraphContext.getConf().getBoolVar(
-          HiveConf.ConfVars.HIVEENFORCESORTMERGEBUCKETMAPJOIN)) {
-        throw new SemanticException(ErrorMsg.SORTMERGE_MAPJOIN_FAILED.getMsg());
-      }
-
-      return null;
-    }
-
-    private SMBMapJoinOperator convertToSMBJoin(MapJoinOperator mapJoinOp,
-        String[] srcs) {
-      SMBMapJoinOperator smbJop = new SMBMapJoinOperator(mapJoinOp);
-      SMBJoinDesc smbJoinDesc = new SMBJoinDesc(mapJoinOp.getConf());
-      smbJop.setConf(smbJoinDesc);
-      HashMap<Byte, String> tagToAlias = new HashMap<Byte, String>();
-      for (int i = 0; i < srcs.length; i++) {
-        tagToAlias.put((byte) i, srcs[i]);
-      }
-      smbJoinDesc.setTagToAlias(tagToAlias);
-
-      int indexInListMapJoinNoReducer = this.pGraphContext.getListMapJoinOpsNoReducer().indexOf(mapJoinOp);
-      if(indexInListMapJoinNoReducer >= 0 ) {
-        this.pGraphContext.getListMapJoinOpsNoReducer().remove(indexInListMapJoinNoReducer);
-        this.pGraphContext.getListMapJoinOpsNoReducer().add(indexInListMapJoinNoReducer, smbJop);
-      }
-
-      Map<String, DummyStoreOperator> aliasToSink =
-          new HashMap<String, DummyStoreOperator>();
-      // For all parents (other than the big table), insert a dummy store operator
-      /* Consider a query like:
-        *
-        * select * from
-        *   (subq1 --> has a filter)
-        *   join
-        *   (subq2 --> has a filter)
-        * on some key
-        *
-        * Let us assume that subq1 is the small table (either specified by the user or inferred
-        * automatically). The following operator tree will be created:
-        *
-        * TableScan (subq1) --> Select --> Filter --> DummyStore
-        *                                                         \
-        *                                                          \     SMBJoin
-        *                                                          /
-        *                                                         /
-        * TableScan (subq2) --> Select --> Filter
-        */
-      List<? extends Operator> parentOperators = mapJoinOp.getParentOperators();
-      for (int i = 0; i < parentOperators.size(); i++) {
-        Operator par = parentOperators.get(i);
-        int index = par.getChildOperators().indexOf(mapJoinOp);
-        par.getChildOperators().remove(index);
-        if (i == smbJoinDesc.getPosBigTable()) {
-          par.getChildOperators().add(index, smbJop);
-        }
-        else {
-          DummyStoreOperator dummyStoreOp = new DummyStoreOperator();
-          par.getChildOperators().add(index, dummyStoreOp);
-
-          List<Operator<? extends OperatorDesc>> childrenOps =
-              new ArrayList<Operator<? extends OperatorDesc>>();
-          childrenOps.add(smbJop);
-          dummyStoreOp.setChildOperators(childrenOps);
-
-          List<Operator<? extends OperatorDesc>> parentOps =
-              new ArrayList<Operator<? extends OperatorDesc>>();
-          parentOps.add(par);
-          dummyStoreOp.setParentOperators(parentOps);
-
-          aliasToSink.put(srcs[i], dummyStoreOp);
-          smbJop.getParentOperators().remove(i);
-          smbJop.getParentOperators().add(i, dummyStoreOp);
-        }
-      }
-      smbJoinDesc.setAliasToSink(aliasToSink);
-      List<? extends Operator> childOps = mapJoinOp.getChildOperators();
-      for (int i = 0; i < childOps.size(); i++) {
-        Operator child = childOps.get(i);
-        int index = child.getParentOperators().indexOf(mapJoinOp);
-        child.getParentOperators().remove(index);
-        child.getParentOperators().add(index, smbJop);
-      }
-      return smbJop;
-    }
-
-    /**
-     * Whether this table is eligible for a sort-merge join.
-     *
-     * @param pctx                  parse context
-     * @param op                    map join operator being considered
-     * @param joinTree              join tree being considered
-     * @param alias                 table alias in the join tree being checked
-     * @param pos                   position of the table
-     * @param sortColumnsFirstTable The names and order of the sorted columns for the first table.
-     *                              It is not initialized when pos = 0.
-     * @return
-     * @throws SemanticException
-     */
-    private boolean isTableSorted(ParseContext pctx,
-      MapJoinOperator op,
-      QBJoinTree joinTree,
-      int pos,
-      List<Order> sortColumnsFirstTable,
-      String[] aliases)
-      throws SemanticException {
-      String alias = aliases[pos];
-      Map<TableScanOperator, Table> topToTable = this.pGraphContext
-          .getTopToTable();
-
-      /*
-       * Consider a query like:
-       *
-       * select -- mapjoin(subq1) --  * from
-       * (select a.key, a.value from tbl1 a) subq1
-       *   join
-       * (select a.key, a.value from tbl2 a) subq2
-       * on subq1.key = subq2.key;
-       *
-       * aliasToOpInfo contains the SelectOperator for subq1 and subq2.
-       * We need to traverse the tree (using TableAccessAnalyzer) to get to the base
-       * table. If the object being map-joined is a base table, then aliasToOpInfo
-       * contains the TableScanOperator, and TableAccessAnalyzer is a no-op.
-       */
-      Operator<? extends OperatorDesc> topOp = joinTree.getAliasToOpInfo().get(alias);
-      if (topOp == null) {
-        return false;
-      }
-      List<String> joinCols = toColumns(op.getConf().getKeys().get((byte) pos));
-      if (joinCols == null || joinCols.isEmpty()) {
-        return false;
-      }
-      TableScanOperator tso = TableAccessAnalyzer.genRootTableScan(topOp, joinCols);
-      if (tso == null) {
-        return false;
-      }
-
-      // For nested sub-queries, the alias mapping is not maintained in QB currently.
-      /*
-       * Consider a query like:
-       *
-       * select count(*) from
-       *   (
-       *     select key, count(*) from
-       *       (
-       *         select --mapjoin(a)-- a.key as key, a.value as val1, b.value as val2
-       *         from tbl1 a join tbl2 b on a.key = b.key
-       *       ) subq1
-       *     group by key
-       *   ) subq2;
-       *
-       * The table alias should be subq2:subq1:a which needs to be fetched from topOps.
-       */
-      if (pGraphContext.getTopOps().containsValue(tso)) {
-        for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpEntry :
-          this.pGraphContext.getTopOps().entrySet()) {
-          if (topOpEntry.getValue() == tso) {
-            alias = topOpEntry.getKey();
-            aliases[pos] = alias;
-            break;
+        SortBucketJoinProcCtx smbJoinContext = (SortBucketJoinProcCtx)procCtx;
+        JoinOperator joinOperator = (JoinOperator)nd;
+        int size = stack.size();
+        if (!(stack.get(size-1) instanceof JoinOperator) ||
+            !(stack.get(size-2) instanceof ReduceSinkOperator)) {
+          smbJoinContext.getRejectedJoinOps().add(joinOperator);
+          return null;
+        }
+
+        // If any operator in the stack does not support an auto-conversion, this join should
+        // not be converted.
+        for (int pos = size -3; pos >= 0; pos--) {
+          Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>)stack.get(pos);
+          if (!op.supportAutomaticSortMergeJoin()) {
+            smbJoinContext.getRejectedJoinOps().add(joinOperator);
+            return null;
           }
         }
-      }
-      else {
-        // Ideally, this should never happen, and this should be an assert.
-        return false;
-      }
-
-      Table tbl = topToTable.get(tso);
 
-      if (tbl.isPartitioned()) {
-        PrunedPartitionList prunedParts = null;
-        try {
-          prunedParts = pGraphContext.getOpToPartList().get(tso);
-          if (prunedParts == null) {
-            prunedParts = PartitionPruner.prune(tbl, pGraphContext
-                .getOpToPartPruner().get(tso), pGraphContext.getConf(), alias,
-                pGraphContext.getPrunedPartitions());
-            pGraphContext.getOpToPartList().put(tso, prunedParts);
-          }
-        } catch (HiveException e) {
-          LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
-          throw new SemanticException(e.getMessage(), e);
-        }
-        List<Partition> partitions = prunedParts.getNotDeniedPartns();
-        // Populate the names and order of columns for the first partition of the
-        // first table
-        if ((pos == 0) && (partitions != null) && (!partitions.isEmpty())) {
-          Partition firstPartition = partitions.get(0);
-          sortColumnsFirstTable.addAll(firstPartition.getSortCols());
-        }
-
-        for (Partition partition : prunedParts.getNotDeniedPartns()) {
-          if (!checkSortColsAndJoinCols(partition.getSortCols(),
-                                        joinCols,
-                                        sortColumnsFirstTable)) {
-            return false;
-          }
-        }
-        return true;
-      }
-
-      // Populate the names and order of columns for the first table
-      if (pos == 0) {
-        sortColumnsFirstTable.addAll(tbl.getSortCols());
-      }
-
-      return checkSortColsAndJoinCols(tbl.getSortCols(),
-        joinCols,
-        sortColumnsFirstTable);
-    }
-
-    private boolean checkSortColsAndJoinCols(List<Order> sortCols,
-        List<String> joinCols,
-        List<Order> sortColumnsFirstPartition) {
-
-      if (sortCols == null || sortCols.size() < joinCols.size()) {
-        return false;
-      }
-
-      // A join is eligible for a sort-merge join, only if it is eligible for
-      // a bucketized map join. So, we dont need to check for bucketized map
-      // join here. We are guaranteed that the join keys contain all the
-      // bucketized keys (note that the order need not be the same).
-      List<String> sortColNames = new ArrayList<String>();
-
-      // The join columns should contain all the sort columns
-      // The sort columns of all the tables should be in the same order
-      // compare the column names and the order with the first table/partition.
-      for (int pos = 0; pos < sortCols.size(); pos++) {
-        Order o = sortCols.get(pos);
-        if (o.getOrder() != sortColumnsFirstPartition.get(pos).getOrder()) {
-          return false;
-        }
-        sortColNames.add(o.getCol());
+        return null;
       }
-
-      // The column names and order (ascending/descending) matched
-      // The first 'n' sorted columns should be the same as the joinCols, where
-      // 'n' is the size of join columns.
-      // For eg: if the table is sorted by (a,b,c), it is OK to convert if the join is
-      // on (a), (a,b), or any combination of (a,b,c):
-      //   (a,b,c), (a,c,b), (c,a,b), (c,b,a), (b,c,a), (b,a,c)
-      // but it is not OK to convert if the join is on (a,c)
-      return sortColNames.subList(0, joinCols.size()).containsAll(joinCols);
-    }
+    };
   }
-
 }
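The removed checkSortColsAndJoinCols comment above explains when a join qualifies: the join columns must be some permutation of the first N sort columns of every table or partition. A tiny standalone sketch of just that rule (hypothetical class name, not part of the patch):

import java.util.Arrays;
import java.util.List;

public class SortPrefixRuleSketch {

  // Table sorted by (a,b,c): joins on (a), (a,b), or any permutation of (a,b,c)
  // qualify; a join on (a,c) does not, since it skips the earlier sort column b.
  static boolean joinColsCoverSortPrefix(List<String> sortColNames, List<String> joinCols) {
    if (sortColNames.size() < joinCols.size()) {
      return false;
    }
    return sortColNames.subList(0, joinCols.size()).containsAll(joinCols);
  }

  public static void main(String[] args) {
    List<String> sortCols = Arrays.asList("a", "b", "c");
    System.out.println(joinColsCoverSortPrefix(sortCols, Arrays.asList("a", "b")));       // true
    System.out.println(joinColsCoverSortPrefix(sortCols, Arrays.asList("c", "a", "b")));  // true
    System.out.println(joinColsCoverSortPrefix(sortCols, Arrays.asList("a", "c")));       // false
  }
}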

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java?rev=1447593&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java Tue Feb 19 05:17:52 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.Stack;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+public class SortedMergeBucketMapjoinProc extends AbstractSMBJoinProc implements NodeProcessor {
+  public SortedMergeBucketMapjoinProc(ParseContext pctx) {
+    super(pctx);
+  }
+
+  public SortedMergeBucketMapjoinProc() {
+  }
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+      Object... nodeOutputs) throws SemanticException {
+    if (nd instanceof SMBMapJoinOperator) {
+      return null;
+    }
+
+    MapJoinOperator mapJoinOp = (MapJoinOperator) nd;
+    SortBucketJoinProcCtx smbJoinContext = (SortBucketJoinProcCtx) procCtx;
+
+    boolean convert =
+        canConvertBucketMapJoinToSMBJoin(mapJoinOp, stack, smbJoinContext, nodeOutputs);
+
+    // Throw an error if the user asked for a sort-merge bucketed mapjoin to be enforced
+    // but it cannot be performed
+    if (!convert &&
+        pGraphContext.getConf().getBoolVar(
+            HiveConf.ConfVars.HIVEENFORCESORTMERGEBUCKETMAPJOIN)) {
+      throw new SemanticException(ErrorMsg.SORTMERGE_MAPJOIN_FAILED.getMsg());
+    }
+
+    if (convert) {
+      convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, pGraphContext);
+    }
+    return null;
+  }
+}
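
A hedged sketch (not part of this commit) of how a NodeProcessor such as SortedMergeBucketMapjoinProc is typically wired into Hive's rule-based operator-graph walker. The rule name, the "MAPJOIN%" pattern, the no-op default processor and the assumption that SortBucketJoinProcCtx takes a HiveConf are illustrative only; imports mirror those of the optimizer classes above plus the org.apache.hadoop.hive.ql.lib classes.

  // Register the processor against map-join operators and walk the plan.
  Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
  opRules.put(new RuleRegExp("R1", "MAPJOIN%"),
      new SortedMergeBucketMapjoinProc(pctx));

  // Default processor: do nothing for operators that match no rule.
  NodeProcessor defaultProc = new NodeProcessor() {
    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
        Object... nodeOutputs) throws SemanticException {
      return null;
    }
  };

  // Assumed here: SortBucketJoinProcCtx exposes a HiveConf constructor.
  SortBucketJoinProcCtx smbJoinContext = new SortBucketJoinProcCtx(pctx.getConf());
  Dispatcher disp = new DefaultRuleDispatcher(defaultProc, opRules, smbJoinContext);
  GraphWalker ogw = new DefaultGraphWalker(disp);

  List<Node> topNodes = new ArrayList<Node>(pctx.getTopOps().values());
  ogw.startWalking(topNodes, null);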

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java?rev=1447593&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java Tue Feb 19 05:17:52 2013
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+public class SortedMergeJoinProc extends AbstractSMBJoinProc implements NodeProcessor {
+
+  public SortedMergeJoinProc(ParseContext pctx) {
+    super(pctx);
+  }
+
+  public SortedMergeJoinProc() {
+  }
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+      Object... nodeOutputs) throws SemanticException {
+
+    JoinOperator joinOp = (JoinOperator) nd;
+    SortBucketJoinProcCtx smbJoinContext = (SortBucketJoinProcCtx) procCtx;
+
+    boolean convert =
+        canConvertJoinToSMBJoin(
+            joinOp, smbJoinContext, pGraphContext);
+
+    if (convert) {
+      convertJoinToSMBJoin(joinOp, smbJoinContext, pGraphContext);
+    }
+    return null;
+  }
+}
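
As a follow-on sketch (again an assumption, not from this commit), the automatic path can be gated on the hive.auto.convert.sortmerge.join flag used by the new tests below before a JOIN% rule is added next to the MAPJOIN% rule shown above; the property is read with the plain Configuration getter to avoid asserting a particular ConfVars constant.

  // Only register the JoinOperator rule when automatic conversion is requested.
  if (pctx.getConf().getBoolean("hive.auto.convert.sortmerge.join", false)) {
    opRules.put(new RuleRegExp("R2", "JOIN%"), new SortedMergeJoinProc(pctx));
  }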

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java?rev=1447593&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java Tue Feb 19 05:17:52 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/*
+ * This is a pluggable policy to choose the candidate map-join table for converting a join to a
+ * sort-merge join. The table with the largest total size is chosen as the big table.
+ */
+public class TableSizeBasedBigTableSelectorForAutoSMJ extends SizeBasedBigTableSelectorForAutoSMJ
+implements BigTableSelectorForAutoSMJ {
+  public int getBigTablePosition(ParseContext parseCtx, JoinOperator joinOp)
+    throws SemanticException {
+    int bigTablePos = 0;
+    long maxSize = 0;
+    HiveConf conf = parseCtx.getConf();
+
+    try {
+      List<TableScanOperator> topOps = new ArrayList<TableScanOperator>();
+      getListTopOps(joinOp, topOps);
+      int currentPos = 0;
+      for (TableScanOperator topOp : topOps) {
+        Table table = parseCtx.getTopToTable().get(topOp);
+        long currentSize = 0;
+
+        if (!table.isPartitioned()) {
+          currentSize = getSize(conf, table);
+        }
+        else {
+          // For partitioned tables, get the size of all the partitions
+          PrunedPartitionList partsList =
+            PartitionPruner.prune(parseCtx.getTopToTable().get(topOp),
+              parseCtx.getOpToPartPruner().get(topOp), parseCtx.getConf(),
+              null, parseCtx.getPrunedPartitions());
+          for (Partition part : partsList.getNotDeniedPartns()) {
+            currentSize += getSize(conf, part);
+          }
+        }
+
+        if (currentSize > maxSize) {
+          maxSize = currentSize;
+          bigTablePos = currentPos;
+        }
+        currentPos++;
+      }
+    } catch (HiveException e) {
+      throw new SemanticException(e.getMessage());
+    }
+
+    return bigTablePos;
+  }
+}
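
The selector above is meant to be pluggable. A hedged sketch of how a caller could resolve the configured policy class by name and ask it for the big-table position; the property name matches the one set in the new auto_sortmerge_join_1.q test below, and conf, parseCtx and joinOp are assumed to be in scope.

  // Resolve the big-table selection policy via reflection, defaulting to the
  // size-based selector added in this patch.
  String className = conf.get(
      "hive.auto.convert.sortmerge.join.bigtable.selection.policy",
      TableSizeBasedBigTableSelectorForAutoSMJ.class.getName());
  BigTableSelectorForAutoSMJ bigTableSelector;
  try {
    bigTableSelector = (BigTableSelectorForAutoSMJ)
        Class.forName(className.trim()).newInstance();
  } catch (Exception e) {
    throw new SemanticException(e.getMessage());
  }
  int bigTablePos = bigTableSelector.getBigTablePosition(parseCtx, joinOp);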

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java?rev=1447593&r1=1447592&r2=1447593&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java Tue Feb 19 05:17:52 2013
@@ -50,8 +50,7 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin;
-import
-  org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
@@ -554,12 +553,42 @@ public class CommonJoinResolver implemen
       return null;
     }
 
+    /*
+     * If any operator that does not allow map-side conversion is present in the mapper, don't
+     * convert the join into a conditional task.
+     */
+    private boolean checkOperatorOKMapJoinConversion(Operator<? extends OperatorDesc> op) {
+      if (!op.opAllowedConvertMapJoin()) {
+        return false;
+      }
+
+      if (op.getChildOperators() == null) {
+        return true;
+      }
+
+      for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
+        if (!checkOperatorOKMapJoinConversion(childOp)) {
+          return false;
+        }
+      }
+
+      return true;
+    }
+
     private JoinOperator getJoinOp(MapRedTask task) throws SemanticException {
-      if (task.getWork() == null) {
+      MapredWork work = task.getWork();
+      if (work == null) {
         return null;
       }
-      Operator<? extends OperatorDesc> reducerOp = task.getWork().getReducer();
+      Operator<? extends OperatorDesc> reducerOp = work.getReducer();
       if (reducerOp instanceof JoinOperator) {
+        /* Check whether any operator in the mapper prevents the conversion */
+        Map<String, Operator<? extends OperatorDesc>> aliasToWork = work.getAliasToWork();
+        for (Operator<? extends OperatorDesc> op : aliasToWork.values()) {
+          if (!checkOperatorOKMapJoinConversion(op)) {
+            return null;
+          }
+        }
         return (JoinOperator) reducerOp;
       } else {
         return null;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java?rev=1447593&r1=1447592&r2=1447593&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java Tue Feb 19 05:17:52 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Gr
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
@@ -71,6 +72,7 @@ public class ParseContext {
   private LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx;
   private Map<JoinOperator, QBJoinTree> joinContext;
   private Map<MapJoinOperator, QBJoinTree> mapJoinContext;
+  private Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
   private HashMap<TableScanOperator, Table> topToTable;
   private HashMap<String, SplitSample> nameToSplitSample;
   private List<LoadTableDesc> loadTableWork;
@@ -160,6 +162,7 @@ public class ParseContext {
       HashMap<String, Operator<? extends OperatorDesc>> topSelOps,
       LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtx,
       Map<JoinOperator, QBJoinTree> joinContext,
+      Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext,
       HashMap<TableScanOperator, Table> topToTable,
       List<LoadTableDesc> loadTableWork, List<LoadFileDesc> loadFileWork,
       Context ctx, HashMap<String, String> idToTableNameMap, int destTableId,
@@ -178,6 +181,7 @@ public class ParseContext {
     this.opToPartPruner = opToPartPruner;
     this.opToPartList = opToPartList;
     this.joinContext = joinContext;
+    this.smbMapJoinContext = smbMapJoinContext;
     this.topToTable = topToTable;
     this.loadFileWork = loadFileWork;
     this.loadTableWork = loadTableWork;
@@ -528,6 +532,14 @@ public class ParseContext {
     this.mapJoinContext = mapJoinContext;
   }
 
+  public Map<SMBMapJoinOperator, QBJoinTree> getSmbMapJoinContext() {
+    return smbMapJoinContext;
+  }
+
+  public void setSmbMapJoinContext(Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext) {
+    this.smbMapJoinContext = smbMapJoinContext;
+  }
+
   public GlobalLimitCtx getGlobalLimitCtx() {
     return globalLimitCtx;
   }
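
A brief usage sketch (assumption, not from this commit): when an optimizer replaces a join with an SMBMapJoinOperator it can carry the operator's QBJoinTree over into the new map, so later consumers of the ParseContext can still look it up; smbJoinOp and joinOp below are hypothetical locals for the new and the original operator.

  // Record the join tree of the newly created SMB map-join operator.
  Map<SMBMapJoinOperator, QBJoinTree> smbCtx = pctx.getSmbMapJoinContext();
  if (smbCtx == null) {
    smbCtx = new HashMap<SMBMapJoinOperator, QBJoinTree>();
    pctx.setSmbMapJoinContext(smbCtx);
  }
  smbCtx.put(smbJoinOp, pctx.getJoinContext().get(joinOp));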

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1447593&r1=1447592&r2=1447593&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Tue Feb 19 05:17:52 2013
@@ -72,6 +72,8 @@ import org.apache.hadoop.hive.ql.exec.Re
 import org.apache.hadoop.hive.ql.exec.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -193,6 +195,7 @@ public class SemanticAnalyzer extends Ba
   private List<LoadTableDesc> loadTableWork;
   private List<LoadFileDesc> loadFileWork;
   private Map<JoinOperator, QBJoinTree> joinContext;
+  private Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
   private final HashMap<TableScanOperator, Table> topToTable;
   private QB qb;
   private ASTNode ast;
@@ -250,6 +253,7 @@ public class SemanticAnalyzer extends Ba
     loadFileWork = new ArrayList<LoadFileDesc>();
     opParseCtx = new LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>();
     joinContext = new HashMap<JoinOperator, QBJoinTree>();
+    smbMapJoinContext = new HashMap<SMBMapJoinOperator, QBJoinTree>();
     topToTable = new HashMap<TableScanOperator, Table>();
     destTableId = 1;
     uCtx = null;
@@ -278,6 +282,7 @@ public class SemanticAnalyzer extends Ba
     ast = null;
     uCtx = null;
     joinContext.clear();
+    smbMapJoinContext.clear();
     opParseCtx.clear();
     groupOpToInputTables.clear();
     prunedPartitions.clear();
@@ -293,6 +298,7 @@ public class SemanticAnalyzer extends Ba
     loadTableWork = pctx.getLoadTableWork();
     loadFileWork = pctx.getLoadFileWork();
     joinContext = pctx.getJoinContext();
+    smbMapJoinContext = pctx.getSmbMapJoinContext();
     ctx = pctx.getContext();
     destTableId = pctx.getDestTableId();
     idToTableNameMap = pctx.getIdToTableNameMap();
@@ -307,7 +313,7 @@ public class SemanticAnalyzer extends Ba
 
   public ParseContext getParseContext() {
     return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, topOps,
-        topSelOps, opParseCtx, joinContext, topToTable, loadTableWork,
+        topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, loadTableWork,
         loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
         opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
@@ -8461,7 +8467,8 @@ public class SemanticAnalyzer extends Ba
     }
 
     ParseContext pCtx = new ParseContext(conf, qb, child, opToPartPruner,
-        opToPartList, topOps, topSelOps, opParseCtx, joinContext, topToTable,
+        opToPartList, topOps, topSelOps, opParseCtx, joinContext, smbMapJoinContext,
+        topToTable,
         loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
         opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java?rev=1447593&r1=1447592&r2=1447593&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java Tue Feb 19 05:17:52 2013
@@ -31,7 +31,7 @@ public class SMBJoinDesc extends MapJoin
 
   private MapredLocalWork localWork;
 
-  //keep a mapping from tag to the fetch operator alias
+  // keep a mapping from tag to the fetch operator alias
   private HashMap<Byte, String> tagToAlias;
   private Map<String, DummyStoreOperator> aliasToSink;
 

Modified: hive/trunk/ql/src/test/queries/clientnegative/smb_mapjoin_14.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientnegative/smb_mapjoin_14.q?rev=1447593&r1=1447592&r2=1447593&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientnegative/smb_mapjoin_14.q (original)
+++ hive/trunk/ql/src/test/queries/clientnegative/smb_mapjoin_14.q Tue Feb 19 05:17:52 2013
@@ -18,7 +18,7 @@ set hive.input.format = org.apache.hadoo
 -- A join is being performed across different sub-queries, where a mapjoin is being performed in each of them.
 -- Each sub-query should be converted to a sort-merge join.
 -- A join followed by mapjoin is not allowed, so this query should fail.
--- Once HIVE-3433 is in, this should be automatically converted to a sort-merge join without the hint
+-- Once HIVE-3403 is in, this should be automatically converted to a sort-merge join without the hint
 explain
 select src1.key, src1.cnt1, src2.cnt1 from
 (

Added: hive/trunk/ql/src/test/queries/clientpositive/auto_smb_mapjoin_14.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/auto_smb_mapjoin_14.q?rev=1447593&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/auto_smb_mapjoin_14.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/auto_smb_mapjoin_14.q Tue Feb 19 05:17:52 2013
@@ -0,0 +1,296 @@
+set hive.enforce.bucketing = true;
+set hive.enforce.sorting = true;
+set hive.exec.reducers.max = 1;
+
+CREATE TABLE tbl1(key int, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+CREATE TABLE tbl2(key int, value string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+
+insert overwrite table tbl1
+select * from src where key < 10;
+
+insert overwrite table tbl2
+select * from src where key < 10;
+
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+
+set hive.auto.convert.sortmerge.join=true;
+
+-- The join is being performed as part of a sub-query. It should be converted to a sort-merge join
+explain
+select count(*) from (
+  select a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+) subq1;
+
+select count(*) from (
+  select a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+) subq1;
+
+-- The join is being performed as part of more than one sub-query. It should be converted to a sort-merge join
+explain
+select count(*) from
+(
+  select key, count(*) from 
+  (
+    select a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+  ) subq1
+  group by key
+) subq2;
+
+select count(*) from
+(
+  select key, count(*) from 
+  (
+    select a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+  ) subq1
+  group by key
+) subq2;
+
+-- A join is being performed across different sub-queries, where a join is being performed in each of them.
+-- Each sub-query should be converted to a sort-merge join.
+explain
+select src1.key, src1.cnt1, src2.cnt1 from
+(
+  select key, count(*) as cnt1 from 
+  (
+    select a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+  ) subq1 group by key
+) src1
+join
+(
+  select key, count(*) as cnt1 from 
+  (
+    select a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+  ) subq2 group by key
+) src2
+on src1.key = src2.key
+order by src1.key, src1.cnt1, src2.cnt1;
+
+select src1.key, src1.cnt1, src2.cnt1 from
+(
+  select key, count(*) as cnt1 from 
+  (
+    select a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+  ) subq1 group by key
+) src1
+join
+(
+  select key, count(*) as cnt1 from 
+  (
+    select a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+  ) subq2 group by key
+) src2
+on src1.key = src2.key
+order by src1.key, src1.cnt1, src2.cnt1;
+
+-- The subquery itself is being joined. Since the sub-query only contains selects and filters, it should 
+-- be converted to a sort-merge join.
+explain
+select count(*) from 
+  (select a.key as key, a.value as value from tbl1 a where key < 6) subq1 
+    join
+  (select a.key as key, a.value as value from tbl2 a where key < 6) subq2
+  on subq1.key = subq2.key;
+
+select count(*) from 
+  (select a.key as key, a.value as value from tbl1 a where key < 6) subq1 
+    join
+  (select a.key as key, a.value as value from tbl2 a where key < 6) subq2
+  on subq1.key = subq2.key;
+
+-- The subquery itself is being joined. Since the sub-query only contains selects and filters, it should 
+-- be converted to a sort-merge join, although there is more than one level of sub-query
+explain
+select count(*) from 
+  (
+  select * from
+    (
+      select a.key as key, a.value as value from tbl1 a where key < 8
+    ) subq1 
+  where key < 6
+  ) subq2
+  join tbl2 b
+  on subq2.key = b.key;
+
+select count(*) from 
+  (
+  select * from
+    (
+      select a.key as key, a.value as value from tbl1 a where key < 8
+    ) subq1 
+  where key < 6
+  ) subq2
+  join tbl2 b
+  on subq2.key = b.key;
+
+-- Both the tables are nested sub-queries, i.e. more than one level of sub-query.
+-- The join should be converted to a sort-merge join
+explain
+select count(*) from 
+  (
+  select * from
+    (
+      select a.key as key, a.value as value from tbl1 a where key < 8
+    ) subq1 
+  where key < 6
+  ) subq2
+  join
+  (
+  select * from
+    (
+      select a.key as key, a.value as value from tbl1 a where key < 8
+    ) subq3 
+  where key < 6
+  ) subq4
+  on subq2.key = subq4.key;
+
+select count(*) from 
+  (
+  select * from
+    (
+      select a.key as key, a.value as value from tbl1 a where key < 8
+    ) subq1 
+  where key < 6
+  ) subq2
+  join
+  (
+  select * from
+    (
+      select a.key as key, a.value as value from tbl1 a where key < 8
+    ) subq3 
+  where key < 6
+  ) subq4
+  on subq2.key = subq4.key;
+
+-- The subquery itself is being joined. Since the sub-query only contains selects and filters and the join key
+-- is not getting modified, it should be converted to a sort-merge join. Note that the sub-query modifies one 
+-- item, but that is not part of the join key.
+explain
+select count(*) from 
+  (select a.key as key, concat(a.value, a.value) as value from tbl1 a where key < 8) subq1 
+    join
+  (select a.key as key, concat(a.value, a.value) as value from tbl2 a where key < 8) subq2
+  on subq1.key = subq2.key;
+
+select count(*) from 
+  (select a.key as key, concat(a.value, a.value) as value from tbl1 a where key < 8) subq1 
+    join
+  (select a.key as key, concat(a.value, a.value) as value from tbl2 a where key < 8) subq2
+  on subq1.key = subq2.key;
+
+-- Since the join key is modified by the sub-query, neither a sort-merge join nor a bucketized map-side
+-- join should be performed
+explain
+select count(*) from 
+  (select a.key +1 as key, concat(a.value, a.value) as value from tbl1 a) subq1 
+    join
+  (select a.key +1 as key, concat(a.value, a.value) as value from tbl2 a) subq2
+  on subq1.key = subq2.key;
+
+select count(*) from 
+  (select a.key +1 as key, concat(a.value, a.value) as value from tbl1 a) subq1 
+    join
+  (select a.key +1 as key, concat(a.value, a.value) as value from tbl2 a) subq2
+  on subq1.key = subq2.key;
+
+-- One of the tables is a sub-query and the other is not.
+-- It should be converted to a sort-merge join.
+explain
+select count(*) from 
+  (select a.key as key, a.value as value from tbl1 a where key < 6) subq1 
+    join tbl2 a on subq1.key = a.key;
+
+select count(*) from 
+  (select a.key as key, a.value as value from tbl1 a where key < 6) subq1 
+    join tbl2 a on subq1.key = a.key;
+
+-- There are more than 2 inputs to the join, all of them being sub-queries. 
+-- It should be converted to a sort-merge join
+explain
+select count(*) from 
+  (select a.key as key, a.value as value from tbl1 a where key < 6) subq1 
+    join
+  (select a.key as key, a.value as value from tbl2 a where key < 6) subq2
+  on (subq1.key = subq2.key)
+    join
+  (select a.key as key, a.value as value from tbl2 a where key < 6) subq3
+  on (subq1.key = subq3.key);
+
+select count(*) from 
+  (select a.key as key, a.value as value from tbl1 a where key < 6) subq1 
+    join
+  (select a.key as key, a.value as value from tbl2 a where key < 6) subq2
+  on subq1.key = subq2.key
+    join
+  (select a.key as key, a.value as value from tbl2 a where key < 6) subq3
+  on (subq1.key = subq3.key);
+
+-- The join is being performed on a nested sub-query, and an aggregation is performed after that.
+-- The join should be converted to a sort-merge join
+explain
+select count(*) from (
+  select subq2.key as key, subq2.value as value1, b.value as value2 from
+  (
+    select * from
+    (
+      select a.key as key, a.value as value from tbl1 a where key < 8
+    ) subq1
+    where key < 6
+  ) subq2
+join tbl2 b
+on subq2.key = b.key) a;
+
+select count(*) from (
+  select subq2.key as key, subq2.value as value1, b.value as value2 from
+  (
+    select * from
+    (
+      select a.key as key, a.value as value from tbl1 a where key < 8
+    ) subq1
+    where key < 6
+  ) subq2
+join tbl2 b
+on subq2.key = b.key) a;
+
+CREATE TABLE dest1(key int, value string);
+CREATE TABLE dest2(key int, val1 string, val2 string);
+
+-- The join is followed by a multi-table insert. It should be converted to
+-- a sort-merge join
+explain
+from (
+  select a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+) subq1
+insert overwrite table dest1 select key, val1
+insert overwrite table dest2 select key, val1, val2;
+
+from (
+  select a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+) subq1
+insert overwrite table dest1 select key, val1
+insert overwrite table dest2 select key, val1, val2;
+
+select * from dest1 order by key, value;
+select * from dest2 order by key, val1, val2;
+
+DROP TABLE dest2;
+CREATE TABLE dest2(key int, cnt int);
+
+-- The join is followed by a multi-table insert, and one of the inserts involves a reducer.
+-- It should be converted to a sort-merge join
+explain
+from (
+  select a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+) subq1
+insert overwrite table dest1 select key, val1
+insert overwrite table dest2 select key, count(*) group by key;
+
+from (
+  select a.key as key, a.value as val1, b.value as val2 from tbl1 a join tbl2 b on a.key = b.key
+) subq1
+insert overwrite table dest1 select key, val1
+insert overwrite table dest2 select key, count(*) group by key;
+
+select * from dest1 order by key, value;
+select * from dest2 order by key;

Added: hive/trunk/ql/src/test/queries/clientpositive/auto_sortmerge_join_1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/auto_sortmerge_join_1.q?rev=1447593&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/auto_sortmerge_join_1.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/auto_sortmerge_join_1.q Tue Feb 19 05:17:52 2013
@@ -0,0 +1,30 @@
+-- small 1 part, 2 bucket & big 2 part, 4 bucket
+
+CREATE TABLE bucket_small (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/smallsrcsortbucket1outof4.txt' INTO TABLE bucket_small partition(ds='2008-04-08');
+load data local inpath '../data/files/smallsrcsortbucket2outof4.txt' INTO TABLE bucket_small partition(ds='2008-04-08');
+
+CREATE TABLE bucket_big (key string, value string) partitioned by (ds string) CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS STORED AS TEXTFILE;
+load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_big partition(ds='2008-04-08');
+load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_big partition(ds='2008-04-08');
+load data local inpath '../data/files/srcsortbucket3outof4.txt' INTO TABLE bucket_big partition(ds='2008-04-08');
+load data local inpath '../data/files/srcsortbucket4outof4.txt' INTO TABLE bucket_big partition(ds='2008-04-08');
+
+load data local inpath '../data/files/srcsortbucket1outof4.txt' INTO TABLE bucket_big partition(ds='2008-04-09');
+load data local inpath '../data/files/srcsortbucket2outof4.txt' INTO TABLE bucket_big partition(ds='2008-04-09');
+load data local inpath '../data/files/srcsortbucket3outof4.txt' INTO TABLE bucket_big partition(ds='2008-04-09');
+load data local inpath '../data/files/srcsortbucket4outof4.txt' INTO TABLE bucket_big partition(ds='2008-04-09');
+
+set hive.auto.convert.join=true;
+set hive.auto.convert.sortmerge.join=true;
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+
+set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ;
+
+-- Since size is being used to find the big table, the order of the tables in the join does not matter
+explain extended select count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
+select count(*) FROM bucket_small a JOIN bucket_big b ON a.key = b.key;
+
+explain extended select count(*) FROM bucket_big a JOIN bucket_small b ON a.key = b.key;
+select count(*) FROM bucket_big a JOIN bucket_small b ON a.key = b.key;


