hive-commits mailing list archives

From na...@apache.org
Subject svn commit: r1447593 [1/8] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ data/files/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/phy...
Date Tue, 19 Feb 2013 05:17:54 GMT
Author: namit
Date: Tue Feb 19 05:17:52 2013
New Revision: 1447593

URL: http://svn.apache.org/r1447593
Log:
HIVE-3403 user should not specify mapjoin to perform sort-merge bucketed join
(Namit Jain via Ashutosh)
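
The conversion added here is off by default. As a minimal sketch, assuming only the
HiveConf API plus the ConfVars entry added below (the demo class itself is hypothetical),
the new behavior could be switched on programmatically like this:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class EnableAutoSMBJoin {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Enable the optimizer-driven sort-merge join conversion (off by default).
        conf.setBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN, true);
        System.out.println(conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN));
      }
    }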


Added:
    hive/trunk/data/files/smallsrcsortbucket1outof4.txt
    hive/trunk/data/files/smallsrcsortbucket2outof4.txt
    hive/trunk/data/files/smallsrcsortbucket3outof4.txt
    hive/trunk/data/files/smallsrcsortbucket4outof4.txt
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BigTableSelectorForAutoSMJ.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketJoinProcCtx.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapjoinProc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LeftmostBigTableSelectorForAutoSMJ.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortBucketJoinProcCtx.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapjoinProc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeJoinProc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java
    hive/trunk/ql/src/test/queries/clientpositive/auto_smb_mapjoin_14.q
    hive/trunk/ql/src/test/queries/clientpositive/auto_sortmerge_join_1.q
    hive/trunk/ql/src/test/queries/clientpositive/auto_sortmerge_join_10.q
    hive/trunk/ql/src/test/queries/clientpositive/auto_sortmerge_join_2.q
    hive/trunk/ql/src/test/queries/clientpositive/auto_sortmerge_join_3.q
    hive/trunk/ql/src/test/queries/clientpositive/auto_sortmerge_join_4.q
    hive/trunk/ql/src/test/queries/clientpositive/auto_sortmerge_join_5.q
    hive/trunk/ql/src/test/queries/clientpositive/auto_sortmerge_join_6.q
    hive/trunk/ql/src/test/queries/clientpositive/auto_sortmerge_join_7.q
    hive/trunk/ql/src/test/queries/clientpositive/auto_sortmerge_join_8.q
    hive/trunk/ql/src/test/queries/clientpositive/auto_sortmerge_join_9.q
    hive/trunk/ql/src/test/results/clientpositive/auto_smb_mapjoin_14.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_10.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_2.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_3.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_4.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_5.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_6.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_7.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_8.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_9.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java
    hive/trunk/ql/src/test/queries/clientnegative/smb_mapjoin_14.q

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1447593&r1=1447592&r2=1447593&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Feb 19 05:17:52 2013
@@ -500,6 +500,11 @@ public class HiveConf extends Configurat
     HIVEENFORCESORTMERGEBUCKETMAPJOIN("hive.enforce.sortmergebucketmapjoin", false),
     HIVEENFORCEBUCKETMAPJOIN("hive.enforce.bucketmapjoin", false),
 
+    HIVE_AUTO_SORTMERGE_JOIN("hive.auto.convert.sortmerge.join", false),
+    HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR(
+        "hive.auto.convert.sortmerge.join.bigtable.selection.policy",
+        "org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ"),
+
     HIVESCRIPTOPERATORTRUST("hive.exec.script.trust", false),
     HIVEROWOFFSET("hive.exec.rowoffset", false),
 

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1447593&r1=1447592&r2=1447593&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Tue Feb 19 05:17:52 2013
@@ -935,6 +935,27 @@
 </property>
 
 <property>
+  <name>hive.auto.convert.sortmerge.join</name>
+  <value>false</value>
+  <description>Whether the join will be automatically converted to a sort-merge join, if the
+    joined tables pass the criteria for sort-merge join.
+  </description>
+</property>
+
+<property>
+  <name>hive.auto.convert.sortmerge.join.bigtable.selection.policy</name>
+  <value>org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ</value>
+  <description>The policy to choose the big table for automatic conversion to sort-merge join.
+    By default, the leftmost table is chosen as the big table. Other policies are based on size:
+    . based on the total size (over all the partitions selected in the query) of the table:
+    org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ.
+    . based on the average partition size (over the partitions selected in the query) of the table:
+    org.apache.hadoop.hive.ql.optimizer.AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.
+    New policies can be added in the future.
+  </description>
+</property>
+
+<property>
   <name>hive.metastore.ds.connection.url.hook</name>
   <value></value>
   <description>Name of the hook to use for retrieving the JDO connection URL. If empty, the value in javax.jdo.option.ConnectionURL is used </description>

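The policy named above is resolved by reflection; canConvertJoinToBucketMapJoin in
AbstractSMBJoinProc (added later in this commit) performs the equivalent of the sketch
below. The demo class is hypothetical, but the Class.forName/ReflectionUtils calls mirror
the patch:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.util.ReflectionUtils;

    public class BigTableSelectorLookup {
      public static void main(String[] args) throws ClassNotFoundException {
        HiveConf conf = new HiveConf();
        // Resolve the configured policy class name, then instantiate it by reflection.
        Class<?> policyClass = Class.forName(HiveConf.getVar(conf,
            HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR));
        Object selector = ReflectionUtils.newInstance(policyClass, null);
        System.out.println("big table selector: " + selector.getClass().getName());
      }
    }
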
Added: hive/trunk/data/files/smallsrcsortbucket1outof4.txt
URL: http://svn.apache.org/viewvc/hive/trunk/data/files/smallsrcsortbucket1outof4.txt?rev=1447593&view=auto
==============================================================================
--- hive/trunk/data/files/smallsrcsortbucket1outof4.txt (added)
+++ hive/trunk/data/files/smallsrcsortbucket1outof4.txt Tue Feb 19 05:17:52 2013
@@ -0,0 +1,5 @@
+0val_0
+103val_103
+169val_169
+172val_172
+374val_374

Added: hive/trunk/data/files/smallsrcsortbucket2outof4.txt
URL: http://svn.apache.org/viewvc/hive/trunk/data/files/smallsrcsortbucket2outof4.txt?rev=1447593&view=auto
==============================================================================
--- hive/trunk/data/files/smallsrcsortbucket2outof4.txt (added)
+++ hive/trunk/data/files/smallsrcsortbucket2outof4.txt Tue Feb 19 05:17:52 2013
@@ -0,0 +1,5 @@
+180val_180
+221val_221
+379val_379
+478val_478
+74val_74

Added: hive/trunk/data/files/smallsrcsortbucket3outof4.txt
URL: http://svn.apache.org/viewvc/hive/trunk/data/files/smallsrcsortbucket3outof4.txt?rev=1447593&view=auto
==============================================================================
--- hive/trunk/data/files/smallsrcsortbucket3outof4.txt (added)
+++ hive/trunk/data/files/smallsrcsortbucket3outof4.txt Tue Feb 19 05:17:52 2013
@@ -0,0 +1,5 @@
+233val_233
+424val_424
+468val_468
+53val_53
+97val_97

Added: hive/trunk/data/files/smallsrcsortbucket4outof4.txt
URL: http://svn.apache.org/viewvc/hive/trunk/data/files/smallsrcsortbucket4outof4.txt?rev=1447593&view=auto
==============================================================================
--- hive/trunk/data/files/smallsrcsortbucket4outof4.txt (added)
+++ hive/trunk/data/files/smallsrcsortbucket4outof4.txt Tue Feb 19 05:17:52 2013
@@ -0,0 +1,5 @@
+146val_146
+193val_193
+432val_432
+65val_65
+83val_83

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1447593&r1=1447592&r2=1447593&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Tue Feb 19 05:17:52 2013
@@ -172,6 +172,11 @@ public class FilterOperator extends Oper
   }
 
   @Override
+  public boolean supportAutomaticSortMergeJoin() {
+    return true;
+  }
+
+  @Override
   public boolean supportUnionRemoveOptimization() {
     return true;
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1447593&r1=1447592&r2=1447593&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Feb 19 05:17:52 2013
@@ -1475,6 +1475,15 @@ public abstract class Operator<T extends
     this.useBucketizedHiveInputFormat = useBucketizedHiveInputFormat;
   }
 
+  /**
+   * Whether this operator supports automatic sort-merge join conversion.
+   * The operator stack is traversed, and this method is invoked for each operator.
+   * @return true if the operator supports automatic sort-merge join, false otherwise.
+   */
+  public boolean supportAutomaticSortMergeJoin() {
+    return false;
+  }
+
   public boolean supportUnionRemoveOptimization() {
     return false;
   }
@@ -1496,4 +1505,13 @@ public abstract class Operator<T extends
   public boolean opAllowedAfterMapJoin() {
     return true;
   }
+
+  /*
+   * Whether a task that contains a join can be converted to a map-join task when this
+   * operator is present in the mapper. For example, if a sort-merge join operator is present
+   * followed by a regular join, the task cannot be converted to an auto map-join.
+   */
+  public boolean opAllowedConvertMapJoin() {
+    return true;
+  }
 }

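A minimal sketch of how a tree walk might consult the new hook, assuming a helper of this
shape (the actual traversal lives in SortedMergeBucketMapJoinOptimizer, which is modified
later in this commit and not shown in this part of the diff):

    import java.util.Stack;

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.lib.Node;

    public class SmbSupportCheck {
      // Returns false as soon as any operator on the traversed stack rejects
      // automatic sort-merge join conversion.
      public static boolean allOperatorsSupport(Stack<Node> stack) {
        for (Node node : stack) {
          if (node instanceof Operator<?>
              && !((Operator<?>) node).supportAutomaticSortMergeJoin()) {
            return false;
          }
        }
        return true;
      }
    }
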
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1447593&r1=1447592&r2=1447593&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Tue Feb 19 05:17:52 2013
@@ -75,6 +75,12 @@ public class SMBMapJoinOperator extends 
   transient boolean firstFetchHappened = false;
   private transient boolean inputFileChanged = false;
   transient boolean localWorkInited = false;
+  transient boolean initDone = false;
+
+  // This join has been converted to an SMB join by the Hive optimizer. The user did not
+  // give a mapjoin hint in the query. The Hive optimizer determined that the join can be
+  // performed as an SMB join, based on all the tables/partitions being joined.
+  private transient boolean convertedAutomaticallySMBJoin = false;
 
   public SMBMapJoinOperator() {
   }
@@ -85,6 +91,13 @@ public class SMBMapJoinOperator extends 
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
+
+    // If there is a sort-merge join followed by a regular join, the SMBJoinOperator may not
+    // get initialized at all. Consider the following query:
+    // A SMB B JOIN C
+    // For the mapper processing C, the SMB join is never initialized, so it need not be closed.
+    initDone = true;
+
     super.initializeOp(hconf);
 
     firstRow = true;
@@ -558,6 +571,15 @@ public class SMBMapJoinOperator extends 
     }
     closeCalled = true;
 
+    // If there is a sort-merge join followed by a regular join, the SMBJoinOperator may not
+    // get initialized at all. Consider the following query:
+    // A SMB B JOIN C
+    // For the mapper processing C, the SMB join is never initialized, so it need not be closed.
+    if (!initDone) {
+      return;
+    }
+
     if (inputFileChanged || !firstFetchHappened) {
       //set up the fetch operator for the new input file.
       for (Map.Entry<String, MergeQueue> entry : aliasToMergeQueue.entrySet()) {
@@ -620,6 +642,14 @@ public class SMBMapJoinOperator extends 
     return OperatorType.MAPJOIN;
   }
 
+  public boolean isConvertedAutomaticallySMBJoin() {
+    return convertedAutomaticallySMBJoin;
+  }
+
+  public void setConvertedAutomaticallySMBJoin(boolean convertedAutomaticallySMBJoin) {
+    this.convertedAutomaticallySMBJoin = convertedAutomaticallySMBJoin;
+  }
+
   // returns rows from possibly multiple bucket files of small table in ascending order
+  // by utilizing a priority queue (borrowed from hadoop)
   // elements of queue (Integer) are index to FetchOperator[] (segments)
@@ -778,4 +808,9 @@ public class SMBMapJoinOperator extends 
       return false;
     }
   }
+
+  @Override
+  public boolean opAllowedConvertMapJoin() {
+    return false;
+  }
 }

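The initDone flag above guards close against an initializeOp() that never ran. The
pattern in isolation, as a hypothetical standalone sketch:

    public class InitGuard {
      private boolean initDone = false;

      public void initialize() {
        // Record that initialization actually happened before doing any work.
        initDone = true;
        // ... setup work ...
      }

      public void close() {
        if (!initDone) {
          // Never initialized in this mapper (e.g. the mapper processing C in
          // "A SMB B JOIN C"); nothing to release.
          return;
        }
        // ... cleanup work ...
      }
    }
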
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1447593&r1=1447592&r2=1447593&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Tue Feb 19 05:17:52 2013
@@ -112,6 +112,11 @@ public class SelectOperator extends Oper
   }
 
   @Override
+  public boolean supportAutomaticSortMergeJoin() {
+    return true;
+  }
+
+  @Override
   public boolean supportUnionRemoveOptimization() {
     return true;
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1447593&r1=1447592&r2=1447593&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Tue Feb 19 05:17:52 2013
@@ -295,4 +295,9 @@ public class TableScanOperator extends O
   public boolean supportSkewJoinOptimization() {
     return true;
   }
+
+  @Override
+  public boolean supportAutomaticSortMergeJoin() {
+    return true;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java?rev=1447593&r1=1447592&r2=1447593&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java Tue Feb 19 05:17:52 2013
@@ -17,32 +17,438 @@
  */
 package org.apache.hadoop.hive.ql.optimizer;
 
+import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.QB;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 /**
  * this transformation does bucket map join optimization.
  */
 abstract public class AbstractBucketJoinProc implements NodeProcessor {
+  private static final Log LOG =
+    LogFactory.getLog(AbstractBucketJoinProc.class.getName());
 
-  private static final Log LOG = LogFactory.getLog(AbstractBucketJoinProc.class.getName());
+  protected ParseContext pGraphContext;
+
+  public AbstractBucketJoinProc(ParseContext pGraphContext) {
+    this.pGraphContext = pGraphContext;
+  }
 
   public AbstractBucketJoinProc() {
   }
 
   @Override
   abstract public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-      Object... nodeOutputs) throws SemanticException;
+    Object... nodeOutputs) throws SemanticException;
+
+  private static List<String> getOnePartitionBucketFileNames(
+    URI location, ParseContext pGraphContext) throws SemanticException {
+    List<String> fileNames = new ArrayList<String>();
+    try {
+      FileSystem fs = FileSystem.get(location, pGraphContext.getConf());
+      FileStatus[] files = fs.listStatus(new Path(location.toString()));
+      if (files != null) {
+        for (FileStatus file : files) {
+          fileNames.add(file.getPath().toString());
+        }
+      }
+    } catch (IOException e) {
+      throw new SemanticException(e);
+    }
+    return fileNames;
+  }
+
+  private boolean checkBucketColumns(List<String> bucketColumns,
+    List<String> keys,
+    Integer[] orders) {
+    if (keys == null || bucketColumns == null || bucketColumns.isEmpty()) {
+      return false;
+    }
+    for (int i = 0; i < keys.size(); i++) {
+      int index = bucketColumns.indexOf(keys.get(i));
+      if (orders[i] != null && orders[i] != index) {
+        return false;
+      }
+      orders[i] = index;
+    }
+
+    // Check if the join columns contain all the bucket columns.
+    // If a table is bucketized on column B, but the join key is A and B,
+    // it is easy to see that joining on different buckets would yield empty results.
+    return keys.containsAll(bucketColumns);
+  }
+
+  private boolean checkBucketNumberAgainstBigTable(
+    Map<String, List<Integer>> aliasToBucketNumber, int bucketNumberInPart) {
+    for (List<Integer> bucketNums : aliasToBucketNumber.values()) {
+      for (int nxt : bucketNums) {
+        boolean ok = (nxt >= bucketNumberInPart) ? nxt % bucketNumberInPart == 0
+          : bucketNumberInPart % nxt == 0;
+        if (!ok) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  protected boolean canConvertMapJoinToBucketMapJoin(
+    MapJoinOperator mapJoinOp,
+    ParseContext pGraphContext,
+    BucketJoinProcCtx context) throws SemanticException {
+
+    QBJoinTree joinCtx = this.pGraphContext.getMapJoinContext().get(mapJoinOp);
+    if (joinCtx == null) {
+      return false;
+    }
+
+    List<String> joinAliases = new ArrayList<String>();
+    String[] srcs = joinCtx.getBaseSrc();
+    String[] left = joinCtx.getLeftAliases();
+    List<String> mapAlias = joinCtx.getMapAliases();
+    String baseBigAlias = null;
+
+    for (String s : left) {
+      if (s != null) {
+        String subQueryAlias = QB.getAppendedAliasFromId(joinCtx.getId(), s);
+        if (!joinAliases.contains(subQueryAlias)) {
+          joinAliases.add(subQueryAlias);
+          if (!mapAlias.contains(s)) {
+            baseBigAlias = subQueryAlias;
+          }
+        }
+      }
+    }
+
+    for (String s : srcs) {
+      if (s != null) {
+        String subQueryAlias = QB.getAppendedAliasFromId(joinCtx.getId(), s);
+        if (!joinAliases.contains(subQueryAlias)) {
+          joinAliases.add(subQueryAlias);
+          if (!mapAlias.contains(s)) {
+            baseBigAlias = subQueryAlias;
+          }
+        }
+      }
+    }
+
+    Map<Byte, List<ExprNodeDesc>> keysMap = mapJoinOp.getConf().getKeys();
+
+    return checkConvertBucketMapJoin(
+      pGraphContext,
+      context,
+      joinCtx,
+      keysMap,
+      baseBigAlias,
+      joinAliases);
+  }
+
+  /*
+   * Can this mapjoin be converted to a bucketed mapjoin ?
+   * The following checks are performed:
+   * a. The join columns contain all the bucket columns.
+   * b. The join keys are not transformed in the sub-query.
+   * c. All partitions contain the expected number of files (number of buckets).
+   * d. The bucket counts of the big table and of each small table are multiples of each other.
+   */
+  protected boolean checkConvertBucketMapJoin(
+    ParseContext pGraphContext,
+    BucketJoinProcCtx context,
+    QBJoinTree joinCtx,
+    Map<Byte, List<ExprNodeDesc>> keysMap,
+    String baseBigAlias,
+    List<String> joinAliases) throws SemanticException {
+
+    LinkedHashMap<String, List<Integer>> aliasToPartitionBucketNumberMapping =
+      new LinkedHashMap<String, List<Integer>>();
+    LinkedHashMap<String, List<List<String>>> aliasToPartitionBucketFileNamesMapping =
+      new LinkedHashMap<String, List<List<String>>>();
+
+    HashMap<String, Operator<? extends OperatorDesc>> topOps = pGraphContext.getTopOps();
+    Map<TableScanOperator, Table> topToTable = pGraphContext.getTopToTable();
+
+    // (partition to bucket file names) and (partition to bucket number) for
+    // the big table;
+    LinkedHashMap<Partition, List<String>> bigTblPartsToBucketFileNames =
+      new LinkedHashMap<Partition, List<String>>();
+    LinkedHashMap<Partition, Integer> bigTblPartsToBucketNumber =
+      new LinkedHashMap<Partition, Integer>();
+
+    Integer[] orders = null; // accessing order of join cols to bucket cols, should be same
+    boolean bigTablePartitioned = true;
+    for (int index = 0; index < joinAliases.size(); index++) {
+      String alias = joinAliases.get(index);
+      Operator<? extends OperatorDesc> topOp = joinCtx.getAliasToOpInfo().get(alias);
+      if (topOp == null) {
+        return false;
+      }
+      List<String> keys = toColumns(keysMap.get((byte) index));
+      if (keys == null || keys.isEmpty()) {
+        return false;
+      }
+      int oldKeySize = keys.size();
+      TableScanOperator tso = TableAccessAnalyzer.genRootTableScan(topOp, keys);
+      if (tso == null) {
+        return false;
+      }
+
+      // For nested sub-queries, the alias mapping is not maintained in QB currently.
+      if (topOps.containsValue(tso)) {
+        for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpEntry : topOps.entrySet()) {
+          if (topOpEntry.getValue() == tso) {
+            String newAlias = topOpEntry.getKey();
+            joinAliases.set(index, newAlias);
+            if (baseBigAlias.equals(alias)) {
+              baseBigAlias = newAlias;
+            }
+            alias = newAlias;
+            break;
+          }
+        }
+      }
+      else {
+        // Ideally, this should never happen, and this should be an assert.
+        return false;
+      }
+
+      // The join keys cannot be transformed in the sub-query currently.
+      // TableAccessAnalyzer.genRootTableScan will only return the base table scan
+      // if the join keys are constants or a column. Even a simple cast of the join keys
+      // will result in a null table scan operator. In case of constant join keys, they would
+      // be removed, and the size before and after the genRootTableScan will be different.
+      if (keys.size() != oldKeySize) {
+        return false;
+      }
+
+      if (orders == null) {
+        orders = new Integer[keys.size()];
+      }
+
+      Table tbl = topToTable.get(tso);
+      if (tbl.isPartitioned()) {
+        PrunedPartitionList prunedParts;
+        try {
+          prunedParts = pGraphContext.getOpToPartList().get(tso);
+          if (prunedParts == null) {
+            prunedParts =
+              PartitionPruner.prune(tbl, pGraphContext.getOpToPartPruner().get(tso),
+                                    pGraphContext.getConf(), alias,
+                                    pGraphContext.getPrunedPartitions());
+            pGraphContext.getOpToPartList().put(tso, prunedParts);
+          }
+        } catch (HiveException e) {
+          // Has to use full name to make sure it does not conflict with
+          // org.apache.commons.lang.StringUtils
+          LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+          throw new SemanticException(e.getMessage(), e);
+        }
+        List<Partition> partitions = prunedParts.getNotDeniedPartns();
+        // construct a mapping of (Partition->bucket file names) and (Partition -> bucket number)
+        if (partitions.isEmpty()) {
+          if (!alias.equals(baseBigAlias)) {
+            aliasToPartitionBucketNumberMapping.put(alias, Arrays.<Integer>asList());
+            aliasToPartitionBucketFileNamesMapping.put(alias, new ArrayList<List<String>>());
+          }
+        } else {
+          List<Integer> buckets = new ArrayList<Integer>();
+          List<List<String>> files = new ArrayList<List<String>>();
+          for (Partition p : partitions) {
+            if (!checkBucketColumns(p.getBucketCols(), keys, orders)) {
+              return false;
+            }
+            List<String> fileNames =
+              getOnePartitionBucketFileNames(p.getDataLocation(), pGraphContext);
+            // The number of files for the table should be same as number of buckets.
+            int bucketCount = p.getBucketCount();
+
+            if (fileNames.size() != bucketCount) {
+              String msg = "The number of buckets for table " +
+                tbl.getTableName() + " partition " + p.getName() + " is " +
+                p.getBucketCount() + ", whereas the number of files is " + fileNames.size();
+              throw new SemanticException(
+                ErrorMsg.BUCKETED_TABLE_METADATA_INCORRECT.getMsg(msg));
+            }
+
+            if (alias.equals(baseBigAlias)) {
+              bigTblPartsToBucketFileNames.put(p, fileNames);
+              bigTblPartsToBucketNumber.put(p, bucketCount);
+            } else {
+              files.add(fileNames);
+              buckets.add(bucketCount);
+            }
+          }
+          if (!alias.equals(baseBigAlias)) {
+            aliasToPartitionBucketNumberMapping.put(alias, buckets);
+            aliasToPartitionBucketFileNamesMapping.put(alias, files);
+          }
+        }
+      } else {
+        if (!checkBucketColumns(tbl.getBucketCols(), keys, orders)) {
+          return false;
+        }
+        List<String> fileNames =
+          getOnePartitionBucketFileNames(tbl.getDataLocation(), pGraphContext);
+        Integer num = new Integer(tbl.getNumBuckets());
+
+        // The number of files for the table should be same as number of buckets.
+        if (fileNames.size() != num) {
+          String msg = "The number of buckets for table " +
+            tbl.getTableName() + " is " + tbl.getNumBuckets() +
+            ", whereas the number of files is " + fileNames.size();
+          throw new SemanticException(
+            ErrorMsg.BUCKETED_TABLE_METADATA_INCORRECT.getMsg(msg));
+        }
+
+        if (alias.equals(baseBigAlias)) {
+          bigTblPartsToBucketFileNames.put(null, fileNames);
+          bigTblPartsToBucketNumber.put(null, tbl.getNumBuckets());
+          bigTablePartitioned = false;
+        } else {
+          aliasToPartitionBucketNumberMapping.put(alias, Arrays.asList(num));
+          aliasToPartitionBucketFileNamesMapping.put(alias, Arrays.asList(fileNames));
+        }
+      }
+    }
+
+    // All tables or partitions are bucketed, and their bucket numbers are stored in
+    // 'aliasToPartitionBucketNumberMapping'. We need to check that the number of buckets in
+    // the big table is compatible with the number of buckets in each small table.
+    for (Integer bucketNumber : bigTblPartsToBucketNumber.values()) {
+      if (!checkBucketNumberAgainstBigTable(aliasToPartitionBucketNumberMapping, bucketNumber)) {
+        return false;
+      }
+    }
+
+    context.setAliasToPartitionBucketNumberMapping(aliasToPartitionBucketNumberMapping);
+    context.setAliasToPartitionBucketFileNamesMapping(aliasToPartitionBucketFileNamesMapping);
+    context.setBigTblPartsToBucketFileNames(bigTblPartsToBucketFileNames);
+    context.setBigTblPartsToBucketNumber(bigTblPartsToBucketNumber);
+    context.setJoinAliases(joinAliases);
+    context.setBaseBigAlias(baseBigAlias);
+    context.setBigTablePartitioned(bigTablePartitioned);
+
+    return true;
+  }
+
+  /*
+   * Convert mapjoin to a bucketed mapjoin.
+   * The operator tree is not changed, but the mapjoin descriptor in the big table is
+   * enhanced to keep the big table bucket -> small table buckets mapping.
+   */
+  protected void convertMapJoinToBucketMapJoin(
+    MapJoinOperator mapJoinOp,
+    BucketJoinProcCtx context) throws SemanticException {
+    MapJoinDesc desc = mapJoinOp.getConf();
+
+    Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
+      new LinkedHashMap<String, Map<String, List<String>>>();
+
+    Map<String, List<Integer>> aliasToPartitionBucketNumberMapping =
+      context.getAliasToPartitionBucketNumberMapping();
+
+    Map<String, List<List<String>>> aliasToPartitionBucketFileNamesMapping =
+      context.getAliasToPartitionBucketFileNamesMapping();
+
+    Map<Partition, List<String>> bigTblPartsToBucketFileNames =
+      context.getBigTblPartsToBucketFileNames();
+
+    Map<Partition, Integer> bigTblPartsToBucketNumber =
+      context.getBigTblPartsToBucketNumber();
+
+    List<String> joinAliases = context.getJoinAliases();
+    String baseBigAlias = context.getBaseBigAlias();
+
+    // sort bucket names for the big table
+    for (List<String> partBucketNames : bigTblPartsToBucketFileNames.values()) {
+      Collections.sort(partBucketNames);
+    }
+
+    // go through all small tables and get the mapping from bucket file name
+    // in the big table to bucket file names in small tables.
+    for (int j = 0; j < joinAliases.size(); j++) {
+      String alias = joinAliases.get(j);
+      if (alias.equals(baseBigAlias)) {
+        continue;
+      }
+      for (List<String> names : aliasToPartitionBucketFileNamesMapping.get(alias)) {
+        Collections.sort(names);
+      }
+      List<Integer> smallTblBucketNums = aliasToPartitionBucketNumberMapping.get(alias);
+      List<List<String>> smallTblFilesList = aliasToPartitionBucketFileNamesMapping.get(alias);
+
+      Map<String, List<String>> mapping = new LinkedHashMap<String, List<String>>();
+      aliasBucketFileNameMapping.put(alias, mapping);
+
+      // for each bucket file in big table, get the corresponding bucket file
+      // name in the small table.
+      // more than 1 partition in the big table, do the mapping for each partition
+      Iterator<Entry<Partition, List<String>>> bigTblPartToBucketNames =
+        bigTblPartsToBucketFileNames.entrySet().iterator();
+      Iterator<Entry<Partition, Integer>> bigTblPartToBucketNum = bigTblPartsToBucketNumber
+        .entrySet().iterator();
+      while (bigTblPartToBucketNames.hasNext()) {
+        assert bigTblPartToBucketNum.hasNext();
+        int bigTblBucketNum = bigTblPartToBucketNum.next().getValue();
+        List<String> bigTblBucketNameList = bigTblPartToBucketNames.next().getValue();
+        fillMapping(smallTblBucketNums, smallTblFilesList,
+          mapping, bigTblBucketNum, bigTblBucketNameList, desc.getBigTableBucketNumMapping());
+      }
+    }
+    desc.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
+    desc.setBigTableAlias(baseBigAlias);
+    boolean bigTablePartitioned = context.isBigTablePartitioned();
+    if (bigTablePartitioned) {
+      desc.setBigTablePartSpecToFileMapping(convert(bigTblPartsToBucketFileNames));
+    }
+
+    // successfully convert to bucket map join
+    desc.setBucketMapJoin(true);
+  }
+
+  // convert partition to partition spec string
+  private static Map<String, List<String>> convert(Map<Partition, List<String>> mapping) {
+    Map<String, List<String>> converted = new HashMap<String, List<String>>();
+    for (Map.Entry<Partition, List<String>> entry : mapping.entrySet()) {
+      converted.put(entry.getKey().getName(), entry.getValue());
+    }
+    return converted;
+  }
 
   public List<String> toColumns(List<ExprNodeDesc> keys) {
     List<String> columns = new ArrayList<String>();
@@ -54,4 +460,37 @@ abstract public class AbstractBucketJoin
     }
     return columns;
   }
+
+  // called for each partition of big table and populates mapping for each file in the partition
+  private static void fillMapping(
+    List<Integer> smallTblBucketNums,
+    List<List<String>> smallTblFilesList,
+    Map<String, List<String>> mapping,
+    int bigTblBucketNum, List<String> bigTblBucketNameList,
+    Map<String, Integer> bucketFileNameMapping) {
+
+    for (int bindex = 0; bindex < bigTblBucketNameList.size(); bindex++) {
+      ArrayList<String> resultFileNames = new ArrayList<String>();
+      for (int sindex = 0 ; sindex < smallTblBucketNums.size(); sindex++) {
+        int smallTblBucketNum = smallTblBucketNums.get(sindex);
+        List<String> smallTblFileNames = smallTblFilesList.get(sindex);
+        if (bigTblBucketNum >= smallTblBucketNum) {
+          // if the big table has more buckets than the current small table,
+          // use "MOD" to get small table bucket names. For example, if the big
+          // table has 4 buckets and the small table has 2 buckets, then the
+          // mapping should be 0->0, 1->1, 2->0, 3->1.
+          int toAddSmallIndex = bindex % smallTblBucketNum;
+          resultFileNames.add(smallTblFileNames.get(toAddSmallIndex));
+        } else {
+          int jump = smallTblBucketNum / bigTblBucketNum;
+          for (int i = bindex; i < smallTblFileNames.size(); i = i + jump) {
+            resultFileNames.add(smallTblFileNames.get(i));
+          }
+        }
+      }
+      String inputBigTBLBucket = bigTblBucketNameList.get(bindex);
+      mapping.put(inputBigTBLBucket, resultFileNames);
+      bucketFileNameMapping.put(inputBigTBLBucket, bindex);
+    }
+  }
 }

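The bucket-to-bucket rule in fillMapping can be exercised in isolation. The hypothetical
sketch below reproduces the 4-bucket big table / 2-bucket small table example from the
comments above, using the same MOD and jump logic:

    import java.util.ArrayList;
    import java.util.List;

    public class BucketMappingDemo {
      // Maps a big-table bucket index to the small-table bucket indexes it must read.
      public static List<Integer> smallBucketsFor(int bindex, int bigBuckets, int smallBuckets) {
        List<Integer> result = new ArrayList<Integer>();
        if (bigBuckets >= smallBuckets) {
          result.add(bindex % smallBuckets);    // MOD rule
        } else {
          int jump = smallBuckets / bigBuckets; // jump rule
          for (int i = bindex; i < smallBuckets; i += jump) {
            result.add(i);
          }
        }
        return result;
      }

      public static void main(String[] args) {
        // 4 big buckets, 2 small buckets: prints 0 -> [0], 1 -> [1], 2 -> [0], 3 -> [1]
        for (int b = 0; b < 4; b++) {
          System.out.println(b + " -> " + smallBucketsFor(b, 4, 2));
        }
      }
    }
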
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java?rev=1447593&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java Tue Feb 19 05:17:52 2013
@@ -0,0 +1,536 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.QB;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TableAccessAnalyzer;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
+import org.apache.hadoop.util.ReflectionUtils;
+
+// Try to replace a bucket map join with a sorted merge map join.
+abstract public class AbstractSMBJoinProc extends AbstractBucketJoinProc implements NodeProcessor {
+
+  private static final Log LOG = LogFactory
+    .getLog(SortedMergeBucketMapJoinOptimizer.class.getName());
+
+  public AbstractSMBJoinProc(ParseContext pctx) {
+    super(pctx);
+  }
+
+  public AbstractSMBJoinProc() {
+    super();
+  }
+
+  @Override
+  abstract public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+         Object... nodeOutputs) throws SemanticException;
+
+  /*
+   * Return true or false based on whether a bucketed mapjoin can be converted successfully to
+   * a sort-merge map join operator. The following checks are performed:
+   * a. The mapjoin under consideration is a bucketed mapjoin.
+   * b. All the tables are sorted in same order, such that join columns is equal to or a prefix
+   *    of the sort columns.
+   */
+  protected boolean canConvertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp,
+    Stack<Node> stack,
+    SortBucketJoinProcCtx smbJoinContext,
+    Object... nodeOutputs) throws SemanticException {
+
+    // Check whether the mapjoin is a bucketed mapjoin.
+    // The above can be ascertained by checking the big table bucket -> small table buckets
+    // mapping in the mapjoin descriptor.
+    if (mapJoinOp.getConf().getAliasBucketFileNameMapping() == null
+      || mapJoinOp.getConf().getAliasBucketFileNameMapping().size() == 0) {
+      return false;
+    }
+
+    boolean tableSorted = true;
+    QBJoinTree joinCxt = this.pGraphContext.getMapJoinContext()
+      .get(mapJoinOp);
+    if (joinCxt == null) {
+      return false;
+    }
+    String[] srcs = joinCxt.getBaseSrc();
+    for (int srcPos = 0; srcPos < srcs.length; srcPos++) {
+      srcs[srcPos] = QB.getAppendedAliasFromId(joinCxt.getId(), srcs[srcPos]);
+    }
+
+    // All the tables'/partitions' columns should be sorted in the same order.
+    // For example, if tables A and B are being joined on columns c1, c2 and c3,
+    // which are the sorted and bucketed columns, the join would work as long as
+    // c1, c2 and c3 are sorted in the same order in both tables.
+    List<Order> sortColumnsFirstTable = new ArrayList<Order>();
+
+    for (int pos = 0; pos < srcs.length; pos++) {
+      tableSorted = tableSorted
+        && isTableSorted(smbJoinContext,
+             pGraphContext,
+             mapJoinOp.getConf().getKeys().get((byte) pos),
+             joinCxt,
+             srcs,
+             pos,
+             sortColumnsFirstTable);
+    }
+    if (!tableSorted) {
+      // This is a mapjoin, but it is not suited for a sort-merge bucket map join. Check outer joins.
+      MapJoinProcessor.checkMapJoin(mapJoinOp.getConf().getPosBigTable(),
+            mapJoinOp.getConf().getConds());
+      return false;
+    }
+
+    smbJoinContext.setSrcs(srcs);
+    return true;
+  }
+
+
+  // Convert the bucket map-join operator to a sort-merge map join operator
+  protected SMBMapJoinOperator convertBucketMapJoinToSMBJoin(MapJoinOperator mapJoinOp,
+    SortBucketJoinProcCtx smbJoinContext,
+    ParseContext parseContext) {
+
+    String[] srcs = smbJoinContext.getSrcs();
+    SMBMapJoinOperator smbJop = new SMBMapJoinOperator(mapJoinOp);
+    SMBJoinDesc smbJoinDesc = new SMBJoinDesc(mapJoinOp.getConf());
+    smbJop.setConf(smbJoinDesc);
+    HashMap<Byte, String> tagToAlias = new HashMap<Byte, String>();
+    for (int i = 0; i < srcs.length; i++) {
+      tagToAlias.put((byte) i, srcs[i]);
+    }
+    smbJoinDesc.setTagToAlias(tagToAlias);
+
+    int indexInListMapJoinNoReducer =
+      this.pGraphContext.getListMapJoinOpsNoReducer().indexOf(mapJoinOp);
+    if (indexInListMapJoinNoReducer >= 0 ) {
+      this.pGraphContext.getListMapJoinOpsNoReducer().remove(indexInListMapJoinNoReducer);
+      this.pGraphContext.getListMapJoinOpsNoReducer().add(indexInListMapJoinNoReducer, smbJop);
+    }
+
+    Map<String, DummyStoreOperator> aliasToSink =
+        new HashMap<String, DummyStoreOperator>();
+    // For all parents (other than the big table), insert a dummy store operator
+    /* Consider a query like:
+     *
+     * select * from
+     *   (subq1 --> has a filter)
+     *   join
+     *   (subq2 --> has a filter)
+     * on some key
+     *
+     * Let us assume that subq1 is the small table (either specified by the user or inferred
+     * automatically). The following operator tree will be created:
+     *
+     * TableScan (subq1) --> Select --> Filter --> DummyStore
+     *                                                         \
+     *                                                          \     SMBJoin
+     *                                                          /
+     *                                                         /
+     * TableScan (subq2) --> Select --> Filter
+     */
+
+    List<Operator<? extends OperatorDesc>> parentOperators = mapJoinOp.getParentOperators();
+    for (int i = 0; i < parentOperators.size(); i++) {
+      Operator<? extends OperatorDesc> par = parentOperators.get(i);
+      int index = par.getChildOperators().indexOf(mapJoinOp);
+      par.getChildOperators().remove(index);
+      if (i == smbJoinDesc.getPosBigTable()) {
+        par.getChildOperators().add(index, smbJop);
+      }
+      else {
+        DummyStoreOperator dummyStoreOp = new DummyStoreOperator();
+        par.getChildOperators().add(index, dummyStoreOp);
+
+        List<Operator<? extends OperatorDesc>> childrenOps =
+            new ArrayList<Operator<? extends OperatorDesc>>();
+        childrenOps.add(smbJop);
+        dummyStoreOp.setChildOperators(childrenOps);
+
+        List<Operator<? extends OperatorDesc>> parentOps =
+            new ArrayList<Operator<? extends OperatorDesc>>();
+        parentOps.add(par);
+        dummyStoreOp.setParentOperators(parentOps);
+
+        aliasToSink.put(srcs[i], dummyStoreOp);
+        smbJop.getParentOperators().remove(i);
+        smbJop.getParentOperators().add(i, dummyStoreOp);
+      }
+    }
+    smbJoinDesc.setAliasToSink(aliasToSink);
+
+    List<Operator<? extends OperatorDesc>> childOps = mapJoinOp.getChildOperators();
+    for (int i = 0; i < childOps.size(); i++) {
+      Operator<? extends OperatorDesc> child = childOps.get(i);
+      int index = child.getParentOperators().indexOf(mapJoinOp);
+      child.getParentOperators().remove(index);
+      child.getParentOperators().add(index, smbJop);
+    }
+    parseContext.getSmbMapJoinContext().put(smbJop,
+        parseContext.getMapJoinContext().get(mapJoinOp));
+    parseContext.getMapJoinContext().remove(mapJoinOp);
+    parseContext.getOpParseCtx().put(smbJop, parseContext.getOpParseCtx().get(mapJoinOp));
+
+    return smbJop;
+  }
+
+  /**
+   * Whether this table is eligible for a sort-merge join.
+   *
+   * @param smbJoinContext        context for the sort-merge join conversion
+   * @param pctx                  parse context
+   * @param keys                  the join key expressions for this table
+   * @param joinTree              join tree being considered
+   * @param aliases               table aliases in the join tree; aliases[pos] is being checked
+   * @param pos                   position of the table
+   * @param sortColumnsFirstTable the names and order of the sorted columns of the first table;
+   *                              it is populated by the call with pos = 0
+   * @return true if the table is sorted appropriately for a sort-merge join
+   * @throws SemanticException
+   */
+  private boolean isTableSorted(
+    SortBucketJoinProcCtx smbJoinContext,
+    ParseContext pctx,
+    List<ExprNodeDesc> keys,
+    QBJoinTree joinTree,
+    String[] aliases,
+    int pos,
+    List<Order> sortColumnsFirstTable) throws SemanticException {
+    String alias = aliases[pos];
+    Map<TableScanOperator, Table> topToTable = this.pGraphContext
+      .getTopToTable();
+
+    /*
+     * Consider a query like:
+     *
+     * select -- mapjoin(subq1) --  * from
+     * (select a.key, a.value from tbl1 a) subq1
+     *   join
+     * (select a.key, a.value from tbl2 a) subq2
+     * on subq1.key = subq2.key;
+     *
+     * aliasToOpInfo contains the SelectOperator for subq1 and subq2.
+     * We need to traverse the tree (using TableAccessAnalyzer) to get to the base
+     * table. If the object being map-joined is a base table, then aliasToOpInfo
+     * contains the TableScanOperator, and TableAccessAnalyzer is a no-op.
+     */
+    Operator<? extends OperatorDesc> topOp = joinTree.getAliasToOpInfo().get(alias);
+    if (topOp == null) {
+      return false;
+    }
+
+    // get all join columns from join keys
+    List<String> joinCols = toColumns(keys);
+    if (joinCols == null || joinCols.isEmpty()) {
+      return false;
+    }
+
+    TableScanOperator tso = TableAccessAnalyzer.genRootTableScan(topOp, joinCols);
+    if (tso == null) {
+      return false;
+    }
+
+    // For nested sub-queries, the alias mapping is not maintained in QB currently.
+    /*
+     * Consider a query like:
+     *
+     * select count(*) from
+     *   (
+     *     select key, count(*) from
+     *       (
+     *         select --mapjoin(a)-- a.key as key, a.value as val1, b.value as val2
+     *         from tbl1 a join tbl2 b on a.key = b.key
+     *       ) subq1
+     *     group by key
+     *   ) subq2;
+     *
+     * The table alias should be subq2:subq1:a which needs to be fetched from topOps.
+     */
+    if (pGraphContext.getTopOps().containsValue(tso)) {
+      for (Map.Entry<String, Operator<? extends OperatorDesc>> topOpEntry :
+        this.pGraphContext.getTopOps().entrySet()) {
+        if (topOpEntry.getValue() == tso) {
+          alias = topOpEntry.getKey();
+          aliases[pos] = alias;
+          break;
+        }
+      }
+    }
+    else {
+      // Ideally, this should never happen, and this should be an assert.
+      return false;
+    }
+
+    Table tbl = topToTable.get(tso);
+    if (tbl.isPartitioned()) {
+      PrunedPartitionList prunedParts = null;
+      try {
+        prunedParts = pGraphContext.getOpToPartList().get(tso);
+        if (prunedParts == null) {
+          prunedParts = PartitionPruner.prune(tbl, pGraphContext
+            .getOpToPartPruner().get(tso), pGraphContext.getConf(), alias,
+          pGraphContext.getPrunedPartitions());
+          pGraphContext.getOpToPartList().put(tso, prunedParts);
+        }
+      } catch (HiveException e) {
+        LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+        throw new SemanticException(e.getMessage(), e);
+      }
+      List<Partition> partitions = prunedParts.getNotDeniedPartns();
+      // Populate the names and order of columns for the first partition of the
+      // first table
+      if ((pos == 0) && (partitions != null) && (!partitions.isEmpty())) {
+        Partition firstPartition = partitions.get(0);
+        sortColumnsFirstTable.addAll(firstPartition.getSortCols());
+      }
+
+      for (Partition partition : prunedParts.getNotDeniedPartns()) {
+        if (!checkSortColsAndJoinCols(partition.getSortCols(),
+          joinCols,
+          sortColumnsFirstTable)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    // Populate the names and order of columns for the first table
+    if (pos == 0) {
+      sortColumnsFirstTable.addAll(tbl.getSortCols());
+    }
+
+    return checkSortColsAndJoinCols(tbl.getSortCols(),
+      joinCols,
+      sortColumnsFirstTable);
+  }
+
+  private boolean checkSortColsAndJoinCols(List<Order> sortCols,
+      List<String> joinCols,
+      List<Order> sortColumnsFirstPartition) {
+
+    if (sortCols == null || sortCols.size() < joinCols.size()) {
+      return false;
+    }
+
+    // A join is eligible for a sort-merge join, only if it is eligible for
+    // a bucketized map join. So, we don't need to check for bucketized map
+    // join here. We are guaranteed that the join keys contain all the
+    // bucketized keys (note that the order need not be the same).
+    List<String> sortColNames = new ArrayList<String>();
+
+    // The join columns should contain all the sort columns
+    // The sort columns of all the tables should be in the same order
+    // compare the column names and the order with the first table/partition.
+    for (int pos = 0; pos < sortCols.size(); pos++) {
+      Order o = sortCols.get(pos);
+
+      if (o.getOrder() != sortColumnsFirstPartition.get(pos).getOrder()) {
+        return false;
+      }
+      sortColNames.add(o.getCol());
+    }
+
+    // The column names and order (ascending/descending) matched
+    // The first 'n' sorted columns should be the same as the joinCols, where
+    // 'n' is the size of join columns.
+    // For eg: if the table is sorted by (a,b,c), it is OK to convert if the join is
+    // on (a), (a,b), or any combination of (a,b,c):
+    //   (a,b,c), (a,c,b), (c,a,b), (c,b,a), (b,c,a), (b,a,c)
+    // but it is not OK to convert if the join is on (a,c)
+    return sortColNames.subList(0, joinCols.size()).containsAll(joinCols);
+  }
+
+  // Can the join operator be converted to a sort-merge join operator ?
+  // It is already verified that the join can be converted to a bucket map join
+  protected boolean checkConvertJoinToSMBJoin(
+    JoinOperator joinOperator,
+    SortBucketJoinProcCtx smbJoinContext,
+    ParseContext pGraphContext) throws SemanticException {
+
+    boolean tableSorted = true;
+    QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOperator);
+
+    if (joinCtx == null) {
+      return false;
+    }
+    String[] srcs = joinCtx.getBaseSrc();
+
+    // All the tables'/partitions' columns should be sorted in the same order.
+    // For example, if tables A and B are being joined on columns c1, c2 and c3,
+    // which are the sorted and bucketed columns, the join would work as long as
+    // c1, c2 and c3 are sorted in the same order in both tables.
+    List<Order> sortColumnsFirstTable = new ArrayList<Order>();
+
+    for (int pos = 0; pos < srcs.length; pos++) {
+      tableSorted = tableSorted &&
+        isTableSorted(smbJoinContext,
+                      pGraphContext,
+                      smbJoinContext.getKeyExprMap().get((byte)pos),
+                      joinCtx,
+                      srcs,
+                      pos,
+                      sortColumnsFirstTable);
+    }
+
+    if (!tableSorted) {
+      return false;
+    }
+
+    smbJoinContext.setSrcs(srcs);
+    return true;
+  }
+
+  // Can the join operator be converted to a sort-merge join operator ?
+  protected boolean canConvertJoinToSMBJoin(
+    JoinOperator joinOperator,
+    SortBucketJoinProcCtx smbJoinContext,
+    ParseContext pGraphContext) throws SemanticException {
+    boolean canConvert =
+      canConvertJoinToBucketMapJoin(
+        joinOperator,
+        pGraphContext,
+        smbJoinContext
+      );
+
+    if (!canConvert) {
+      return false;
+    }
+
+    return checkConvertJoinToSMBJoin(joinOperator, smbJoinContext, pGraphContext);
+  }
+
+  // Can the join operator be converted to a bucket map join operator ?
+  protected boolean canConvertJoinToBucketMapJoin(
+    JoinOperator joinOp,
+    ParseContext pGraphContext,
+    SortBucketJoinProcCtx context) throws SemanticException {
+
+    // This has already been inspected and rejected
+    if (context.getRejectedJoinOps().contains(joinOp)) {
+      return false;
+    }
+
+    QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOp);
+    if (joinCtx == null) {
+      return false;
+    }
+
+    Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
+    try {
+      bigTableMatcherClass =
+        (Class<? extends BigTableSelectorForAutoSMJ>)
+          (Class.forName(HiveConf.getVar(pGraphContext.getConf(),
+            HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR)));
+    } catch (ClassNotFoundException e) {
+      throw new SemanticException(e.getMessage(), e);
+    }
+
+    BigTableSelectorForAutoSMJ bigTableMatcher =
+      (BigTableSelectorForAutoSMJ) ReflectionUtils.newInstance(bigTableMatcherClass, null);
+    int bigTablePosition =
+      bigTableMatcher.getBigTablePosition(pGraphContext, joinOp);
+    context.setBigTablePosition(bigTablePosition);
+    String joinAlias =
+      bigTablePosition == 0 ?
+        joinCtx.getLeftAlias() : joinCtx.getRightAliases()[bigTablePosition - 1];
+    joinAlias = QB.getAppendedAliasFromId(joinCtx.getId(), joinAlias);
+
+    Map<Byte, List<ExprNodeDesc>> keyExprMap  = new HashMap<Byte, List<ExprNodeDesc>>();
+    List<Operator<? extends OperatorDesc>> parentOps = joinOp.getParentOperators();
+    // get the join keys from parent ReduceSink operators
+    for (Operator<? extends OperatorDesc> parentOp : parentOps) {
+      ReduceSinkDesc rsconf = ((ReduceSinkOperator)parentOp).getConf();
+      Byte tag = (byte) rsconf.getTag();
+      List<ExprNodeDesc> keys = rsconf.getKeyCols();
+      keyExprMap.put(tag, keys);
+    }
+
+    context.setKeyExprMap(keyExprMap);
+    String[] srcs = joinCtx.getBaseSrc();
+    for (int srcPos = 0; srcPos < srcs.length; srcPos++) {
+      srcs[srcPos] = QB.getAppendedAliasFromId(joinCtx.getId(), srcs[srcPos]);
+    }
+
+    // Given the big table chosen by the pluggable sort-merge-join big table
+    // matcher above, check whether this join can be converted to a bucket map join.
+    return checkConvertBucketMapJoin(
+      pGraphContext,
+      context,
+      joinCtx,
+      keyExprMap,
+      joinAlias,
+      Arrays.asList(srcs));
+  }
+
+  // Convert the join operator to a bucket map-join operator
+  protected MapJoinOperator convertJoinToBucketMapJoin(
+    JoinOperator joinOp,
+    SortBucketJoinProcCtx joinContext,
+    ParseContext parseContext) throws SemanticException {
+    MapJoinOperator mapJoinOp = MapJoinProcessor.convertMapJoin(
+      parseContext.getOpParseCtx(),
+      joinOp,
+      parseContext.getJoinContext().get(joinOp),
+      joinContext.getBigTablePosition(),
+      false,
+      false);
+    // Remember the join context for the new map-join operator, and remove the
+    // original join operator from the query join context
+    parseContext.getMapJoinContext().put(mapJoinOp, parseContext.getJoinContext().get(joinOp));
+    parseContext.getJoinContext().remove(joinOp);
+    convertMapJoinToBucketMapJoin(mapJoinOp, joinContext);
+    return mapJoinOp;
+  }
+
+  // Convert the join operator to a sort-merge join operator
+  protected void convertJoinToSMBJoin(
+    JoinOperator joinOp,
+    SortBucketJoinProcCtx smbJoinContext,
+    ParseContext parseContext) throws SemanticException {
+    MapJoinOperator mapJoinOp = convertJoinToBucketMapJoin(joinOp, smbJoinContext, parseContext);
+    SMBMapJoinOperator smbMapJoinOp =
+        convertBucketMapJoinToSMBJoin(mapJoinOp, smbJoinContext, parseContext);
+    smbMapJoinOp.setConvertedAutomaticallySMBJoin(true);
+  }
+}
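
For reference, canConvertJoinToBucketMapJoin above resolves the big table
selector class named by HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR via
Class.forName plus ReflectionUtils.newInstance. A minimal, self-contained
sketch of that plug-in pattern follows; the class and method names here are
generic placeholders, not part of this commit:

    import org.apache.hadoop.util.ReflectionUtils;

    // Sketch: resolve a policy class from a configured name, then instantiate
    // it reflectively. Passing a null Configuration mirrors the call above.
    public class ExamplePolicyLoader {
      public static <T> T load(String className, Class<T> expectedType)
          throws ClassNotFoundException {
        Class<?> clazz = Class.forName(className);
        return expectedType.cast(ReflectionUtils.newInstance(clazz, null));
      }
    }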

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java?rev=1447593&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java Tue Feb 19 05:17:52 2013
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/*
+ * This is a pluggable policy to choose the big table for converting a join to a
+ * sort-merge join. The table with the largest average partition size (or total size,
+ * for unpartitioned tables) is chosen; ties are broken in favor of fewer partitions.
+ */
+public class AvgPartitionSizeBasedBigTableSelectorForAutoSMJ
+    extends SizeBasedBigTableSelectorForAutoSMJ
+    implements BigTableSelectorForAutoSMJ {
+
+  private static final Log LOG = LogFactory
+      .getLog(AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.class.getName());
+
+  public int getBigTablePosition(ParseContext parseCtx, JoinOperator joinOp)
+    throws SemanticException {
+    int bigTablePos = 0;
+    long maxSize = 0;
+    int numPartitionsCurrentBigTable = 0; // number of partitions for the chosen big table
+    HiveConf conf = parseCtx.getConf();
+
+    try {
+      List<TableScanOperator> topOps = new ArrayList<TableScanOperator>();
+      getListTopOps(joinOp, topOps);
+      int currentPos = 0;
+      for (TableScanOperator topOp : topOps) {
+        int numPartitions = 1; // in case the sizes match, preference is
+                               // given to the table with fewer partitions
+        Table table = parseCtx.getTopToTable().get(topOp);
+        long averageSize = 0;
+
+        if (!table.isPartitioned()) {
+          averageSize = getSize(conf, table);
+        } else {
+          // For partitioned tables, get the size of all the partitions
+          PrunedPartitionList partsList =
+            PartitionPruner.prune(parseCtx.getTopToTable().get(topOp),
+              parseCtx.getOpToPartPruner().get(topOp), parseCtx.getConf(),
+              null, parseCtx.getPrunedPartitions());
+          numPartitions = partsList.getNotDeniedPartns().size();
+          long totalSize = 0;
+          for (Partition part : partsList.getNotDeniedPartns()) {
+            totalSize += getSize(conf, part);
+          }
+          // Guard against all partitions having been pruned away
+          averageSize = (numPartitions == 0) ? 0 : totalSize / numPartitions;
+        }
+
+        if (averageSize > maxSize) {
+          maxSize = averageSize;
+          bigTablePos = currentPos;
+          numPartitionsCurrentBigTable = numPartitions;
+        } else if (averageSize == maxSize) {
+          // If the sizes match, prefer the table with fewer partitions
+          if (numPartitions < numPartitionsCurrentBigTable) {
+            bigTablePos = currentPos;
+            numPartitionsCurrentBigTable = numPartitions;
+          }
+        }
+
+        currentPos++;
+      }
+    } catch (HiveException e) {
+      throw new SemanticException(e.getMessage());
+    }
+
+    return bigTablePos;
+  }
+}
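
The selection rule implemented above reduces to: prefer the larger average
size; on a tie, prefer the table with fewer partitions. A self-contained
sketch under those assumptions (the class name and sample data are
illustrative only):

    // Returns the position of the "big" table given per-table average sizes
    // and partition counts, using the same tie-breaking rule as above.
    public class ExampleBigTableChooser {
      public static int choose(long[] avgSizes, int[] numPartitions) {
        int bigPos = 0;
        long maxSize = 0;
        int partsOfBig = 0;
        for (int pos = 0; pos < avgSizes.length; pos++) {
          if (avgSizes[pos] > maxSize
              || (avgSizes[pos] == maxSize && numPartitions[pos] < partsOfBig)) {
            maxSize = avgSizes[pos];
            bigPos = pos;
            partsOfBig = numPartitions[pos];
          }
        }
        return bigPos;
      }

      public static void main(String[] args) {
        // Equal average sizes: the table with fewer partitions (position 1) wins.
        System.out.println(choose(new long[]{1024L, 1024L}, new int[]{8, 4}));
      }
    }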

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BigTableSelectorForAutoSMJ.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BigTableSelectorForAutoSMJ.java?rev=1447593&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BigTableSelectorForAutoSMJ.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BigTableSelectorForAutoSMJ.java Tue Feb 19 05:17:52 2013
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/*
+ * This is a pluggable policy to choose the big table for converting a join to a
+ * sort-merge join. The policy decides the big table position. Some of the existing
+ * policies decide the big table based on the size or the position of the tables.
+ */
+public interface BigTableSelectorForAutoSMJ {
+  public int getBigTablePosition(ParseContext parseContext, JoinOperator joinOp)
+    throws SemanticException;
+}
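
Any class implementing this interface can be plugged in through configuration.
A minimal hypothetical implementation that simply picks the leftmost table
might look like the following (the class name is illustrative, not part of
this commit):

    package org.apache.hadoop.hive.ql.optimizer;

    import org.apache.hadoop.hive.ql.exec.JoinOperator;
    import org.apache.hadoop.hive.ql.parse.ParseContext;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    // Illustrative selector: always treats the leftmost join input as the big table.
    public class ExampleLeftmostSelector implements BigTableSelectorForAutoSMJ {
      public int getBigTablePosition(ParseContext parseContext, JoinOperator joinOp)
          throws SemanticException {
        return 0; // position 0 is the leftmost table in the join
      }
    }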

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketJoinProcCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketJoinProcCtx.java?rev=1447593&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketJoinProcCtx.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketJoinProcCtx.java Tue Feb 19 05:17:52 2013
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+
+public class BucketJoinProcCtx implements NodeProcessorCtx {
+  private static final Log LOG =
+    LogFactory.getLog(BucketJoinProcCtx.class.getName());
+
+  private final HiveConf conf;
+
+  private Set<JoinOperator> rejectedJoinOps = new HashSet<JoinOperator>();
+
+  // The set of join operators which can be converted to a bucketed map join
+  private Set<JoinOperator> convertedJoinOps = new HashSet<JoinOperator>();
+
+  private Map<String, List<Integer>> aliasToPartitionBucketNumberMapping;
+  private Map<String, List<List<String>>> aliasToPartitionBucketFileNamesMapping;
+  private Map<Partition, List<String>> bigTblPartsToBucketFileNames;
+  private Map<Partition, Integer> bigTblPartsToBucketNumber;
+  private List<String> joinAliases;
+  private String baseBigAlias;
+  private boolean bigTablePartitioned;
+
+  public BucketJoinProcCtx(HiveConf conf) {
+    this.conf = conf;
+  }
+
+  public HiveConf getConf() {
+    return conf;
+  }
+
+  public Set<JoinOperator> getRejectedJoinOps() {
+    return rejectedJoinOps;
+  }
+
+  public Set<JoinOperator> getConvertedJoinOps() {
+    return convertedJoinOps;
+  }
+
+  public void setRejectedJoinOps(Set<JoinOperator> rejectedJoinOps) {
+    this.rejectedJoinOps = rejectedJoinOps;
+  }
+
+  public void setConvertedJoinOps(Set<JoinOperator> setOfConvertedJoins) {
+    this.convertedJoinOps = setOfConvertedJoins;
+  }
+
+  public Map<String, List<Integer>> getAliasToPartitionBucketNumberMapping() {
+    return aliasToPartitionBucketNumberMapping;
+  }
+
+  public Map<String, List<List<String>>> getAliasToPartitionBucketFileNamesMapping() {
+    return aliasToPartitionBucketFileNamesMapping;
+  }
+
+  public Map<Partition, List<String>> getBigTblPartsToBucketFileNames() {
+    return bigTblPartsToBucketFileNames;
+  }
+
+  public Map<Partition, Integer> getBigTblPartsToBucketNumber() {
+    return bigTblPartsToBucketNumber;
+  }
+
+  public void setAliasToPartitionBucketNumberMapping(
+    Map<String, List<Integer>> aliasToPartitionBucketNumberMapping) {
+    this.aliasToPartitionBucketNumberMapping = aliasToPartitionBucketNumberMapping;
+  }
+
+  public void setAliasToPartitionBucketFileNamesMapping(
+    Map<String, List<List<String>>> aliasToPartitionBucketFileNamesMapping) {
+    this.aliasToPartitionBucketFileNamesMapping = aliasToPartitionBucketFileNamesMapping;
+  }
+
+  public void setBigTblPartsToBucketFileNames(
+    Map<Partition, List<String>> bigTblPartsToBucketFileNames) {
+    this.bigTblPartsToBucketFileNames = bigTblPartsToBucketFileNames;
+  }
+
+  public void setBigTblPartsToBucketNumber(Map<Partition, Integer> bigTblPartsToBucketNumber) {
+    this.bigTblPartsToBucketNumber = bigTblPartsToBucketNumber;
+  }
+
+  public void setJoinAliases(List<String> joinAliases) {
+    this.joinAliases = joinAliases;
+  }
+
+  public void setBaseBigAlias(String baseBigAlias) {
+    this.baseBigAlias = baseBigAlias;
+  }
+
+  public List<String> getJoinAliases() {
+    return joinAliases;
+  }
+
+  public String getBaseBigAlias() {
+    return baseBigAlias;
+  }
+
+  public boolean isBigTablePartitioned() {
+    return bigTablePartitioned;
+  }
+
+  public void setBigTablePartitioned(boolean bigTablePartitioned) {
+    this.bigTablePartitioned = bigTablePartitioned;
+  }
+}
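
BucketJoinProcCtx is the shared state threaded through the optimizer walk: it
records which join operators have been rejected or converted, plus the bucket
and partition metadata gathered along the way. A hedged usage sketch (the
surrounding setup is invented for illustration; in Hive this context is driven
by the optimizer's graph walker, not by user code):

    import org.apache.hadoop.hive.ql.exec.JoinOperator;

    // Illustrative only: record the outcome of inspecting a join so that
    // later passes can skip it (see the rejected-ops check earlier).
    public class ExampleBucketJoinCtxUsage {
      static void recordDecision(BucketJoinProcCtx ctx, JoinOperator joinOp,
          boolean convertible) {
        if (convertible) {
          ctx.getConvertedJoinOps().add(joinOp);
        } else {
          ctx.getRejectedJoinOps().add(joinOp);
        }
      }
    }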


