hive-commits mailing list archives

From: hashut...@apache.org
Subject: svn commit: r1439946 [1/9] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/h...
Date: Tue, 29 Jan 2013 15:33:55 GMT
Author: hashutosh
Date: Tue Jan 29 15:33:53 2013
New Revision: 1439946

URL: http://svn.apache.org/viewvc?rev=1439946&view=rev
Log:
HIVE-de-emphasize mapjoin hint (Namit Jain via Ashutosh Chauhan)

Added:
    hive/trunk/ql/src/test/queries/clientnegative/join28.q
    hive/trunk/ql/src/test/queries/clientnegative/join29.q
    hive/trunk/ql/src/test/queries/clientnegative/join32.q
    hive/trunk/ql/src/test/queries/clientnegative/join35.q
    hive/trunk/ql/src/test/queries/clientnegative/smb_mapjoin_14.q
    hive/trunk/ql/src/test/queries/clientnegative/union22.q
    hive/trunk/ql/src/test/queries/clientpositive/multiMapJoin1.q
    hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_16.q
    hive/trunk/ql/src/test/results/clientnegative/join28.q.out
    hive/trunk/ql/src/test/results/clientnegative/join29.q.out
    hive/trunk/ql/src/test/results/clientnegative/join32.q.out
    hive/trunk/ql/src/test/results/clientnegative/join35.q.out
    hive/trunk/ql/src/test/results/clientnegative/smb_mapjoin_14.q.out
    hive/trunk/ql/src/test/results/clientnegative/union22.q.out
    hive/trunk/ql/src/test/results/clientpositive/multiMapJoin1.q.out
    hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_16.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverCommonJoin.java
    hive/trunk/ql/src/test/queries/clientpositive/join28.q
    hive/trunk/ql/src/test/queries/clientpositive/join29.q
    hive/trunk/ql/src/test/queries/clientpositive/join31.q
    hive/trunk/ql/src/test/queries/clientpositive/join32.q
    hive/trunk/ql/src/test/queries/clientpositive/join33.q
    hive/trunk/ql/src/test/queries/clientpositive/join34.q
    hive/trunk/ql/src/test/queries/clientpositive/join35.q
    hive/trunk/ql/src/test/queries/clientpositive/mapjoin_mapjoin.q
    hive/trunk/ql/src/test/queries/clientpositive/mapjoin_subquery.q
    hive/trunk/ql/src/test/queries/clientpositive/mapjoin_subquery2.q
    hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_14.q
    hive/trunk/ql/src/test/queries/clientpositive/union22.q
    hive/trunk/ql/src/test/results/clientpositive/bucket_map_join_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucket_map_join_2.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_2.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_3.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_4.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_5.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_6.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_7.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketcontext_8.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin10.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin11.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin12.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin13.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin8.q.out
    hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin9.q.out
    hive/trunk/ql/src/test/results/clientpositive/join28.q.out
    hive/trunk/ql/src/test/results/clientpositive/join29.q.out
    hive/trunk/ql/src/test/results/clientpositive/join30.q.out
    hive/trunk/ql/src/test/results/clientpositive/join31.q.out
    hive/trunk/ql/src/test/results/clientpositive/join32.q.out
    hive/trunk/ql/src/test/results/clientpositive/join33.q.out
    hive/trunk/ql/src/test/results/clientpositive/join34.q.out
    hive/trunk/ql/src/test/results/clientpositive/join35.q.out
    hive/trunk/ql/src/test/results/clientpositive/join38.q.out
    hive/trunk/ql/src/test/results/clientpositive/mapjoin_distinct.q.out
    hive/trunk/ql/src/test/results/clientpositive/mapjoin_filter_on_outerjoin.q.out
    hive/trunk/ql/src/test/results/clientpositive/mapjoin_mapjoin.q.out
    hive/trunk/ql/src/test/results/clientpositive/mapjoin_subquery.q.out
    hive/trunk/ql/src/test/results/clientpositive/mapjoin_subquery2.q.out
    hive/trunk/ql/src/test/results/clientpositive/semijoin.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoin.q.out
    hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_13.q.out
    hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_14.q.out
    hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_15.q.out
    hive/trunk/ql/src/test/results/clientpositive/sort_merge_join_desc_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/sort_merge_join_desc_2.q.out
    hive/trunk/ql/src/test/results/clientpositive/sort_merge_join_desc_3.q.out
    hive/trunk/ql/src/test/results/clientpositive/sort_merge_join_desc_4.q.out
    hive/trunk/ql/src/test/results/clientpositive/sort_merge_join_desc_5.q.out
    hive/trunk/ql/src/test/results/clientpositive/sort_merge_join_desc_6.q.out
    hive/trunk/ql/src/test/results/clientpositive/sort_merge_join_desc_7.q.out
    hive/trunk/ql/src/test/results/clientpositive/union22.q.out

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1439946&r1=1439945&r2=1439946&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Jan 29 15:33:53 2013
@@ -37,8 +37,6 @@ import javax.security.auth.login.LoginEx
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.common.LogUtils;
-import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobConf;
@@ -472,6 +470,9 @@ public class HiveConf extends Configurat
 
     HIVESKEWJOIN("hive.optimize.skewjoin", false),
     HIVECONVERTJOIN("hive.auto.convert.join", false),
+    HIVECONVERTJOINNOCONDITIONALTASK("hive.auto.convert.join.noconditionaltask", false),
+    HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD("hive.auto.convert.join.noconditionaltask.size",
+        10000000L),
     HIVESKEWJOINKEY("hive.skewjoin.key", 100000),
     HIVESKEWJOINMAPJOINNUMMAPTASK("hive.skewjoin.mapjoin.map.tasks", 10000),
     HIVESKEWJOINMAPJOINMINSPLIT("hive.skewjoin.mapjoin.min.split", 33554432L), //32M
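
For readers following along, a minimal sketch of reading the two new settings through HiveConf's
standard getBoolVar/getLongVar accessors; the wrapper class is illustrative, not part of the patch:

    import org.apache.hadoop.hive.conf.HiveConf;

    public final class NoConditionalTaskSettings {
      // Reads the two variables added above; a sketch, not code from the patch.
      public static boolean isEnabled(HiveConf conf) {
        return conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK);
      }

      public static long threshold(HiveConf conf) {
        // Defaults to 10000000L (roughly 10MB), matching the definition above.
        return conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
      }
    }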

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1439946&r1=1439945&r2=1439946&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Tue Jan 29 15:33:53 2013
@@ -814,6 +814,23 @@
  <description>Whether Hive enables the optimization of converting a common join into a mapjoin based on the input file size</description>
 </property>
 
+<property>
+  <name>hive.auto.convert.join.noconditionaltask</name>
+  <value>false</value>
+  <description>Whether Hive enables the optimization of converting a common join into a mapjoin based on the input
+    file size. If this parameter is on, and the sum of sizes for n-1 of the tables/partitions in an n-way join is
+    smaller than the specified size, the join is directly converted to a mapjoin (there is no conditional task).
+  </description>
+</property>
+
+<property>
+  <name>hive.auto.convert.join.noconditionaltask.size</name>
+  <value>10000000</value>
+  <description>If hive.auto.convert.join.noconditionaltask is off, this parameter has no effect. However, if it
+    is on, and the sum of sizes for n-1 of the tables/partitions in an n-way join is smaller than this size, the
+    join is directly converted to a mapjoin (there is no conditional task). The default is 10MB.
+  </description>
+</property>
 
 <property>
   <name>hive.script.auto.progress</name>
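
To make the n-1 rule concrete: for an n-way join, the sizes of all inputs except the biggest are
summed and compared against the threshold. A minimal sketch under that reading; the class, method,
and parameter names are illustrative:

    public final class JoinSizeCheck {
      // tableSizes holds the input size in bytes of each of the n join inputs.
      public static boolean fitsNoConditionalTask(long[] tableSizes, long threshold) {
        if (tableSizes == null || tableSizes.length == 0) {
          return false;
        }
        long total = 0;
        long biggest = Long.MIN_VALUE;
        for (long size : tableSizes) {
          total += size;
          biggest = Math.max(biggest, size);
        }
        // The n-1 smaller inputs together must fit under the threshold.
        return (total - biggest) <= threshold;
      }
    }

For example, inputs of 5MB, 2MB, and 200MB pass the default 10MB threshold, since the two smaller
inputs sum to 7MB, so the three-way join would be converted directly to a mapjoin.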

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java?rev=1439946&r1=1439945&r2=1439946&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java Tue Jan 29 15:33:53 2013
@@ -324,6 +324,9 @@ public enum ErrorMsg {
     "(higher than the number of rows per input row due to grouping sets in the query), or " +
     "rewrite the query to not use distincts."),
 
+  OPERATOR_NOT_ALLOWED_WITH_MAPJOIN(10227,
+    "Not all clauses are supported with the mapjoin hint. Please remove the mapjoin hint."),
+
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
       + "It may have crashed with an error."),

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=1439946&r1=1439945&r2=1439946&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Tue Jan 29 15:33:53 2013
@@ -934,4 +934,13 @@ transient boolean newGroupStarted = fals
     this.posToAliasMap = posToAliasMap;
   }
 
+  @Override
+  public boolean opAllowedBeforeMapJoin() {
+    return false;
+  }
+
+  @Override
+  public boolean opAllowedAfterMapJoin() {
+    return false;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1439946&r1=1439945&r2=1439946&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Jan 29 15:33:53 2013
@@ -1478,4 +1478,22 @@ public abstract class Operator<T extends
   public boolean supportUnionRemoveOptimization() {
     return false;
   }
+
+  /*
+   * Whether this operator is allowed before a mapjoin. Eventually, the mapjoin hint should be
+   * done away with, but since bucketized mapjoin and sort-merge join depend on it completely,
+   * it is still needed. Subclasses override this to restrict which operators may appear before
+   * a mapjoin.
+   */
+  public boolean opAllowedBeforeMapJoin() {
+    return true;
+  }
+
+  /*
+   * Whether this operator is allowed after a mapjoin. Eventually, the mapjoin hint should be
+   * done away with, but since bucketized mapjoin and sort-merge join depend on it completely,
+   * it is still needed. Subclasses override this to restrict which operators may appear after
+   * a mapjoin.
+   */
+  public boolean opAllowedAfterMapJoin() {
+    return true;
+  }
 }
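
A sketch of how a validation pass could consume these hooks, walking up from a hinted mapjoin and
rejecting the plan if any ancestor returns false from opAllowedBeforeMapJoin(); the traversal
itself is illustrative, only the hook methods come from the patch:

    import java.util.List;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    public final class MapJoinAncestorCheck {
      // Returns false if any ancestor operator is not allowed before a mapjoin.
      public static boolean ancestorsAllowMapJoin(Operator<? extends OperatorDesc> op) {
        List<Operator<? extends OperatorDesc>> parents = op.getParentOperators();
        if (parents == null) {
          return true;
        }
        for (Operator<? extends OperatorDesc> parent : parents) {
          if (!parent.opAllowedBeforeMapJoin() || !ancestorsAllowMapJoin(parent)) {
            return false;
          }
        }
        return true;
      }
    }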

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1439946&r1=1439945&r2=1439946&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Jan 29 15:33:53 2013
@@ -321,4 +321,9 @@ public class ReduceSinkOperator extends 
   public OperatorType getType() {
     return OperatorType.REDUCESINK;
   }
+
+  @Override
+  public boolean opAllowedBeforeMapJoin() {
+    return false;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1439946&r1=1439945&r2=1439946&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Tue Jan 29 15:33:53 2013
@@ -75,8 +75,11 @@ public abstract class Task<T extends Ser
   public static final int CONVERTED_MAPJOIN = 2;
   public static final int CONVERTED_LOCAL_MAPJOIN = 3;
   public static final int BACKUP_COMMON_JOIN = 4;
-  public static final int LOCAL_MAPJOIN=5;
-
+  public static final int LOCAL_MAPJOIN = 5;
+  // The join task is converted to a mapjoin task. This can only happen if
+  // hive.auto.convert.join.noconditionaltask is set to true. No conditional task is
+  // created as a backup in case the mapjoin fails.
+  public static final int MAPJOIN_ONLY_NOBACKUP = 6;
 
   // Descendants tasks who subscribe feeds from this task
   protected transient List<Task<? extends Serializable>> feedSubscribers;
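
A sketch of tagging a directly converted join task with the new constant, assuming the
setTaskTag(int) accessor that accompanies the existing tags such as CONVERTED_MAPJOIN:

    import java.io.Serializable;
    import org.apache.hadoop.hive.ql.exec.Task;

    public final class MapJoinTagging {
      // Illustrative only: mark a task converted straight to a mapjoin,
      // with no conditional/backup task behind it.
      public static void markMapJoinOnly(Task<? extends Serializable> mapJoinTask) {
        mapJoinTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP);
      }
    }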

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java?rev=1439946&r1=1439945&r2=1439946&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java Tue Jan 29 15:33:53 2013
@@ -148,4 +148,19 @@ public class UnionOperator extends Opera
   public OperatorType getType() {
     return OperatorType.UNION;
   }
+
+  /**
+   * Union operators are not allowed either before or after an explicit mapjoin hint.
+   * Note that the same query would work without the mapjoin hint (by setting
+   * hive.auto.convert.join to true).
+   **/
+  @Override
+  public boolean opAllowedBeforeMapJoin() {
+    return false;
+  }
+
+  @Override
+  public boolean opAllowedAfterMapJoin() {
+    return false;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1439946&r1=1439945&r2=1439946&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Tue Jan 29 15:33:53 2013
@@ -32,12 +32,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.MoveTask;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -52,7 +50,6 @@ import org.apache.hadoop.hive.ql.io.rcfi
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
@@ -67,7 +64,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ExtractDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
-import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -166,10 +162,10 @@ public class GenMRFileSink1 implements N
       }
     }
 
-    String finalName = processFS(nd, stack, opProcCtx, chDir);
+    String finalName = processFS(fsOp, stack, opProcCtx, chDir);
 
     // need to merge the files in the destination table/partitions
-    if (chDir && (finalName != null)) {
+    if (chDir) {
       createMergeJob((FileSinkOperator) nd, ctx, finalName);
     }
 
@@ -760,7 +756,7 @@ public class GenMRFileSink1 implements N
   /**
    * Process the FileSink operator to generate a MoveTask if necessary.
    *
-   * @param nd
+   * @param fsOp
    *          current FileSink operator
    * @param stack
    *          parent operators
@@ -771,16 +767,9 @@ public class GenMRFileSink1 implements N
    * @return the final file name to which the FileSinkOperator should store.
    * @throws SemanticException
    */
-  private String processFS(Node nd, Stack<Node> stack,
+  private String processFS(FileSinkOperator fsOp, Stack<Node> stack,
       NodeProcessorCtx opProcCtx, boolean chDir) throws SemanticException {
 
-    // Is it the dummy file sink after the mapjoin
-    FileSinkOperator fsOp = (FileSinkOperator) nd;
-    if ((fsOp.getParentOperators().size() == 1)
-        && (fsOp.getParentOperators().get(0) instanceof MapJoinOperator)) {
-      return null;
-    }
-
     GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
     List<FileSinkOperator> seenFSOps = ctx.getSeenFileSinkOps();
     if (seenFSOps == null) {
@@ -884,24 +873,6 @@ public class GenMRFileSink1 implements N
       return dest;
     }
 
-    AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp = ctx.getCurrMapJoinOp();
-
-    if (currMapJoinOp != null) {
-      opTaskMap.put(null, currTask);
-      GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(currMapJoinOp);
-      MapredWork plan = (MapredWork) currTask.getWork();
-
-      String taskTmpDir = mjCtx.getTaskTmpDir();
-      TableDesc tt_desc = mjCtx.getTTDesc();
-      assert plan.getPathToAliases().get(taskTmpDir) == null;
-      plan.getPathToAliases().put(taskTmpDir, new ArrayList<String>());
-      plan.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
-      plan.getPathToPartitionInfo().put(taskTmpDir,
-          new PartitionDesc(tt_desc, null));
-      plan.getAliasToWork().put(taskTmpDir, mjCtx.getRootMapJoinOp());
-      return dest;
-    }
-
     return dest;
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java?rev=1439946&r1=1439945&r2=1439946&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java Tue Jan 29 15:33:53 2013
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -40,7 +39,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -155,90 +153,10 @@ public class GenMRProcContext implements
     }
   }
 
-  /**
-   * GenMRMapJoinCtx.
-   *
-   */
-  public static class GenMRMapJoinCtx {
-    String taskTmpDir;
-    TableDesc tt_desc;
-    Operator<? extends OperatorDesc> rootMapJoinOp;
-    AbstractMapJoinOperator<? extends MapJoinDesc> oldMapJoin;
-
-    public GenMRMapJoinCtx() {
-      taskTmpDir = null;
-      tt_desc = null;
-      rootMapJoinOp = null;
-      oldMapJoin = null;
-    }
-
-    /**
-     * @param taskTmpDir
-     * @param tt_desc
-     * @param rootMapJoinOp
-     * @param oldMapJoin
-     */
-    public GenMRMapJoinCtx(String taskTmpDir, TableDesc tt_desc,
-        Operator<? extends OperatorDesc> rootMapJoinOp,
-        AbstractMapJoinOperator<? extends MapJoinDesc> oldMapJoin) {
-      this.taskTmpDir = taskTmpDir;
-      this.tt_desc = tt_desc;
-      this.rootMapJoinOp = rootMapJoinOp;
-      this.oldMapJoin = oldMapJoin;
-    }
-
-    public void setTaskTmpDir(String taskTmpDir) {
-      this.taskTmpDir = taskTmpDir;
-    }
-
-    public String getTaskTmpDir() {
-      return taskTmpDir;
-    }
-
-    public void setTTDesc(TableDesc tt_desc) {
-      this.tt_desc = tt_desc;
-    }
-
-    public TableDesc getTTDesc() {
-      return tt_desc;
-    }
-
-    /**
-     * @return the childSelect
-     */
-    public Operator<? extends OperatorDesc> getRootMapJoinOp() {
-      return rootMapJoinOp;
-    }
-
-    /**
-     * @param rootMapJoinOp
-     *          the rootMapJoinOp to set
-     */
-    public void setRootMapJoinOp(Operator<? extends OperatorDesc> rootMapJoinOp) {
-      this.rootMapJoinOp = rootMapJoinOp;
-    }
-
-    /**
-     * @return the oldMapJoin
-     */
-    public AbstractMapJoinOperator<? extends MapJoinDesc> getOldMapJoin() {
-      return oldMapJoin;
-    }
-
-    /**
-     * @param oldMapJoin
-     *          the oldMapJoin to set
-     */
-    public void setOldMapJoin(AbstractMapJoinOperator<? extends MapJoinDesc> oldMapJoin) {
-      this.oldMapJoin = oldMapJoin;
-    }
-  }
-
   private HiveConf conf;
   private
     HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap;
   private HashMap<UnionOperator, GenMRUnionCtx> unionTaskMap;
-  private HashMap<AbstractMapJoinOperator<? extends MapJoinDesc>, GenMRMapJoinCtx> mapJoinTaskMap;
   private List<Operator<? extends OperatorDesc>> seenOps;
   private List<FileSinkOperator> seenFileSinkOps;
 
@@ -250,7 +168,6 @@ public class GenMRProcContext implements
   private Task<? extends Serializable> currTask;
   private Operator<? extends OperatorDesc> currTopOp;
   private UnionOperator currUnionOp;
-  private AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp;
   private String currAliasId;
   private List<Operator<? extends OperatorDesc>> rootOps;
   private DependencyCollectionTask dependencyTaskForMultiInsert;
@@ -313,12 +230,10 @@ public class GenMRProcContext implements
     currTask = null;
     currTopOp = null;
     currUnionOp = null;
-    currMapJoinOp = null;
     currAliasId = null;
     rootOps = new ArrayList<Operator<? extends OperatorDesc>>();
     rootOps.addAll(parseCtx.getTopOps().values());
     unionTaskMap = new HashMap<UnionOperator, GenMRUnionCtx>();
-    mapJoinTaskMap = new HashMap<AbstractMapJoinOperator<? extends MapJoinDesc>, GenMRMapJoinCtx>();
     dependencyTaskForMultiInsert = null;
     linkedFileDescTasks = null;
   }
@@ -488,18 +403,6 @@ public class GenMRProcContext implements
     this.currUnionOp = currUnionOp;
   }
 
-  public AbstractMapJoinOperator<? extends MapJoinDesc> getCurrMapJoinOp() {
-    return currMapJoinOp;
-  }
-
-  /**
-   * @param currMapJoinOp
-   *          current map join operator
-   */
-  public void setCurrMapJoinOp(AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp) {
-    this.currMapJoinOp = currMapJoinOp;
-  }
-
   /**
    * @return current top alias
    */
@@ -523,14 +426,6 @@ public class GenMRProcContext implements
     unionTaskMap.put(op, uTask);
   }
 
-  public GenMRMapJoinCtx getMapJoinCtx(AbstractMapJoinOperator<? extends MapJoinDesc> op) {
-    return mapJoinTaskMap.get(op);
-  }
-
-  public void setMapJoinCtx(AbstractMapJoinOperator<? extends MapJoinDesc> op, GenMRMapJoinCtx mjCtx) {
-    mapJoinTaskMap.put(op, mjCtx);
-  }
-
   /**
    * Get the input set.
    */

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java?rev=1439946&r1=1439945&r2=1439946&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java Tue Jan 29 15:33:53 2013
@@ -43,7 +43,11 @@ public class GenMRRedSink1 implements No
   }
 
   /**
-   * Reduce Scan encountered.
+   * Reduce Sink encountered.
+   * a) If we are seeing this RS for the first time, we initialize the plan corresponding to it.
+   * b) If we are seeing this RS for the second or a later time, then either the query had a join,
+   *    in which case we merge this plan with the earlier plan involving this RS, or the plan for
+   *    this RS needs to be split into two branches.
    *
    * @param nd
    *          the reduce sink operator encountered
@@ -81,7 +85,7 @@ public class GenMRRedSink1 implements No
     } else {
       // This will happen in case of joins. The current plan can be thrown away
       // after being merged with the original plan
-      GenMapRedUtils.joinPlan(op, null, opMapTask, ctx, -1, false, false, null);
+      GenMapRedUtils.joinPlan(op, null, opMapTask, ctx, -1, false);
       currTask = opMapTask;
       ctx.setCurrTask(currTask);
     }
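
The two cases in the javadoc above correspond to a dispatch of the shape visible in GenMRRedSink2
below; a condensed sketch, assuming it sits alongside GenMapRedUtils in the optimizer package:

    import java.io.Serializable;
    import java.util.HashMap;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    public final class RedSinkDispatch {
      public static void dispatch(ReduceSinkOperator op, GenMRProcContext ctx,
          HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap,
          Operator<? extends OperatorDesc> reducer) throws SemanticException {
        Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
        if (opMapTask == null) {
          // Case a): first encounter -- split off a new plan for this reduce sink.
          GenMapRedUtils.splitPlan(op, ctx);
        } else {
          // Case b): seen before (a join) -- merge into the existing plan.
          GenMapRedUtils.joinPlan(op, null, opMapTask, ctx, -1, false);
          ctx.setCurrTask(opMapTask);
        }
      }
    }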

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java?rev=1439946&r1=1439945&r2=1439946&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java Tue Jan 29 15:33:53 2013
@@ -71,7 +71,7 @@ public class GenMRRedSink2 implements No
     if (opMapTask == null) {
       GenMapRedUtils.splitPlan(op, ctx);
     } else {
-      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, true, false, null);
+      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, true);
       currTask = opMapTask;
       ctx.setCurrTask(currTask);
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java?rev=1439946&r1=1439945&r2=1439946&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink4.java Tue Jan 29 15:33:53 2013
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.optimizer;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Stack;
-
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-
-/**
- * Processor for the rule - map join followed by reduce sink.
- */
-public class GenMRRedSink4 implements NodeProcessor {
-
-  public GenMRRedSink4() {
-  }
-
-  /**
-   * Reduce Scan encountered.
-   *
-   * @param nd
-   *          the reduce sink operator encountered
-   * @param opProcCtx
-   *          context
-   */
-  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
-      Object... nodeOutputs) throws SemanticException {
-    ReduceSinkOperator op = (ReduceSinkOperator) nd;
-    GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
-
-    ctx.getParseCtx();
-
-    // map-join consisted on a bunch of map-only jobs, and it has been split
-    // after the mapjoin
-    Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
-    Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
-        .getMapCurrCtx();
-    GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
-    Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
-    MapredWork plan = (MapredWork) currTask.getWork();
-    HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap = ctx
-        .getOpTaskMap();
-    Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
-
-    ctx.setCurrTask(currTask);
-
-    // If the plan for this reducer does not exist, initialize the plan
-    if (opMapTask == null) {
-      // When the reducer is encountered for the first time
-      if (plan.getReducer() == null) {
-        GenMapRedUtils.initMapJoinPlan(op, ctx, true, null, true, -1);
-        // When mapjoin is followed by a multi-table insert
-      } else {
-        GenMapRedUtils.splitPlan(op, ctx);
-      }
-    } else {
-      // There is a join after mapjoin. One of the branches of mapjoin has already
-      // been initialized.
-      // Initialize the current branch, and join with the original plan.
-      assert plan.getReducer() != reducer;
-      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, false, true, null);
-    }
-
-    mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
-        ctx.getCurrAliasId()));
-
-    // the mapjoin operator has been processed
-    ctx.setCurrMapJoinOp(null);
-    return null;
-  }
-}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=1439946&r1=1439945&r2=1439946&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Tue Jan 29 15:33:53 2013
@@ -26,7 +26,6 @@ import java.util.Stack;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -35,7 +34,6 @@ import org.apache.hadoop.hive.ql.exec.Un
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
@@ -44,10 +42,8 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -84,16 +80,10 @@ public class GenMRUnion1 implements Node
     }
 
     UnionParseContext uPrsCtx = uCtx.getUnionParseContext(union);
-    if ((uPrsCtx != null) && (uPrsCtx.getMapJoinQuery())) {
-      GenMapRedUtils.mergeMapJoinUnion(union, ctx,
-          UnionProcFactory.getPositionParent(union, stack));
-    }
-    else {
-      ctx.getMapCurrCtx().put(
-          (Operator<? extends OperatorDesc>) union,
-          new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
-              ctx.getCurrAliasId()));
-    }
+    ctx.getMapCurrCtx().put(
+        (Operator<? extends OperatorDesc>) union,
+        new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(),
+            ctx.getCurrAliasId()));
 
     // if the union is the first time seen, set current task to GenMRUnionCtx
     uCtxTask = ctx.getUnionTask(union);
@@ -103,7 +93,7 @@ public class GenMRUnion1 implements Node
       ctx.setUnionTask(union, uCtxTask);
     }
 
-    Task<? extends Serializable> uTask=ctx.getCurrTask();
+    Task<? extends Serializable> uTask = ctx.getCurrTask();
     if (uTask.getParentTasks() == null
         || uTask.getParentTasks().isEmpty()) {
       if (!ctx.getRootTasks().contains(uTask)) {
@@ -134,8 +124,9 @@ public class GenMRUnion1 implements Node
       GenMRUnionCtx uCtxTask) {
     ParseContext parseCtx = ctx.getParseCtx();
 
-    TableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils.getFieldSchemasFromRowSchema(
-        parent.getSchema(), "temporarycol"));
+    TableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
+        .getFieldSchemasFromRowSchema(
+            parent.getSchema(), "temporarycol"));
 
     // generate the temporary file
     Context baseCtx = parseCtx.getContext();
@@ -150,7 +141,7 @@ public class GenMRUnion1 implements Node
     parent.getChildOperators().set(0, fs_op);
 
     List<Operator<? extends OperatorDesc>> parentOpList =
-      new ArrayList<Operator<? extends OperatorDesc>>();
+        new ArrayList<Operator<? extends OperatorDesc>>();
     parentOpList.add(parent);
     fs_op.setParentOperators(parentOpList);
 
@@ -158,7 +149,7 @@ public class GenMRUnion1 implements Node
     Operator<? extends OperatorDesc> ts_op = OperatorFactory.get(
         new TableScanDesc(), parent.getSchema());
     List<Operator<? extends OperatorDesc>> childOpList =
-      new ArrayList<Operator<? extends OperatorDesc>>();
+        new ArrayList<Operator<? extends OperatorDesc>>();
     childOpList.add(child);
     ts_op.setChildOperators(childOpList);
     child.replaceParent(parent, ts_op);
@@ -212,27 +203,9 @@ public class GenMRUnion1 implements Node
     }
   }
 
-  private void processSubQueryUnionMapJoin(GenMRProcContext ctx) {
-    AbstractMapJoinOperator<? extends MapJoinDesc> mjOp = ctx.getCurrMapJoinOp();
-    assert mjOp != null;
-    GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(mjOp);
-    assert mjCtx != null;
-    MapredWork plan = (MapredWork) ctx.getCurrTask().getWork();
-
-    String taskTmpDir = mjCtx.getTaskTmpDir();
-    TableDesc tt_desc = mjCtx.getTTDesc();
-    assert plan.getPathToAliases().get(taskTmpDir) == null;
-    plan.getPathToAliases().put(taskTmpDir, new ArrayList<String>());
-    plan.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
-    plan.getPathToPartitionInfo().put(taskTmpDir,
-        new PartitionDesc(tt_desc, null));
-    plan.getAliasToWork().put(taskTmpDir, mjCtx.getRootMapJoinOp());
-  }
-
   /**
   * Union Operator encountered. Currently, the algorithm is pretty simple: If
-   * all the sub-queries are map-only, don't do anything. However, if there is a
-   * mapjoin followed by the union, merge at the union Otherwise, insert a
+   * all the sub-queries are map-only, don't do anything. Otherwise, insert a
    * FileSink on top of all the sub-queries.
    *
    * This can be optimized later on.
@@ -284,8 +257,7 @@ public class GenMRUnion1 implements Node
     }
 
     // Copy into the current union task plan if
-    if (uPrsCtx.getMapOnlySubq(pos)
-        && !uPrsCtx.getMapJoinSubq(pos) && uPrsCtx.getRootTask(pos)) {
+    if (uPrsCtx.getMapOnlySubq(pos) && uPrsCtx.getRootTask(pos)) {
       processSubQueryUnionMerge(ctx, uCtxTask, union, stack);
     }
     // If it a map-reduce job, create a temporary file
@@ -295,13 +267,10 @@ public class GenMRUnion1 implements Node
           && (!ctx.getRootTasks().contains(currTask))) {
         ctx.getRootTasks().add(currTask);
       }
-      // If there is a mapjoin at position 'pos'
-      if (uPrsCtx.getMapJoinSubq(pos)) {
-        processSubQueryUnionMapJoin(ctx);
-      }
 
-      processSubQueryUnionCreateIntermediate(union.getParentOperators().get(pos), union, uTask, ctx, uCtxTask);
-      //the currAliasId and CurrTopOp is not valid any more
+      processSubQueryUnionCreateIntermediate(union.getParentOperators().get(pos), union, uTask,
+          ctx, uCtxTask);
+      // the currAliasId and CurrTopOp is not valid any more
       ctx.setCurrAliasId(null);
       ctx.setCurrTopOp(null);
       ctx.getOpTaskMap().put(null, uTask);
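
With the mapjoin branch gone, the per-subquery handling reduces to the rule stated in the javadoc;
a condensed fragment of the decision, using names from the diff (root-task bookkeeping omitted):

    // Map-only root subqueries merge straight into the union task's plan;
    // anything else stages its output in an intermediate file feeding the union.
    if (uPrsCtx.getMapOnlySubq(pos) && uPrsCtx.getRootTask(pos)) {
      processSubQueryUnionMerge(ctx, uCtxTask, union, stack);
    } else {
      processSubQueryUnionCreateIntermediate(union.getParentOperators().get(pos), union, uTask,
          ctx, uCtxTask);
    }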

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1439946&r1=1439945&r2=1439946&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Tue Jan 29 15:33:53 2013
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.optimi
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -33,12 +32,10 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -47,19 +44,15 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPruner;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
-import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
-import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext.UnionParseContext;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
@@ -97,12 +90,12 @@ public final class GenMapRedUtils {
       throws SemanticException {
     Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
     Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx =
-      opProcCtx.getMapCurrCtx();
+        opProcCtx.getMapCurrCtx();
     GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(0));
     Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
     MapredWork plan = (MapredWork) currTask.getWork();
     HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
-      opProcCtx.getOpTaskMap();
+        opProcCtx.getOpTaskMap();
     Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
 
     opTaskMap.put(reducer, currTask);
@@ -114,7 +107,7 @@ public final class GenMapRedUtils {
     List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
 
     if (!rootTasks.contains(currTask)) {
-        rootTasks.add(currTask);
+      rootTasks.add(currTask);
     }
     if (reducer.getClass() == JoinOperator.class) {
       plan.setNeedsTagging(true);
@@ -137,165 +130,6 @@ public final class GenMapRedUtils {
     opProcCtx.setCurrAliasId(currAliasId);
   }
 
-  public static void initMapJoinPlan(
-    Operator<? extends OperatorDesc> op, GenMRProcContext ctx,
-    boolean readInputMapJoin, UnionOperator currUnionOp, boolean setReducer, int pos)
-    throws SemanticException {
-    initMapJoinPlan(op, ctx, readInputMapJoin, currUnionOp, setReducer, pos, false);
-  }
-
-  /**
-   * Initialize the current plan by adding it to root tasks.
-   *
-   * @param op
-   *          the map join operator encountered
-   * @param opProcCtx
-   *          processing context
-   * @param pos
-   *          position of the parent
-   */
-  public static void initMapJoinPlan(Operator<? extends OperatorDesc> op,
-      GenMRProcContext opProcCtx, boolean readInputMapJoin,
-      UnionOperator currUnionOp, boolean setReducer, int pos, boolean createLocalPlan)
-      throws SemanticException {
-    Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx =
-      opProcCtx.getMapCurrCtx();
-    assert (((pos == -1) && (readInputMapJoin)) || (pos != -1));
-    int parentPos = (pos == -1) ? 0 : pos;
-    GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(
-        parentPos));
-    Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
-    MapredWork plan = (MapredWork) currTask.getWork();
-    HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>>  opTaskMap =
-      opProcCtx.getOpTaskMap();
-    Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
-
-    // The mapjoin has already been encountered. Some context must be stored
-    // about that
-    if (readInputMapJoin) {
-      AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp = opProcCtx.getCurrMapJoinOp();
-      assert currMapJoinOp != null;
-      boolean local = ((pos == -1) || (pos == (currMapJoinOp.getConf()).getPosBigTable())) ?
-          false : true;
-
-      if (setReducer) {
-        Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
-        plan.setReducer(reducer);
-        opTaskMap.put(reducer, currTask);
-        if (reducer.getClass() == JoinOperator.class) {
-          plan.setNeedsTagging(true);
-        }
-        ReduceSinkDesc desc = (ReduceSinkDesc) op.getConf();
-        plan.setNumReduceTasks(desc.getNumReducers());
-      } else {
-        opTaskMap.put(op, currTask);
-      }
-
-      if (currUnionOp == null) {
-        GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(currMapJoinOp);
-        String taskTmpDir;
-        TableDesc tt_desc;
-        Operator<? extends OperatorDesc> rootOp;
-
-        if (mjCtx.getOldMapJoin() == null || setReducer) {
-          taskTmpDir = mjCtx.getTaskTmpDir();
-          tt_desc = mjCtx.getTTDesc();
-          rootOp = mjCtx.getRootMapJoinOp();
-        } else {
-          GenMRMapJoinCtx oldMjCtx = opProcCtx.getMapJoinCtx(mjCtx
-              .getOldMapJoin());
-          taskTmpDir = oldMjCtx.getTaskTmpDir();
-          tt_desc = oldMjCtx.getTTDesc();
-          rootOp = oldMjCtx.getRootMapJoinOp();
-        }
-
-        setTaskPlan(taskTmpDir, taskTmpDir, rootOp, plan, local, tt_desc);
-        setupBucketMapJoinInfo(plan, currMapJoinOp, createLocalPlan);
-      } else {
-        initUnionPlan(opProcCtx, currUnionOp, currTask, false);
-      }
-
-      opProcCtx.setCurrMapJoinOp(null);
-    } else {
-      MapJoinDesc desc = (MapJoinDesc) op.getConf();
-
-      // The map is overloaded to keep track of mapjoins also
-      opTaskMap.put(op, currTask);
-
-      List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
-      if (!rootTasks.contains(currTask)) {
-        rootTasks.add(currTask);
-      }
-
-      assert currTopOp != null;
-      List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
-      String currAliasId = opProcCtx.getCurrAliasId();
-
-      seenOps.add(currTopOp);
-      boolean local = (pos == desc.getPosBigTable()) ? false : true;
-      setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
-      setupBucketMapJoinInfo(plan, (AbstractMapJoinOperator<? extends MapJoinDesc>)op, createLocalPlan);
-    }
-
-    opProcCtx.setCurrTask(currTask);
-    opProcCtx.setCurrTopOp(null);
-    opProcCtx.setCurrAliasId(null);
-  }
-
-  private static void setupBucketMapJoinInfo(MapredWork plan,
-      AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp, boolean createLocalPlan) {
-    if (currMapJoinOp != null) {
-      Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
-        currMapJoinOp.getConf().getAliasBucketFileNameMapping();
-      if(aliasBucketFileNameMapping!= null) {
-        MapredLocalWork localPlan = plan.getMapLocalWork();
-        if(localPlan == null) {
-          if(currMapJoinOp instanceof SMBMapJoinOperator) {
-            localPlan = ((SMBMapJoinOperator)currMapJoinOp).getConf().getLocalWork();
-          }
-          if (localPlan == null && createLocalPlan) {
-            localPlan = new MapredLocalWork(
-                new LinkedHashMap<String, Operator<? extends OperatorDesc>>(),
-                new LinkedHashMap<String, FetchWork>());
-          }
-        } else {
-          //local plan is not null, we want to merge it into SMBMapJoinOperator's local work
-          if(currMapJoinOp instanceof SMBMapJoinOperator) {
-            MapredLocalWork smbLocalWork = ((SMBMapJoinOperator)currMapJoinOp).getConf().getLocalWork();
-            if(smbLocalWork != null) {
-              localPlan.getAliasToFetchWork().putAll(smbLocalWork.getAliasToFetchWork());
-              localPlan.getAliasToWork().putAll(smbLocalWork.getAliasToWork());
-            }
-          }
-        }
-
-        if(localPlan == null) {
-          return;
-        }
-
-        if(currMapJoinOp instanceof SMBMapJoinOperator) {
-          plan.setMapLocalWork(null);
-          ((SMBMapJoinOperator)currMapJoinOp).getConf().setLocalWork(localPlan);
-        } else {
-          plan.setMapLocalWork(localPlan);
-        }
-        BucketMapJoinContext bucketMJCxt = new BucketMapJoinContext();
-        localPlan.setBucketMapjoinContext(bucketMJCxt);
-        bucketMJCxt.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
-        bucketMJCxt.setBucketFileNameMapping(currMapJoinOp.getConf().getBigTableBucketNumMapping());
-        localPlan.setInputFileChangeSensitive(true);
-        bucketMJCxt.setMapJoinBigTableAlias(currMapJoinOp.getConf().getBigTableAlias());
-        bucketMJCxt.setBucketMatcherClass(org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
-        bucketMJCxt.setBigTablePartSpecToFileMapping(
-          currMapJoinOp.getConf().getBigTablePartSpecToFileMapping());
-        // BucketizedHiveInputFormat should be used for either sort merge join or bucket map join
-        if ((currMapJoinOp instanceof SMBMapJoinOperator)
-            || (currMapJoinOp.getConf().isBucketMapJoin())) {
-          plan.setUseBucketizedHiveInputFormat(true);
-        }
-      }
-    }
-  }
 
   /**
    * Initialize the current union plan.
@@ -312,7 +146,7 @@ public final class GenMapRedUtils {
 
     MapredWork plan = (MapredWork) unionTask.getWork();
     HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
-      opProcCtx.getOpTaskMap();
+        opProcCtx.getOpTaskMap();
 
     opTaskMap.put(reducer, unionTask);
     plan.setReducer(reducer);
@@ -377,6 +211,7 @@ public final class GenMapRedUtils {
       Task<? extends Serializable> currTask, boolean local)
       throws SemanticException {
     MapredWork plan = (MapredWork) currTask.getWork();
+
     // In case of lateral views followed by a join, the same tree
  // can be traversed more than once
     if (currUnionOp != null) {
@@ -433,13 +268,6 @@ public final class GenMapRedUtils {
     opProcCtx.setCurrTask(existingTask);
   }
 
-  public static void joinPlan(Operator<? extends OperatorDesc> op,
-      Task<? extends Serializable> oldTask, Task<? extends Serializable> task,
-      GenMRProcContext opProcCtx, int pos, boolean split,
-      boolean readMapJoinData, UnionOperator currUnionOp) throws SemanticException {
-    joinPlan(op, oldTask, task, opProcCtx, pos, split, readMapJoinData, currUnionOp, false);
-  }
-
   /**
    * Merge the current task with the task for the current reducer.
    *
@@ -456,8 +284,7 @@ public final class GenMapRedUtils {
    */
   public static void joinPlan(Operator<? extends OperatorDesc> op,
       Task<? extends Serializable> oldTask, Task<? extends Serializable> task,
-      GenMRProcContext opProcCtx, int pos, boolean split,
-      boolean readMapJoinData, UnionOperator currUnionOp, boolean createLocalWork)
+      GenMRProcContext opProcCtx, int pos, boolean split)
       throws SemanticException {
     Task<? extends Serializable> currTask = task;
     MapredWork plan = (MapredWork) currTask.getWork();
@@ -493,53 +320,15 @@ public final class GenMapRedUtils {
               : true;
         }
         setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
-        if(op instanceof AbstractMapJoinOperator) {
-          setupBucketMapJoinInfo(plan, (AbstractMapJoinOperator<? extends MapJoinDesc>)op, createLocalWork);
-        }
       }
       currTopOp = null;
       opProcCtx.setCurrTopOp(currTopOp);
-    } else if (opProcCtx.getCurrMapJoinOp() != null) {
-      AbstractMapJoinOperator<? extends MapJoinDesc> mjOp = opProcCtx.getCurrMapJoinOp();
-      if (currUnionOp != null) {
-        initUnionPlan(opProcCtx, currUnionOp, currTask, false);
-      } else {
-        GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(mjOp);
-
-        // In case of map-join followed by map-join, the file needs to be
-        // obtained from the old map join
-        AbstractMapJoinOperator<? extends MapJoinDesc> oldMapJoin = mjCtx.getOldMapJoin();
-        String taskTmpDir = null;
-        TableDesc tt_desc = null;
-        Operator<? extends OperatorDesc> rootOp = null;
-
-        boolean local = ((pos == -1) || (pos == (mjOp.getConf())
-            .getPosBigTable())) ? false : true;
-        if (oldMapJoin == null) {
-          if (opProcCtx.getParseCtx().getListMapJoinOpsNoReducer().contains(mjOp)
-              || local || (oldTask != null) && (parTasks != null)) {
-            taskTmpDir = mjCtx.getTaskTmpDir();
-            tt_desc = mjCtx.getTTDesc();
-            rootOp = mjCtx.getRootMapJoinOp();
-          }
-        } else {
-          GenMRMapJoinCtx oldMjCtx = opProcCtx.getMapJoinCtx(oldMapJoin);
-          assert oldMjCtx != null;
-          taskTmpDir = oldMjCtx.getTaskTmpDir();
-          tt_desc = oldMjCtx.getTTDesc();
-          rootOp = oldMjCtx.getRootMapJoinOp();
-        }
-
-        setTaskPlan(taskTmpDir, taskTmpDir, rootOp, plan, local, tt_desc);
-        setupBucketMapJoinInfo(plan, oldMapJoin, createLocalWork);
-      }
-      opProcCtx.setCurrMapJoinOp(null);
     }
 
     if ((oldTask != null) && (parTasks != null)) {
       for (Task<? extends Serializable> parTask : parTasks) {
         parTask.addDependentTask(currTask);
-        if(opProcCtx.getRootTasks().contains(currTask)) {
+        if (opProcCtx.getRootTasks().contains(currTask)) {
           opProcCtx.getRootTasks().remove(currTask);
         }
       }
@@ -557,7 +346,7 @@ public final class GenMapRedUtils {
    *          processing context
    */
   public static void splitPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx)
-  throws SemanticException {
+      throws SemanticException {
     // Generate a new task
     ParseContext parseCtx = opProcCtx.getParseCtx();
     MapredWork cplan = getMapRedWork(parseCtx);
@@ -572,7 +361,7 @@ public final class GenMapRedUtils {
     cplan.setNumReduceTasks(new Integer(desc.getNumReducers()));
 
     HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
-      opProcCtx.getOpTaskMap();
+        opProcCtx.getOpTaskMap();
     opTaskMap.put(reducer, redTask);
     Task<? extends Serializable> currTask = opProcCtx.getCurrTask();
 
@@ -622,7 +411,6 @@ public final class GenMapRedUtils {
     return currentInput;
   }
 
-
   /**
    * set the current task in the mapredWork.
    *
@@ -657,12 +445,12 @@ public final class GenMapRedUtils {
 
     if (partsList == null) {
       try {
-        partsList = parseCtx.getOpToPartList().get((TableScanOperator)topOp);
+        partsList = parseCtx.getOpToPartList().get((TableScanOperator) topOp);
         if (partsList == null) {
           partsList = PartitionPruner.prune(parseCtx.getTopToTable().get(topOp),
-            parseCtx.getOpToPartPruner().get(topOp), opProcCtx.getConf(),
-            alias_id, parseCtx.getPrunedPartitions());
-          parseCtx.getOpToPartList().put((TableScanOperator)topOp, partsList);
+              parseCtx.getOpToPartPruner().get(topOp), opProcCtx.getConf(),
+              alias_id, parseCtx.getPrunedPartitions());
+          parseCtx.getOpToPartList().put((TableScanOperator) topOp, partsList);
         }
       } catch (SemanticException e) {
         throw e;
@@ -701,7 +489,8 @@ public final class GenMapRedUtils {
     long sizeNeeded = Integer.MAX_VALUE;
     int fileLimit = -1;
     if (parseCtx.getGlobalLimitCtx().isEnable()) {
-      long sizePerRow = HiveConf.getLongVar(parseCtx.getConf(), HiveConf.ConfVars.HIVELIMITMAXROWSIZE);
+      long sizePerRow = HiveConf.getLongVar(parseCtx.getConf(),
+          HiveConf.ConfVars.HIVELIMITMAXROWSIZE);
       sizeNeeded = parseCtx.getGlobalLimitCtx().getGlobalLimit() * sizePerRow;
       // for the optimization that reduce number of input file, we limit number
       // of files allowed. If more than specific number of files have to be
@@ -709,7 +498,7 @@ public final class GenMapRedUtils {
       // inputs can cause unpredictable latency. It's not necessarily to be
       // cheaper.
       fileLimit =
-        HiveConf.getIntVar(parseCtx.getConf(), HiveConf.ConfVars.HIVELIMITOPTLIMITFILE);
+          HiveConf.getIntVar(parseCtx.getConf(), HiveConf.ConfVars.HIVELIMITOPTLIMITFILE);
 
       if (sizePerRow <= 0 || fileLimit <= 0) {
         LOG.info("Skip optimization to reduce input size of 'limit'");
@@ -735,6 +524,7 @@ public final class GenMapRedUtils {
     // partitioned table and whether any partition is selected or not
     PlanUtils.addInput(inputs,
         new ReadEntity(parseCtx.getTopToTable().get(topOp), parentViewInfo));
+
     for (Partition part : parts) {
       if (part.getTable().isPartitioned()) {
         PlanUtils.addInput(inputs, new ReadEntity(part, parentViewInfo));
@@ -907,7 +697,7 @@ public final class GenMapRedUtils {
       Operator<? extends OperatorDesc> topOp, MapredWork plan, boolean local,
       TableDesc tt_desc) throws SemanticException {
 
-    if(path == null || alias == null) {
+    if (path == null || alias == null) {
       return;
     }
 
@@ -989,8 +779,8 @@ public final class GenMapRedUtils {
     MapredWork work = new MapredWork();
 
     boolean mapperCannotSpanPartns =
-      conf.getBoolVar(
-        HiveConf.ConfVars.HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS);
+        conf.getBoolVar(
+            HiveConf.ConfVars.HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS);
     work.setMapperCannotSpanPartns(mapperCannotSpanPartns);
     work.setPathToAliases(new LinkedHashMap<String, ArrayList<String>>());
     work.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
@@ -1071,7 +861,7 @@ public final class GenMapRedUtils {
 
     // replace the reduce child with this operator
     List<Operator<? extends OperatorDesc>> childOpList = parent
-    .getChildOperators();
+        .getChildOperators();
     for (int pos = 0; pos < childOpList.size(); pos++) {
       if (childOpList.get(pos) == op) {
         childOpList.set(pos, fs_op);
@@ -1080,7 +870,7 @@ public final class GenMapRedUtils {
     }
 
     List<Operator<? extends OperatorDesc>> parentOpList =
-      new ArrayList<Operator<? extends OperatorDesc>>();
+        new ArrayList<Operator<? extends OperatorDesc>>();
     parentOpList.add(parent);
     fs_op.setParentOperators(parentOpList);
 
@@ -1096,7 +886,7 @@ public final class GenMapRedUtils {
     op.getParentOperators().set(posn, ts_op);
 
     Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx =
-      opProcCtx.getMapCurrCtx();
+        opProcCtx.getMapCurrCtx();
     mapCurrCtx.put(ts_op, new GenMapRedCtx(childTask, null, null));
 
     String streamDesc = taskTmpDir;
@@ -1124,101 +914,12 @@ public final class GenMapRedUtils {
 
     // Add the path to alias mapping
     setTaskPlan(taskTmpDir, streamDesc, ts_op, cplan, local, tt_desc);
-
-    // This can be cleaned up as a function table in future
-    if (op instanceof AbstractMapJoinOperator<?>) {
-      AbstractMapJoinOperator<? extends MapJoinDesc> mjOp = (AbstractMapJoinOperator<? extends MapJoinDesc>) op;
-      opProcCtx.setCurrMapJoinOp(mjOp);
-      GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(mjOp);
-      if (mjCtx == null) {
-        mjCtx = new GenMRMapJoinCtx(taskTmpDir, tt_desc, ts_op, null);
-      } else {
-        mjCtx.setTaskTmpDir(taskTmpDir);
-        mjCtx.setTTDesc(tt_desc);
-        mjCtx.setRootMapJoinOp(ts_op);
-      }
-      opProcCtx.setMapJoinCtx(mjOp, mjCtx);
-      opProcCtx.getMapCurrCtx().put(parent,
-          new GenMapRedCtx(childTask, null, null));
-      setupBucketMapJoinInfo(cplan, mjOp, false);
-    }
-
-    currTopOp = null;
-    String currAliasId = null;
-
-    opProcCtx.setCurrTopOp(currTopOp);
-    opProcCtx.setCurrAliasId(currAliasId);
+    opProcCtx.setCurrTopOp(null);
+    opProcCtx.setCurrAliasId(null);
     opProcCtx.setCurrTask(childTask);
   }
 
-  public static void mergeMapJoinUnion(UnionOperator union,
-      GenMRProcContext ctx, int pos) throws SemanticException {
-    ParseContext parseCtx = ctx.getParseCtx();
-    UnionProcContext uCtx = parseCtx.getUCtx();
-
-    UnionParseContext uPrsCtx = uCtx.getUnionParseContext(union);
-    assert uPrsCtx != null;
-
-    Task<? extends Serializable> currTask = ctx.getCurrTask();
-
-    GenMRUnionCtx uCtxTask = ctx.getUnionTask(union);
-    Task<? extends Serializable> uTask = null;
-
-    union.getParentOperators().get(pos);
-    MapredWork uPlan = null;
-
-    // union is encountered for the first time
-    if (uCtxTask == null) {
-      uCtxTask = new GenMRUnionCtx();
-      uPlan = GenMapRedUtils.getMapRedWork(parseCtx);
-      uTask = TaskFactory.get(uPlan, parseCtx.getConf());
-      uCtxTask.setUTask(uTask);
-      ctx.setUnionTask(union, uCtxTask);
-    } else {
-      uTask = uCtxTask.getUTask();
-      uPlan = (MapredWork) uTask.getWork();
-    }
-
-    // If there is a mapjoin at position 'pos'
-    if (uPrsCtx.getMapJoinSubq(pos)) {
-      GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(ctx.getCurrMapJoinOp());
-      String taskTmpDir = mjCtx.getTaskTmpDir();
-      if (uPlan.getPathToAliases().get(taskTmpDir) == null) {
-        uPlan.getPathToAliases().put(taskTmpDir, new ArrayList<String>());
-        uPlan.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
-        uPlan.getPathToPartitionInfo().put(taskTmpDir,
-            new PartitionDesc(mjCtx.getTTDesc(), null));
-        uPlan.getAliasToWork().put(taskTmpDir, mjCtx.getRootMapJoinOp());
-      }
-
-      for (Task t : currTask.getParentTasks()) {
-        t.addDependentTask(uTask);
-      }
-      try {
-        boolean notDone = true;
-        while (notDone) {
-          for (Task t : currTask.getParentTasks()) {
-            t.removeDependentTask(currTask);
-          }
-          notDone = false;
-        }
-      } catch (ConcurrentModificationException e) {
-      }
-    } else {
-      setTaskPlan(ctx.getCurrAliasId(), ctx.getCurrTopOp(), uPlan, false, ctx);
-    }
-
-    ctx.setCurrTask(uTask);
-    ctx.setCurrAliasId(null);
-    ctx.setCurrTopOp(null);
-    ctx.setCurrMapJoinOp(null);
-
-    ctx.getMapCurrCtx().put(union,
-        new GenMapRedCtx(ctx.getCurrTask(), null, null));
-  }
-
   private GenMapRedUtils() {
     // prevent instantiation
   }
-
 }
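
The hunk above that touches HIVELIMITMAXROWSIZE and HIVELIMITOPTLIMITFILE only re-indents the global-limit optimization, but the estimate it computes is easy to state: bytes needed = global limit (rows) * max bytes per row, and the optimization is skipped when either setting is non-positive. A minimal sketch of that arithmetic, with stand-in names rather than Hive's actual classes:

    public class LimitInputEstimateSketch {
      static long sizeNeeded(long globalLimit, long sizePerRow) {
        // bytes expected to satisfy the LIMIT: rows requested * max bytes per row
        return globalLimit * sizePerRow;
      }

      static boolean skipOptimization(long sizePerRow, int fileLimit) {
        // matches the guard "if (sizePerRow <= 0 || fileLimit <= 0)" in the hunk above
        return sizePerRow <= 0 || fileLimit <= 0;
      }

      public static void main(String[] args) {
        // e.g. a global limit of 100 rows at 100000 bytes per row needs ~10 MB of input
        System.out.println(sizeNeeded(100, 100000));  // 10000000
        System.out.println(skipOptimization(0, 10));  // true -> skip the optimization
      }
    }

Note that in the hunk itself sizeNeeded starts at Integer.MAX_VALUE and is only narrowed when the global-limit context is enabled.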

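Similarly, the partition-pruning hunk is pure reformatting, but it is built around a small compute-if-absent cache: the pruned partition list is looked up in opToPartList first and only computed via PartitionPruner.prune (and stored back) on a miss. The same pattern in isolation, with hypothetical stand-in types:

    import java.util.HashMap;
    import java.util.Map;

    // Stand-in types (hypothetical), isolating the caching pattern around
    // parseCtx.getOpToPartList() in the hunk above.
    public class PartListCacheSketch {
      static class TableScanOp { }
      static class PartsList { }

      private final Map<TableScanOp, PartsList> opToPartList = new HashMap<>();

      PartsList getOrPrune(TableScanOp topOp) {
        PartsList partsList = opToPartList.get(topOp);
        if (partsList == null) {
          partsList = prune(topOp);           // the expensive call: PartitionPruner.prune(...)
          opToPartList.put(topOp, partsList); // cache so later branches reuse the result
        }
        return partsList;
      }

      private PartsList prune(TableScanOp topOp) {
        return new PartsList(); // placeholder for the real pruning
      }
    }
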
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=1439946&r1=1439945&r2=1439946&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Tue Jan 29 15:33:53 2013
@@ -18,49 +18,38 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
 
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.OperatorFactory;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.lib.Utils;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
-import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
 
 /**
  * Operator factory for MapJoin processing.
  */
 public final class MapJoinFactory {
 
-  public static int getPositionParent(AbstractMapJoinOperator<? extends MapJoinDesc> op, Stack<Node> stack) {
+  public static int getPositionParent(AbstractMapJoinOperator<? extends MapJoinDesc> op,
+      Stack<Node> stack) {
     int pos = 0;
     int size = stack.size();
     assert size >= 2 && stack.get(size - 1) == op;
     Operator<? extends OperatorDesc> parent =
-      (Operator<? extends OperatorDesc>) stack.get(size - 2);
+        (Operator<? extends OperatorDesc>) stack.get(size - 2);
     List<Operator<? extends OperatorDesc>> parOp = op.getParentOperators();
     pos = parOp.indexOf(parent);
     assert pos < parOp.size();
@@ -68,221 +57,177 @@ public final class MapJoinFactory {
   }
 
   /**
-   * TableScan followed by MapJoin.
+   * MapJoin processor.
+   * The user can provide a mapjoin hint to specify that the input should be processed as a
+   * mapjoin instead of a map-reduce join. If hive.auto.convert.join is set to true, the
+   * user need not specify the hint explicitly; Hive will automatically process the joins
+   * as mapjoins whenever possible. However, a join can only be processed as a bucketized
+   * map-side join or a sort-merge join if the user has provided the hint explicitly. This
+   * will be fixed as part of HIVE-3433, and eventually we should remove support for the
+   * mapjoin hint.
+   * Currently, however, the mapjoin hint is processed as follows:
+   * A mapjoin has 'n' parents for an n-way mapjoin, and therefore the mapjoin operator
+   * will be encountered 'n' times (once for each parent). Since a reduceSink operator is
+   * not allowed before a mapjoin, the task for the mapjoin will always be a root task. The
+   * task corresponding to the mapjoin is converted to a root task when the operator is
+   * encountered for the first time. When the operator is encountered subsequently, the
+   * current task is merged with the root task for the mapjoin. Note that, since the
+   * map-join task may be performed as a bucketized map-side join (or sort-merge join), the
+   * map-join operator is enhanced to contain the bucketing info when it is encountered.
    */
-  public static class TableScanMapJoin implements NodeProcessor {
+  private static class TableScanMapJoinProcessor implements NodeProcessor {
 
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-      Object... nodeOutputs) throws SemanticException {
-      AbstractMapJoinOperator<MapJoinDesc> mapJoin = (AbstractMapJoinOperator<MapJoinDesc>) nd;
-      GenMRProcContext ctx = (GenMRProcContext) procCtx;
-
-      // find the branch on which this processor was invoked
-      int pos = getPositionParent(mapJoin, stack);
-
-      Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
-          .getMapCurrCtx();
-      GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(
-          pos));
-      Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
-      MapredWork currPlan = (MapredWork) currTask.getWork();
-      Operator<? extends OperatorDesc> currTopOp = mapredCtx.getCurrTopOp();
-      String currAliasId = mapredCtx.getCurrAliasId();
-      Operator<? extends OperatorDesc> reducer = mapJoin;
-      HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
-        ctx.getOpTaskMap();
-      Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
-
-      ctx.setCurrTopOp(currTopOp);
-      ctx.setCurrAliasId(currAliasId);
-      ctx.setCurrTask(currTask);
-
-      // If the plan for this reducer does not exist, initialize the plan
-      if (opMapTask == null) {
-        assert currPlan.getReducer() == null;
-        GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, false, null, false, pos);
-      } else {
-        // The current plan can be thrown away after being merged with the
-        // original plan
-        GenMapRedUtils.joinPlan(mapJoin, null, opMapTask, ctx, pos, false,
-            false, null);
-        currTask = opMapTask;
-        ctx.setCurrTask(currTask);
+    public static void setupBucketMapJoinInfo(MapredWork plan,
+        AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp) {
+      if (currMapJoinOp != null) {
+        Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
+            currMapJoinOp.getConf().getAliasBucketFileNameMapping();
+        if (aliasBucketFileNameMapping != null) {
+          MapredLocalWork localPlan = plan.getMapLocalWork();
+          if (localPlan == null) {
+            if (currMapJoinOp instanceof SMBMapJoinOperator) {
+              localPlan = ((SMBMapJoinOperator) currMapJoinOp).getConf().getLocalWork();
+            }
+          } else {
+            // the local plan is not null; merge it into the SMBMapJoinOperator's local work
+            if (currMapJoinOp instanceof SMBMapJoinOperator) {
+              MapredLocalWork smbLocalWork = ((SMBMapJoinOperator) currMapJoinOp).getConf()
+                  .getLocalWork();
+              if (smbLocalWork != null) {
+                localPlan.getAliasToFetchWork().putAll(smbLocalWork.getAliasToFetchWork());
+                localPlan.getAliasToWork().putAll(smbLocalWork.getAliasToWork());
+              }
+            }
+          }
+
+          if (localPlan == null) {
+            return;
+          }
+
+          if (currMapJoinOp instanceof SMBMapJoinOperator) {
+            plan.setMapLocalWork(null);
+            ((SMBMapJoinOperator) currMapJoinOp).getConf().setLocalWork(localPlan);
+          } else {
+            plan.setMapLocalWork(localPlan);
+          }
+          BucketMapJoinContext bucketMJCxt = new BucketMapJoinContext();
+          localPlan.setBucketMapjoinContext(bucketMJCxt);
+          bucketMJCxt.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
+          bucketMJCxt.setBucketFileNameMapping(
+              currMapJoinOp.getConf().getBigTableBucketNumMapping());
+          localPlan.setInputFileChangeSensitive(true);
+          bucketMJCxt.setMapJoinBigTableAlias(currMapJoinOp.getConf().getBigTableAlias());
+          bucketMJCxt
+              .setBucketMatcherClass(org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
+          bucketMJCxt.setBigTablePartSpecToFileMapping(
+              currMapJoinOp.getConf().getBigTablePartSpecToFileMapping());
+          // BucketizedHiveInputFormat should be used for both sort-merge joins and bucket map joins
+          if ((currMapJoinOp instanceof SMBMapJoinOperator)
+              || (currMapJoinOp.getConf().isBucketMapJoin())) {
+            plan.setUseBucketizedHiveInputFormat(true);
+          }
+        }
       }
-
-      mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx
-          .getCurrTopOp(), ctx.getCurrAliasId()));
-      return null;
     }
-  }
 
-  /**
-   * ReduceSink followed by MapJoin.
-   */
-  public static class ReduceSinkMapJoin implements NodeProcessor {
-
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-      AbstractMapJoinOperator<MapJoinDesc> mapJoin = (AbstractMapJoinOperator<MapJoinDesc>) nd;
-      GenMRProcContext opProcCtx = (GenMRProcContext) procCtx;
+    /**
+     * Initialize the current plan by adding it to the root tasks. Since a reduce sink
+     * cannot be present before a mapjoin, and this is the first time the mapjoin
+     * operator has been encountered, the task corresponding to the mapjoin is added
+     * to the root tasks.
+     *
+     * @param op
+     *          the map join operator encountered
+     * @param opProcCtx
+     *          processing context
+     * @param pos
+     *          position of the parent
+     */
+    private static void initMapJoinPlan(AbstractMapJoinOperator<? extends MapJoinDesc> op,
+        GenMRProcContext opProcCtx, int pos)
+        throws SemanticException {
+      Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx =
+          opProcCtx.getMapCurrCtx();
+      int parentPos = (pos == -1) ? 0 : pos;
+      GenMapRedCtx mapredCtx = mapCurrCtx.get(op.getParentOperators().get(
+          parentPos));
+      Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
+      MapredWork plan = (MapredWork) currTask.getWork();
+      HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
+          opProcCtx.getOpTaskMap();
+      Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
 
-      ParseContext parseCtx = opProcCtx.getParseCtx();
-      MapredWork cplan = GenMapRedUtils.getMapRedWork(parseCtx);
-      Task<? extends Serializable> redTask = TaskFactory.get(cplan, parseCtx
-          .getConf());
-      Task<? extends Serializable> currTask = opProcCtx.getCurrTask();
+      MapJoinDesc desc = (MapJoinDesc) op.getConf();
 
-      // find the branch on which this processor was invoked
-      int pos = getPositionParent(mapJoin, stack);
-      boolean local = (pos == ((mapJoin.getConf())).getPosBigTable()) ? false
-          : true;
+      // The map is overloaded to keep track of mapjoins as well
+      opTaskMap.put(op, currTask);
 
-      GenMapRedUtils.splitTasks(mapJoin, currTask, redTask, opProcCtx, false,
-        local, pos);
+      List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
+      assert (!rootTasks.contains(currTask));
+      rootTasks.add(currTask);
 
-      currTask = opProcCtx.getCurrTask();
-      HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
-        opProcCtx.getOpTaskMap();
-      Task<? extends Serializable> opMapTask = opTaskMap.get(mapJoin);
+      assert currTopOp != null;
+      opProcCtx.getSeenOps().add(currTopOp);
 
-      // If the plan for this reducer does not exist, initialize the plan
-      if (opMapTask == null) {
-        assert cplan.getReducer() == null;
-        opTaskMap.put(mapJoin, currTask);
-        opProcCtx.setCurrMapJoinOp(null);
-      } else {
-        // The current plan can be thrown away after being merged with the
-        // original plan
-        GenMapRedUtils.joinPlan(mapJoin, currTask, opMapTask, opProcCtx, pos,
-            false, false, null);
-        currTask = opMapTask;
-        opProcCtx.setCurrTask(currTask);
-      }
-
-      return null;
+      String currAliasId = opProcCtx.getCurrAliasId();
+      boolean local = (pos == desc.getPosBigTable()) ? false : true;
+      GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
+      setupBucketMapJoinInfo(plan, op);
     }
-  }
-
-  /**
-   * MapJoin followed by Select.
-   */
-  public static class MapJoin implements NodeProcessor {
 
     /**
-     * Create a task by splitting the plan below the join. The reason, we have
-     * to do so in the processing of Select and not MapJoin is due to the
-     * walker. While processing a node, it is not safe to alter its children
-     * because that will decide the course of the walk. It is perfectly fine to
-     * muck around with its parents though, since those nodes have already been
-     * visited.
+     * Merge the current task with the task for the current mapjoin. The mapjoin operator
+     * has already been encountered.
+     *
+     * @param op
+     *          operator being processed
+     * @param oldTask
+     *          the old task for the current mapjoin
+     * @param opProcCtx
+     *          processing context
+     * @param pos
+     *          position of the parent in the stack
      */
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-
-      SelectOperator sel = (SelectOperator) nd;
-      AbstractMapJoinOperator<MapJoinDesc> mapJoin = (AbstractMapJoinOperator<MapJoinDesc>) sel.getParentOperators().get(
-          0);
-      assert sel.getParentOperators().size() == 1;
-
-      GenMRProcContext ctx = (GenMRProcContext) procCtx;
-      ParseContext parseCtx = ctx.getParseCtx();
-
-      // is the mapjoin followed by a reducer
-      List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinOps = parseCtx
-          .getListMapJoinOpsNoReducer();
-
-      if (listMapJoinOps.contains(mapJoin)) {
-        ctx.setCurrAliasId(null);
-        ctx.setCurrTopOp(null);
-        Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
-            .getMapCurrCtx();
-        mapCurrCtx.put((Operator<? extends OperatorDesc>) nd, new GenMapRedCtx(
-            ctx.getCurrTask(), null, null));
-        return null;
-      }
-
-      ctx.setCurrMapJoinOp(mapJoin);
-
-      Task<? extends Serializable> currTask = ctx.getCurrTask();
-      GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(mapJoin);
-      if (mjCtx == null) {
-        mjCtx = new GenMRMapJoinCtx();
-        ctx.setMapJoinCtx(mapJoin, mjCtx);
+    public static void joinMapJoinPlan(AbstractMapJoinOperator<? extends OperatorDesc> op,
+        Task<? extends Serializable> oldTask,
+        GenMRProcContext opProcCtx, int pos)
+        throws SemanticException {
+      MapredWork plan = (MapredWork) oldTask.getWork();
+      Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
+
+      List<Operator<? extends OperatorDesc>> seenOps = opProcCtx.getSeenOps();
+      String currAliasId = opProcCtx.getCurrAliasId();
+
+      if (!seenOps.contains(currTopOp)) {
+        seenOps.add(currTopOp);
+        boolean local = false;
+        if (pos != -1) {
+          local = (pos == ((MapJoinDesc) op.getConf()).getPosBigTable()) ? false
+              : true;
+        }
+        GenMapRedUtils.setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
+        setupBucketMapJoinInfo(plan, op);
       }
-
-      MapredWork mjPlan = GenMapRedUtils.getMapRedWork(parseCtx);
-      Task<? extends Serializable> mjTask = TaskFactory.get(mjPlan, parseCtx
-          .getConf());
-
-      TableDesc tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
-          .getFieldSchemasFromRowSchema(mapJoin.getSchema(), "temporarycol"));
-
-      // generate the temporary file
-      Context baseCtx = parseCtx.getContext();
-      String taskTmpDir = baseCtx.getMRTmpFileURI();
-
-      // Add the path to alias mapping
-      mjCtx.setTaskTmpDir(taskTmpDir);
-      mjCtx.setTTDesc(tt_desc);
-      mjCtx.setRootMapJoinOp(sel);
-
-      sel.setParentOperators(null);
-
-      // Create a file sink operator for this file name
-      Operator<? extends OperatorDesc> fs_op = OperatorFactory.get(
-          new FileSinkDesc(taskTmpDir, tt_desc, parseCtx.getConf().getBoolVar(
-          HiveConf.ConfVars.COMPRESSINTERMEDIATE)), mapJoin.getSchema());
-
-      assert mapJoin.getChildOperators().size() == 1;
-      mapJoin.getChildOperators().set(0, fs_op);
-
-      List<Operator<? extends OperatorDesc>> parentOpList =
-        new ArrayList<Operator<? extends OperatorDesc>>();
-      parentOpList.add(mapJoin);
-      fs_op.setParentOperators(parentOpList);
-
-      currTask.addDependentTask(mjTask);
-
-      ctx.setCurrTask(mjTask);
-      ctx.setCurrAliasId(null);
-      ctx.setCurrTopOp(null);
-
-      Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
-          .getMapCurrCtx();
-      mapCurrCtx.put((Operator<? extends OperatorDesc>) nd, new GenMapRedCtx(
-          ctx.getCurrTask(), null, null));
-
-      return null;
+      currTopOp = null;
+      opProcCtx.setCurrTopOp(currTopOp);
+      opProcCtx.setCurrTask(oldTask);
     }
-  }
-
-  /**
-   * MapJoin followed by MapJoin.
-   */
-  public static class MapJoinMapJoin implements NodeProcessor {
 
+    /*
+     * The mapjoin operator will be encountered many times (n times for an n-way join). Since a
+     * reduceSink operator is not allowed before a mapjoin, the task for the mapjoin will always
+     * be a root task. The task corresponding to the mapjoin is converted to a root task when the
+     * operator is encountered for the first time. When the operator is encountered subsequently,
+     * the current task is merged with the root task for the mapjoin. Note that, since the
+     * map-join task may be performed as a bucketized map-side join (or sort-merge join), the
+     * map-join operator is enhanced to contain the bucketing info when it is encountered.
+     */
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-      Object... nodeOutputs) throws SemanticException {
-      AbstractMapJoinOperator<? extends MapJoinDesc> mapJoin =
-        (AbstractMapJoinOperator<? extends MapJoinDesc>) nd;
+        Object... nodeOutputs) throws SemanticException {
+      AbstractMapJoinOperator<MapJoinDesc> mapJoin = (AbstractMapJoinOperator<MapJoinDesc>) nd;
       GenMRProcContext ctx = (GenMRProcContext) procCtx;
 
-      ctx.getParseCtx();
-      AbstractMapJoinOperator<? extends MapJoinDesc> oldMapJoin = ctx.getCurrMapJoinOp();
-
-      GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(mapJoin);
-      if (mjCtx != null) {
-        mjCtx.setOldMapJoin(oldMapJoin);
-      } else {
-        ctx.setMapJoinCtx(mapJoin, new GenMRMapJoinCtx(null, null, null,
-            oldMapJoin));
-      }
-      ctx.setCurrMapJoinOp(mapJoin);
-
       // find the branch on which this processor was invoked
       int pos = getPositionParent(mapJoin, stack);
 
@@ -292,93 +237,27 @@ public final class MapJoinFactory {
           pos));
       Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
       MapredWork currPlan = (MapredWork) currTask.getWork();
-      mapredCtx.getCurrAliasId();
-      Operator<? extends OperatorDesc> reducer = mapJoin;
+      Operator<? extends OperatorDesc> currTopOp = mapredCtx.getCurrTopOp();
+      String currAliasId = mapredCtx.getCurrAliasId();
       HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
-        ctx.getOpTaskMap();
-      Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
+          ctx.getOpTaskMap();
+      Task<? extends Serializable> opMapTask = opTaskMap.get(mapJoin);
 
+      ctx.setCurrTopOp(currTopOp);
+      ctx.setCurrAliasId(currAliasId);
       ctx.setCurrTask(currTask);
 
-      // If the plan for this reducer does not exist, initialize the plan
+      // If we are seeing this mapjoin for the first time, initialize the plan.
+      // If we are seeing this mapjoin for the second or a later time, then at least one
+      // of its branches has already been encountered. Merge the current plan with the
+      // plan created the first time.
       if (opMapTask == null) {
         assert currPlan.getReducer() == null;
-        GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, true, null, false, pos);
+        initMapJoinPlan(mapJoin, ctx, pos);
       } else {
         // The current plan can be thrown away after being merged with the
         // original plan
-        GenMapRedUtils.joinPlan(mapJoin, currTask, opMapTask, ctx, pos, false,
-            true, null);
-        currTask = opMapTask;
-        ctx.setCurrTask(currTask);
-      }
-
-      mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), null, null));
-      return null;
-    }
-  }
-
-  /**
-   * Union followed by MapJoin.
-   */
-  public static class UnionMapJoin implements NodeProcessor {
-
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-      Object... nodeOutputs) throws SemanticException {
-      GenMRProcContext ctx = (GenMRProcContext) procCtx;
-
-      ParseContext parseCtx = ctx.getParseCtx();
-      UnionProcContext uCtx = parseCtx.getUCtx();
-
-      // union was map only - no special processing needed
-      if (uCtx.isMapOnlySubq()) {
-        return (new TableScanMapJoin())
-            .process(nd, stack, procCtx, nodeOutputs);
-      }
-
-      UnionOperator currUnion = Utils.findNode(stack, UnionOperator.class);
-      assert currUnion != null;
-      ctx.getUnionTask(currUnion);
-      AbstractMapJoinOperator<MapJoinDesc> mapJoin = (AbstractMapJoinOperator<MapJoinDesc>) nd;
-
-      // find the branch on which this processor was invoked
-      int pos = getPositionParent(mapJoin, stack);
-
-      Map<Operator<? extends OperatorDesc>, GenMapRedCtx> mapCurrCtx = ctx
-          .getMapCurrCtx();
-      GenMapRedCtx mapredCtx = mapCurrCtx.get(mapJoin.getParentOperators().get(
-          pos));
-      Task<? extends Serializable> currTask = mapredCtx.getCurrTask();
-      MapredWork currPlan = (MapredWork) currTask.getWork();
-      Operator<? extends OperatorDesc> reducer = mapJoin;
-      HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
-        ctx.getOpTaskMap();
-      Task<? extends Serializable> opMapTask = opTaskMap.get(reducer);
-
-      // union result cannot be a map table
-      boolean local = (pos != mapJoin.getConf().getPosBigTable());
-      if (local) {
-        throw new SemanticException(ErrorMsg.INVALID_MAPJOIN_TABLE.getMsg());
-      }
-
-      // If the plan for this reducer does not exist, initialize the plan
-      if (opMapTask == null) {
-        assert currPlan.getReducer() == null;
-        ctx.setCurrMapJoinOp(mapJoin);
-        GenMapRedUtils.initMapJoinPlan(mapJoin, ctx, true, currUnion, false, pos);
-        ctx.setCurrUnionOp(null);
-      } else {
-        // The current plan can be thrown away after being merged with the
-        // original plan
-        Task<? extends Serializable> uTask = ctx.getUnionTask(currUnion).getUTask();
-        if (uTask.getId().equals(opMapTask.getId())) {
-          GenMapRedUtils.joinPlan(mapJoin, null, opMapTask, ctx, pos, false,
-              false, currUnion);
-        } else {
-          GenMapRedUtils.joinPlan(mapJoin, uTask, opMapTask, ctx, pos, false,
-              false, currUnion);
-        }
+        joinMapJoinPlan(mapJoin, opMapTask, ctx, pos);
         currTask = opMapTask;
         ctx.setCurrTask(currTask);
       }
@@ -390,23 +269,7 @@ public final class MapJoinFactory {
   }
 
   public static NodeProcessor getTableScanMapJoin() {
-    return new TableScanMapJoin();
-  }
-
-  public static NodeProcessor getUnionMapJoin() {
-    return new UnionMapJoin();
-  }
-
-  public static NodeProcessor getReduceSinkMapJoin() {
-    return new ReduceSinkMapJoin();
-  }
-
-  public static NodeProcessor getMapJoin() {
-    return new MapJoin();
-  }
-
-  public static NodeProcessor getMapJoinMapJoin() {
-    return new MapJoinMapJoin();
+    return new TableScanMapJoinProcessor();
   }
 
   private MapJoinFactory() {

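To make the walk described in the comments above concrete: a minimal, self-contained sketch (stand-in types, not Hive's API) of the first-visit/later-visit rule that TableScanMapJoinProcessor.process applies through opTaskMap:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Stand-in types, not Hive's API.
    public class MapJoinVisitSketch {
      static class Task { final String id; Task(String id) { this.id = id; } }
      static class MapJoinOp { }

      private final Map<MapJoinOp, Task> opTaskMap = new HashMap<>();
      private final List<Task> rootTasks = new ArrayList<>();

      // Returns the task the walk continues with after visiting one branch.
      Task visitBranch(MapJoinOp mapJoin, Task currTask) {
        Task opMapTask = opTaskMap.get(mapJoin);
        if (opMapTask == null) {
          // first encounter: register the current task for this mapjoin and
          // promote it to a root task (no ReduceSink may precede a mapjoin)
          opTaskMap.put(mapJoin, currTask);
          rootTasks.add(currTask);
          return currTask;
        }
        // later encounters: the branch is merged into the existing task and
        // the current task is discarded
        return opMapTask;
      }

      public static void main(String[] args) {
        MapJoinVisitSketch walk = new MapJoinVisitSketch();
        MapJoinOp mj = new MapJoinOp();                      // a 2-way join: visited twice
        Task t1 = walk.visitBranch(mj, new Task("branchA")); // first visit -> root task
        Task t2 = walk.visitBranch(mj, new Task("branchB")); // second visit -> merged
        System.out.println(t1 == t2);                        // true
      }
    }

In the real code the merge step is joinMapJoinPlan, which adds the branch's top operator to the existing task's MapredWork rather than simply returning the old task.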


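One branch of setupBucketMapJoinInfo is worth isolating: for a sort-merge (SMB) join the merged local work is attached to the operator and cleared from the plan, while a plain bucket map join keeps it on the plan; either bucketed variant then requires BucketizedHiveInputFormat. A hedged sketch of just that branching, with booleans standing in for the instanceof SMBMapJoinOperator checks:

    // Hedged sketch of the final branching in setupBucketMapJoinInfo; plain
    // booleans stand in for the instanceof checks on Hive's operator types.
    public class BucketJoinPlanSketch {
      static class Plan {
        Object mapLocalWork;                 // stand-in for MapredLocalWork
        boolean useBucketizedInputFormat;
      }

      static void attachLocalWork(Plan plan, Object localWork,
          boolean isSortMergeJoin, boolean isBucketMapJoin) {
        if (isSortMergeJoin) {
          // SMB join: the local work travels with the SMBMapJoinOperator,
          // so it is cleared from the map-reduce plan itself
          plan.mapLocalWork = null;
        } else {
          plan.mapLocalWork = localWork;
        }
        // bucket-by-bucket file matching needs BucketizedHiveInputFormat
        // for either kind of bucketed join
        plan.useBucketizedInputFormat = isSortMergeJoin || isBucketMapJoin;
      }

      public static void main(String[] args) {
        Plan p = new Plan();
        attachLocalWork(p, new Object(), /*smb*/ true, /*bucketMapJoin*/ false);
        System.out.println(p.mapLocalWork == null);     // true: kept on the operator instead
        System.out.println(p.useBucketizedInputFormat); // true
      }
    }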