hive-commits mailing list archives

From: na...@apache.org
Subject: svn commit: r1036128 [3/19] - in /hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/o...
Date: Wed, 17 Nov 2010 17:33:11 GMT
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Wed Nov 17 17:33:06 2010
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.optimi
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hive.ql.exec.Re
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.ScriptOperator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -53,7 +55,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
-import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.ErrorMsg;
 import org.apache.hadoop.hive.ql.parse.GenMapRedWalker;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
@@ -61,23 +62,28 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.TypeCheckCtx;
-import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 /**
- * Implementation of one of the rule-based map join optimization. User passes
- * hints to specify map-joins and during this optimization, all user specified
- * map joins are converted to MapJoins - the reduce sink operator above the join
- * are converted to map sink operators. In future, once statistics are
- * implemented, this transformation can also be done based on costs.
+ * Implementation of one of the rule-based map join optimization. User passes hints to specify
+ * map-joins and during this optimization, all user specified map joins are converted to MapJoins -
+ * the reduce sink operator above the join are converted to map sink operators. In future, once
+ * statistics are implemented, this transformation can also be done based on costs.
  */
 public class MapJoinProcessor implements Transform {
 
@@ -93,14 +99,160 @@ public class MapJoinProcessor implements
   }
 
   @SuppressWarnings("nls")
-  private Operator<? extends Serializable> putOpInsertMap(
-      Operator<? extends Serializable> op, RowResolver rr) {
+  private Operator<? extends Serializable> putOpInsertMap(Operator<? extends Serializable> op,
+      RowResolver rr) {
     OpParseContext ctx = new OpParseContext(rr);
     pGraphContext.getOpParseCtx().put(op, ctx);
     return op;
   }
 
   /**
+   * Generate the MapRed Local Work
+   * @param newWork
+   * @param mapJoinOp
+   * @param bigTablePos
+   * @return
+   * @throws SemanticException
+   */
+  private static String genMapJoinLocalWork(MapredWork newWork, MapJoinOperator mapJoinOp,
+      int bigTablePos) throws SemanticException {
+    // keep the small table alias to avoid concurrent modification exception
+    ArrayList<String> smallTableAliasList = new ArrayList<String>();
+    String bigTableAlias = null;
+
+    // create a new  MapredLocalWork
+    MapredLocalWork newLocalWork = new MapredLocalWork(
+        new LinkedHashMap<String, Operator<? extends Serializable>>(),
+        new LinkedHashMap<String, FetchWork>());
+
+    for (Map.Entry<String, Operator<? extends Serializable>> entry : newWork.getAliasToWork()
+        .entrySet()) {
+      String alias = entry.getKey();
+      Operator<? extends Serializable> op = entry.getValue();
+      // get table scan op
+      if (!(op instanceof TableScanOperator)) {
+        throw new SemanticException("top op is not table scan");
+      }
+      TableScanOperator tableScanOp = (TableScanOperator) op;
+
+      // if the table scan is for big table; then skip it
+      // tracing down the operator tree from the table scan operator
+      Operator<? extends Serializable> parentOp = tableScanOp;
+      Operator<? extends Serializable> childOp = tableScanOp.getChildOperators().get(0);
+      while ((childOp != null) && (!childOp.equals(mapJoinOp))) {
+        parentOp = childOp;
+        assert parentOp.getChildOperators().size() == 1;
+        childOp = parentOp.getChildOperators().get(0);
+      }
+      if (childOp == null) {
+        throw new SemanticException(
+            "Cannot find join op by tracing down the table scan operator tree");
+      }
+      // skip the big table pos
+      int i = childOp.getParentOperators().indexOf(parentOp);
+      if (i == bigTablePos) {
+        bigTableAlias = alias;
+        continue;
+      }
+      // set alias to work and put into smallTableAliasList
+      newLocalWork.getAliasToWork().put(alias, tableScanOp);
+      smallTableAliasList.add(alias);
+      // get input path and remove this alias from pathToAlias
+      // because this file will be fetched by fetch operator
+      LinkedHashMap<String, ArrayList<String>> pathToAliases = newWork.getPathToAliases();
+
+      // keep record all the input path for this alias
+      HashSet<String> pathSet = new HashSet<String>();
+      HashSet<String> emptyPath = new HashSet<String>();
+      for (Map.Entry<String, ArrayList<String>> entry2 : pathToAliases.entrySet()) {
+        String path = entry2.getKey();
+        ArrayList<String> list = entry2.getValue();
+        if (list.contains(alias)) {
+          // add to path set
+          if (!pathSet.contains(path)) {
+            pathSet.add(path);
+          }
+          //remove this alias from the alias list
+          list.remove(alias);
+          if(list.size() == 0) {
+            emptyPath.add(path);
+          }
+        }
+      }
+      //remove the path, with which no alias associates
+      for (String path : emptyPath) {
+        pathToAliases.remove(path);
+      }
+
+      if (pathSet.size() == 0) {
+        throw new SemanticException("No input path for alias " + alias);
+      }
+
+      // create fetch work
+      FetchWork fetchWork = null;
+      List<String> partDir = new ArrayList<String>();
+      List<PartitionDesc> partDesc = new ArrayList<PartitionDesc>();
+
+      for (String tablePath : pathSet) {
+        PartitionDesc partitionDesc = newWork.getPathToPartitionInfo().get(tablePath);
+        // create fetchwork for non partitioned table
+        if (partitionDesc.getPartSpec() == null || partitionDesc.getPartSpec().size() == 0) {
+          fetchWork = new FetchWork(tablePath, partitionDesc.getTableDesc());
+          break;
+        }
+        // if table is partitioned,add partDir and partitionDesc
+        partDir.add(tablePath);
+        partDesc.add(partitionDesc);
+      }
+      // create fetchwork for partitioned table
+      if (fetchWork == null) {
+        fetchWork = new FetchWork(partDir, partDesc);
+      }
+      // set alias to fetch work
+      newLocalWork.getAliasToFetchWork().put(alias, fetchWork);
+    }
+    // remove small table ailias from aliasToWork;Avoid concurrent modification
+    for (String alias : smallTableAliasList) {
+      newWork.getAliasToWork().remove(alias);
+    }
+
+    // set up local work
+    newWork.setMapLocalWork(newLocalWork);
+    // remove reducer
+    newWork.setReducer(null);
+    // return the big table alias
+    if (bigTableAlias == null) {
+      throw new SemanticException("Big Table Alias is null");
+    }
+    return bigTableAlias;
+  }
+
+  public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos)
+      throws SemanticException {
+    try {
+      LinkedHashMap<Operator<? extends Serializable>, OpParseContext> opParseCtxMap = newWork
+          .getOpParseCtxMap();
+      QBJoinTree newJoinTree = newWork.getJoinTree();
+      // generate the map join operator; already checked the map join
+      MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(opParseCtxMap, op,
+          newJoinTree, mapJoinPos, true);
+      // generate the local work and return the big table alias
+      String bigTableAlias = MapJoinProcessor
+          .genMapJoinLocalWork(newWork, newMapJoinOp, mapJoinPos);
+      // clean up the mapred work
+      newWork.setOpParseCtxMap(null);
+      newWork.setJoinTree(null);
+
+      return bigTableAlias;
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new SemanticException("Generate New MapJoin Opertor Exeception " + e.getMessage());
+    }
+
+  }
+
+  /**
    * convert a regular join to a a map-side join.
    *
    * @param op
@@ -108,29 +260,27 @@ public class MapJoinProcessor implements
    * @param qbJoin
    *          qb join tree
    * @param mapJoinPos
-   *          position of the source to be read as part of map-reduce framework.
-   *          All other sources are cached in memory
+   *          position of the source to be read as part of map-reduce framework. All other sources
+   *          are cached in memory
    */
-  private MapJoinOperator convertMapJoin(ParseContext pctx, JoinOperator op,
-      QBJoinTree joinTree, int mapJoinPos) throws SemanticException {
+  public static MapJoinOperator convertMapJoin(
+      LinkedHashMap<Operator<? extends Serializable>, OpParseContext> opParseCtxMap,
+      JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
+      throws SemanticException {
     // outer join cannot be performed on a table which is being cached
     JoinDesc desc = op.getConf();
-    org.apache.hadoop.hive.ql.plan.JoinCondDesc[] condns = desc.getConds();
-    HiveConf hiveConf = pGraphContext.getConf();
-    boolean noCheckOuterJoin = HiveConf.getBoolVar(hiveConf,
-        HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)
-        && HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN);
+    JoinCondDesc[] condns = desc.getConds();
+    Byte[] tagOrder = desc.getTagOrder();
+
     if (!noCheckOuterJoin) {
       checkMapJoin(mapJoinPos, condns);
     }
 
-    RowResolver oldOutputRS = pctx.getOpParseCtx().get(op).getRR();
+    RowResolver oldOutputRS = opParseCtxMap.get(op).getRowResolver();
     RowResolver outputRS = new RowResolver();
     ArrayList<String> outputColumnNames = new ArrayList<String>();
     Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
     Map<Byte, List<ExprNodeDesc>> valueExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
-    HashMap<Byte, List<ExprNodeDesc>> filterMap =
-      new HashMap<Byte, List<ExprNodeDesc>>();
 
     // Walk over all the sources (which are guaranteed to be reduce sink
     // operators).
@@ -141,13 +291,14 @@ public class MapJoinProcessor implements
     List<Operator<? extends Serializable>> newParentOps = new ArrayList<Operator<? extends Serializable>>();
     List<Operator<? extends Serializable>> oldReduceSinkParentOps = new ArrayList<Operator<? extends Serializable>>();
     Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+    HashMap<Byte, HashMap<String, ExprNodeDesc>> columnTransfer = new HashMap<Byte, HashMap<String, ExprNodeDesc>>();
+
     // found a source which is not to be stored in memory
     if (leftSrc != null) {
       // assert mapJoinPos == 0;
       Operator<? extends Serializable> parentOp = parentOps.get(0);
       assert parentOp.getParentOperators().size() == 1;
-      Operator<? extends Serializable> grandParentOp = parentOp
-          .getParentOperators().get(0);
+      Operator<? extends Serializable> grandParentOp = parentOp.getParentOperators().get(0);
       oldReduceSinkParentOps.add(parentOp);
       grandParentOp.removeChild(parentOp);
       newParentOps.add(grandParentOp);
@@ -159,8 +310,7 @@ public class MapJoinProcessor implements
       if (src != null) {
         Operator<? extends Serializable> parentOp = parentOps.get(pos);
         assert parentOp.getParentOperators().size() == 1;
-        Operator<? extends Serializable> grandParentOp = parentOp
-            .getParentOperators().get(0);
+        Operator<? extends Serializable> grandParentOp = parentOp.getParentOperators().get(0);
 
         grandParentOp.removeChild(parentOp);
         oldReduceSinkParentOps.add(parentOp);
@@ -171,21 +321,21 @@ public class MapJoinProcessor implements
 
     // get the join keys from old parent ReduceSink operators
     for (pos = 0; pos < newParentOps.size(); pos++) {
-      ReduceSinkOperator oldPar = (ReduceSinkOperator) oldReduceSinkParentOps
-          .get(pos);
+      ReduceSinkOperator oldPar = (ReduceSinkOperator) oldReduceSinkParentOps.get(pos);
       ReduceSinkDesc rsconf = oldPar.getConf();
       Byte tag = (byte) rsconf.getTag();
       List<ExprNodeDesc> keys = rsconf.getKeyCols();
       keyExprMap.put(tag, keys);
+
+      // set column transfer
+      HashMap<String, ExprNodeDesc> map = (HashMap<String, ExprNodeDesc>) oldPar.getColumnExprMap();
+      columnTransfer.put(tag, map);
     }
 
     // create the map-join operator
     for (pos = 0; pos < newParentOps.size(); pos++) {
-      RowResolver inputRS = pGraphContext.getOpParseCtx().get(
-          newParentOps.get(pos)).getRR();
-
+      RowResolver inputRS = opParseCtxMap.get(newParentOps.get(pos)).getRowResolver();
       List<ExprNodeDesc> values = new ArrayList<ExprNodeDesc>();
-      List<ExprNodeDesc> filterDesc = new ArrayList<ExprNodeDesc>();
 
       Iterator<String> keysIter = inputRS.getTableNames().iterator();
       while (keysIter.hasNext()) {
@@ -202,35 +352,48 @@ public class MapJoinProcessor implements
           String outputCol = oldValueInfo.getInternalName();
           if (outputRS.get(key, field) == null) {
             outputColumnNames.add(outputCol);
-            ExprNodeDesc colDesc = new ExprNodeColumnDesc(valueInfo.getType(),
-                valueInfo.getInternalName(), valueInfo.getTabAlias(), valueInfo
-                .getIsVirtualCol());
+            ExprNodeDesc colDesc = new ExprNodeColumnDesc(valueInfo.getType(), valueInfo
+                .getInternalName(), valueInfo.getTabAlias(), valueInfo.getIsVirtualCol());
             values.add(colDesc);
-            outputRS.put(key, field, new ColumnInfo(outputCol, valueInfo
-                .getType(), valueInfo.getTabAlias(), valueInfo
-                .getIsVirtualCol(),valueInfo.isHiddenVirtualCol()));
+            outputRS.put(key, field, new ColumnInfo(outputCol, valueInfo.getType(), valueInfo
+                .getTabAlias(), valueInfo.getIsVirtualCol(), valueInfo.isHiddenVirtualCol()));
             colExprMap.put(outputCol, colDesc);
           }
         }
       }
 
-      TypeCheckCtx tcCtx = new TypeCheckCtx(inputRS);
-      for (ASTNode cond : joinTree.getFilters().get((byte)pos)) {
+      valueExprMap.put(new Byte((byte) pos), values);
+    }
 
-        ExprNodeDesc filter =
-          (ExprNodeDesc)TypeCheckProcFactory.genExprNode(cond, tcCtx).get(cond);
-        if (filter == null) {
-          throw new SemanticException(tcCtx.getError());
+    Map<Byte, List<ExprNodeDesc>> filterMap = desc.getFilters();
+    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filterMap.entrySet()) {
+      Byte srcAlias = entry.getKey();
+      List<ExprNodeDesc> columnDescList = entry.getValue();
+
+      for (ExprNodeDesc nodeExpr : columnDescList) {
+        ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) nodeExpr;
+        for (ExprNodeDesc childDesc : funcDesc.getChildExprs()) {
+          if (!(childDesc instanceof ExprNodeColumnDesc)) {
+            continue;
+          }
+          ExprNodeColumnDesc columnDesc = (ExprNodeColumnDesc) childDesc;
+          // reset columns
+          String column = columnDesc.getColumn();
+          String newColumn = null;
+          HashMap<String, ExprNodeDesc> map = columnTransfer.get(srcAlias);
+          ExprNodeColumnDesc tmpDesc = (ExprNodeColumnDesc) map.get(column);
+          if (tmpDesc != null) {
+            newColumn = tmpDesc.getColumn();
+          }
+          if (newColumn == null) {
+            throw new SemanticException("No Column name found in parent reduce sink op");
+          }
+          columnDesc.setColumn(newColumn);
         }
-        filterDesc.add(filter);
       }
-
-      valueExprMap.put(new Byte((byte) pos), values);
-      filterMap.put(new Byte((byte) pos), filterDesc);
     }
 
-    org.apache.hadoop.hive.ql.plan.JoinCondDesc[] joinCondns = op.getConf()
-        .getConds();
+    JoinCondDesc[] joinCondns = op.getConf().getConds();
 
     Operator[] newPar = new Operator[newParentOps.size()];
     pos = 0;
@@ -248,9 +411,25 @@ public class MapJoinProcessor implements
         .getFieldSchemasFromColumnList(keyCols, "mapjoinkey"));
 
     List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
+    List<TableDesc> valueFiltedTableDescs = new ArrayList<TableDesc>();
 
     for (pos = 0; pos < newParentOps.size(); pos++) {
       List<ExprNodeDesc> valueCols = valueExprMap.get(new Byte((byte) pos));
+      int length = valueCols.size();
+      List<ExprNodeDesc> valueFilteredCols = new ArrayList<ExprNodeDesc>(length);
+      // deep copy expr node desc
+      for (int i = 0; i < length; i++) {
+        valueFilteredCols.add(valueCols.get(i).clone());
+      }
+      List<ExprNodeDesc> valueFilters = filterMap.get(new Byte((byte) pos));
+
+      if (valueFilters != null && valueFilters.size() != 0 && pos != mapJoinPos) {
+        ExprNodeColumnDesc isFilterDesc = new ExprNodeColumnDesc(TypeInfoFactory
+            .getPrimitiveTypeInfo(Constants.BOOLEAN_TYPE_NAME), "filter", "filter", false);
+        valueFilteredCols.add(isFilterDesc);
+      }
+
+
       keyOrder = new StringBuilder();
       for (int i = 0; i < valueCols.size(); i++) {
         keyOrder.append("+");
@@ -258,15 +437,22 @@ public class MapJoinProcessor implements
 
       TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
           .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue"));
+      TableDesc valueFilteredTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
+          .getFieldSchemasFromColumnList(valueFilteredCols, "mapjoinvalue"));
 
       valueTableDescs.add(valueTableDesc);
+      valueFiltedTableDescs.add(valueFilteredTableDesc);
     }
+    MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, valueExprMap,
+        valueTableDescs, valueFiltedTableDescs, outputColumnNames, mapJoinPos, joinCondns,
+        filterMap, op.getConf().getNoOuterJoin());
+    mapJoinDescriptor.setTagOrder(tagOrder);
+
+    MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(
+        mapJoinDescriptor, new RowSchema(outputRS.getColumnInfos()), newPar);
 
-    MapJoinOperator mapJoinOp = (MapJoinOperator) putOpInsertMap(
-        OperatorFactory.getAndMakeChild(new MapJoinDesc(keyExprMap,
-        keyTableDesc, valueExprMap, valueTableDescs, outputColumnNames,
-        mapJoinPos, joinCondns, filterMap, op.getConf().getNoOuterJoin()),
-        new RowSchema(outputRS.getColumnInfos()), newPar), outputRS);
+    OpParseContext ctx = new OpParseContext(outputRS);
+    opParseCtxMap.put(mapJoinOp, ctx);
 
     mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs());
     mapJoinOp.setColumnExprMap(colExprMap);
@@ -283,37 +469,59 @@ public class MapJoinProcessor implements
     op.setChildOperators(null);
     op.setParentOperators(null);
 
+    return mapJoinOp;
+  }
+
+  public MapJoinOperator generateMapJoinOperator(ParseContext pctx, JoinOperator op,
+      QBJoinTree joinTree, int mapJoinPos) throws SemanticException {
+    HiveConf hiveConf = pctx.getConf();
+    boolean noCheckOuterJoin = HiveConf.getBoolVar(hiveConf,
+        HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)
+        && HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN);
+
+
+    LinkedHashMap<Operator<? extends Serializable>, OpParseContext> opParseCtxMap = pctx
+        .getOpParseCtx();
+    MapJoinOperator mapJoinOp = convertMapJoin(opParseCtxMap, op, joinTree, mapJoinPos,
+        noCheckOuterJoin);
     // create a dummy select to select all columns
     genSelectPlan(pctx, mapJoinOp);
     return mapJoinOp;
   }
 
-  public static void checkMapJoin(int mapJoinPos,
-      org.apache.hadoop.hive.ql.plan.JoinCondDesc[] condns)
-      throws SemanticException {
-    for (org.apache.hadoop.hive.ql.plan.JoinCondDesc condn : condns) {
-      if (condn.getType() == JoinDesc.FULL_OUTER_JOIN) {
-        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
-      }
-      if ((condn.getType() == JoinDesc.LEFT_OUTER_JOIN)
-          && (condn.getLeft() != mapJoinPos)) {
-        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
-      }
-      if ((condn.getType() == JoinDesc.RIGHT_OUTER_JOIN)
-          && (condn.getRight() != mapJoinPos)) {
-        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+  public static HashSet<Integer> getSmallTableOnlySet(JoinCondDesc[] condns) {
+    HashSet<Integer> smallTableOnlySet = new HashSet<Integer>();
+
+    for (JoinCondDesc condn : condns) {
+      int joinType = condn.getType();
+      if (joinType == JoinDesc.FULL_OUTER_JOIN) {
+        return null;
+      } else if (joinType == JoinDesc.LEFT_OUTER_JOIN || joinType == JoinDesc.LEFT_SEMI_JOIN) {
+        smallTableOnlySet.add(condn.getRight());
+      } else if (joinType == JoinDesc.RIGHT_OUTER_JOIN) {
+        smallTableOnlySet.add(condn.getLeft());
       }
     }
+
+    return smallTableOnlySet;
   }
 
-  private void genSelectPlan(ParseContext pctx, MapJoinOperator input)
-      throws SemanticException {
+  public static void checkMapJoin(int mapJoinPos, JoinCondDesc[] condns) throws SemanticException {
+    HashSet<Integer> smallTableOnlySet = MapJoinProcessor.getSmallTableOnlySet(condns);
+
+    if (smallTableOnlySet == null || smallTableOnlySet.contains(mapJoinPos)) {
+      throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+    }
+    return;
+  }
+
+  private void genSelectPlan(ParseContext pctx, MapJoinOperator input) throws SemanticException {
     List<Operator<? extends Serializable>> childOps = input.getChildOperators();
     input.setChildOperators(null);
 
     // create a dummy select - This select is needed by the walker to split the
     // mapJoin later on
-    RowResolver inputRR = pctx.getOpParseCtx().get(input).getRR();
+    RowResolver inputRR = pctx.getOpParseCtx().get(input).getRowResolver();
 
     ArrayList<ExprNodeDesc> exprs = new ArrayList<ExprNodeDesc>();
     ArrayList<String> outputs = new ArrayList<String>();
@@ -326,20 +534,19 @@ public class MapJoinProcessor implements
       String internalName = outputCols.get(i);
       String[] nm = inputRR.reverseLookup(internalName);
       ColumnInfo valueInfo = inputRR.get(nm[0], nm[1]);
-      ExprNodeDesc colDesc = new ExprNodeColumnDesc(valueInfo.getType(),
-          valueInfo.getInternalName(), nm[0], valueInfo.getIsVirtualCol());
+      ExprNodeDesc colDesc = new ExprNodeColumnDesc(valueInfo.getType(), valueInfo
+          .getInternalName(), nm[0], valueInfo.getIsVirtualCol());
       exprs.add(colDesc);
       outputs.add(internalName);
-      outputRS.put(nm[0], nm[1], new ColumnInfo(internalName, valueInfo
-          .getType(), nm[0], valueInfo.getIsVirtualCol(), valueInfo.isHiddenVirtualCol()));
+      outputRS.put(nm[0], nm[1], new ColumnInfo(internalName, valueInfo.getType(), nm[0], valueInfo
+          .getIsVirtualCol(), valueInfo.isHiddenVirtualCol()));
       colExprMap.put(internalName, colDesc);
     }
 
     SelectDesc select = new SelectDesc(exprs, outputs, false);
 
-    SelectOperator sel = (SelectOperator) putOpInsertMap(
-        OperatorFactory.getAndMakeChild(select, new RowSchema(inputRR
-        .getColumnInfos()), input), inputRR);
+    SelectOperator sel = (SelectOperator) putOpInsertMap(OperatorFactory.getAndMakeChild(select,
+        new RowSchema(inputRR.getColumnInfos()), input), inputRR);
 
     sel.setColumnExprMap(colExprMap);
 
@@ -357,11 +564,10 @@ public class MapJoinProcessor implements
    *          join operator
    * @param qbJoin
    *          qb join tree
-   * @return -1 if it cannot be converted to a map-side join, position of the
-   *         map join node otherwise
+   * @return -1 if it cannot be converted to a map-side join, position of the map join node
+   *         otherwise
    */
-  private int mapSideJoin(JoinOperator op, QBJoinTree joinTree)
-      throws SemanticException {
+  private int mapSideJoin(JoinOperator op, QBJoinTree joinTree) throws SemanticException {
     int mapJoinPos = -1;
     if (joinTree.isMapSideJoin()) {
       int pos = 0;
@@ -387,8 +593,8 @@ public class MapJoinProcessor implements
       // support this by randomly
       // leaving some table from the list of tables to be cached
       if (mapJoinPos == -1) {
-        throw new SemanticException(ErrorMsg.INVALID_MAPJOIN_HINT
-            .getMsg(pGraphContext.getQB().getParseInfo().getHints()));
+        throw new SemanticException(ErrorMsg.INVALID_MAPJOIN_HINT.getMsg(pGraphContext.getQB()
+            .getParseInfo().getHints()));
       }
     }
 
@@ -396,8 +602,8 @@ public class MapJoinProcessor implements
   }
 
   /**
-   * Transform the query tree. For each join, check if it is a map-side join
-   * (user specified). If yes, convert it to a map-side join.
+   * Transform the query tree. For each join, check if it is a map-side join (user specified). If
+   * yes, convert it to a map-side join.
    *
    * @param pactx
    *          current parse context
@@ -410,22 +616,20 @@ public class MapJoinProcessor implements
     if (pGraphContext.getJoinContext() != null) {
       Map<JoinOperator, QBJoinTree> joinMap = new HashMap<JoinOperator, QBJoinTree>();
       Map<MapJoinOperator, QBJoinTree> mapJoinMap = pGraphContext.getMapJoinContext();
-      if(mapJoinMap == null) {
-        mapJoinMap = new HashMap<MapJoinOperator, QBJoinTree> ();
+      if (mapJoinMap == null) {
+        mapJoinMap = new HashMap<MapJoinOperator, QBJoinTree>();
         pGraphContext.setMapJoinContext(mapJoinMap);
       }
 
-      Set<Map.Entry<JoinOperator, QBJoinTree>> joinCtx = pGraphContext
-          .getJoinContext().entrySet();
-      Iterator<Map.Entry<JoinOperator, QBJoinTree>> joinCtxIter = joinCtx
-          .iterator();
+      Set<Map.Entry<JoinOperator, QBJoinTree>> joinCtx = pGraphContext.getJoinContext().entrySet();
+      Iterator<Map.Entry<JoinOperator, QBJoinTree>> joinCtxIter = joinCtx.iterator();
       while (joinCtxIter.hasNext()) {
         Map.Entry<JoinOperator, QBJoinTree> joinEntry = joinCtxIter.next();
         JoinOperator joinOp = joinEntry.getKey();
         QBJoinTree qbJoin = joinEntry.getValue();
         int mapJoinPos = mapSideJoin(joinOp, qbJoin);
         if (mapJoinPos >= 0) {
-          MapJoinOperator mapJoinOp = convertMapJoin(pactx, joinOp, qbJoin, mapJoinPos);
+          MapJoinOperator mapJoinOp = generateMapJoinOperator(pactx, joinOp, qbJoin, mapJoinPos);
           listMapJoinOps.add(mapJoinOp);
           mapJoinMap.put(mapJoinOp, qbJoin);
         } else {
@@ -444,19 +648,15 @@ public class MapJoinProcessor implements
     // the operator stack.
     // The dispatcher generates the plan from the operator tree
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(new RuleRegExp(new String("R0"), "MAPJOIN%"),
-        getCurrentMapJoin());
-    opRules.put(new RuleRegExp(new String("R1"), "MAPJOIN%.*FS%"),
-        getMapJoinFS());
-    opRules.put(new RuleRegExp(new String("R2"), "MAPJOIN%.*RS%"),
-        getMapJoinDefault());
-    opRules.put(new RuleRegExp(new String("R4"), "MAPJOIN%.*UNION%"),
-        getMapJoinDefault());
+    opRules.put(new RuleRegExp(new String("R0"), "MAPJOIN%"), getCurrentMapJoin());
+    opRules.put(new RuleRegExp(new String("R1"), "MAPJOIN%.*FS%"), getMapJoinFS());
+    opRules.put(new RuleRegExp(new String("R2"), "MAPJOIN%.*RS%"), getMapJoinDefault());
+    opRules.put(new RuleRegExp(new String("R4"), "MAPJOIN%.*UNION%"), getMapJoinDefault());
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
-    Dispatcher disp = new DefaultRuleDispatcher(getDefault(), opRules,
-        new MapJoinWalkerCtx(listMapJoinOpsNoRed, pGraphContext));
+    Dispatcher disp = new DefaultRuleDispatcher(getDefault(), opRules, new MapJoinWalkerCtx(
+        listMapJoinOpsNoRed, pGraphContext));
 
     GraphWalker ogw = new GenMapRedWalker(disp);
     ArrayList<Node> topNodes = new ArrayList<Node>();
@@ -483,7 +683,7 @@ public class MapJoinProcessor implements
       MapJoinWalkerCtx ctx = (MapJoinWalkerCtx) procCtx;
       MapJoinOperator mapJoin = (MapJoinOperator) nd;
       if (ctx.getListRejectedMapJoins() != null && !ctx.getListRejectedMapJoins().contains(mapJoin)) {
-        //for rule: MapJoin%.*MapJoin
+        // for rule: MapJoin%.*MapJoin
         // have a child mapjoin. if the the current mapjoin is on a local work,
         // will put the current mapjoin in the rejected list.
         Boolean bigBranch = findGrandChildSubqueryMapjoin(ctx, mapJoin);
@@ -491,7 +691,7 @@ public class MapJoinProcessor implements
           ctx.setCurrMapJoinOp(mapJoin);
           return null;
         }
-        if(bigBranch) {
+        if (bigBranch) {
           addNoReducerMapJoinToCtx(ctx, mapJoin);
         } else {
           addRejectMapJoinToCtx(ctx, mapJoin);
@@ -505,28 +705,24 @@ public class MapJoinProcessor implements
     private Boolean findGrandChildSubqueryMapjoin(MapJoinWalkerCtx ctx, MapJoinOperator mapJoin) {
       Operator<? extends Serializable> parent = mapJoin;
       while (true) {
-        if(parent.getChildOperators() == null || parent.getChildOperators().size() != 1) {
+        if (parent.getChildOperators() == null || parent.getChildOperators().size() != 1) {
           return null;
         }
         Operator<? extends Serializable> ch = parent.getChildOperators().get(0);
-        if(ch instanceof MapJoinOperator) {
-          if (!nonSubqueryMapJoin(ctx.getpGraphContext(), (MapJoinOperator) ch,
-              mapJoin)) {
-            if (ch.getParentOperators().indexOf(parent) == ((MapJoinOperator) ch)
-                .getConf().getPosBigTable()) {
-              //not come from the local branch
+        if (ch instanceof MapJoinOperator) {
+          if (!nonSubqueryMapJoin(ctx.getpGraphContext(), (MapJoinOperator) ch, mapJoin)) {
+            if (ch.getParentOperators().indexOf(parent) == ((MapJoinOperator) ch).getConf()
+                .getPosBigTable()) {
+              // not come from the local branch
               return true;
             }
           }
           return false; // not from a sub-query.
         }
 
-        if ((ch instanceof JoinOperator)
-            || (ch instanceof UnionOperator)
-            || (ch instanceof ReduceSinkOperator)
-            || (ch instanceof LateralViewJoinOperator)
-            || (ch instanceof GroupByOperator)
-            || (ch instanceof ScriptOperator)) {
+        if ((ch instanceof JoinOperator) || (ch instanceof UnionOperator)
+            || (ch instanceof ReduceSinkOperator) || (ch instanceof LateralViewJoinOperator)
+            || (ch instanceof GroupByOperator) || (ch instanceof ScriptOperator)) {
           return null;
         }
 
@@ -534,11 +730,11 @@ public class MapJoinProcessor implements
       }
     }
 
-    private boolean nonSubqueryMapJoin(ParseContext pGraphContext,
-        MapJoinOperator mapJoin, MapJoinOperator parentMapJoin) {
+    private boolean nonSubqueryMapJoin(ParseContext pGraphContext, MapJoinOperator mapJoin,
+        MapJoinOperator parentMapJoin) {
       QBJoinTree joinTree = pGraphContext.getMapJoinContext().get(mapJoin);
       QBJoinTree parentJoinTree = pGraphContext.getMapJoinContext().get(parentMapJoin);
-      if(joinTree.getJoinSrc() != null && joinTree.getJoinSrc().equals(parentJoinTree)) {
+      if (joinTree.getJoinSrc() != null && joinTree.getJoinSrc().equals(parentJoinTree)) {
         return true;
       }
       return false;
@@ -547,11 +743,11 @@ public class MapJoinProcessor implements
 
   private static void addNoReducerMapJoinToCtx(MapJoinWalkerCtx ctx,
       AbstractMapJoinOperator<? extends MapJoinDesc> mapJoin) {
-    if (ctx.getListRejectedMapJoins() != null
-        && ctx.getListRejectedMapJoins().contains(mapJoin)) {
+    if (ctx.getListRejectedMapJoins() != null && ctx.getListRejectedMapJoins().contains(mapJoin)) {
       return;
     }
-    List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinsNoRed = ctx.getListMapJoinsNoRed();
+    List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinsNoRed = ctx
+        .getListMapJoinsNoRed();
     if (listMapJoinsNoRed == null) {
       listMapJoinsNoRed = new ArrayList<AbstractMapJoinOperator<? extends MapJoinDesc>>();
     }
@@ -565,10 +761,11 @@ public class MapJoinProcessor implements
       AbstractMapJoinOperator<? extends MapJoinDesc> mapjoin) {
     // current map join is null means it has been handled by CurrentMapJoin
     // process.
-    if(mapjoin == null) {
+    if (mapjoin == null) {
       return;
     }
-    List<AbstractMapJoinOperator<? extends MapJoinDesc>> listRejectedMapJoins = ctx.getListRejectedMapJoins();
+    List<AbstractMapJoinOperator<? extends MapJoinDesc>> listRejectedMapJoins = ctx
+        .getListRejectedMapJoins();
     if (listRejectedMapJoins == null) {
       listRejectedMapJoins = new ArrayList<AbstractMapJoinOperator<? extends MapJoinDesc>>();
     }
@@ -576,15 +773,15 @@ public class MapJoinProcessor implements
       listRejectedMapJoins.add(mapjoin);
     }
 
-    if (ctx.getListMapJoinsNoRed() != null
-        && ctx.getListMapJoinsNoRed().contains(mapjoin)) {
+    if (ctx.getListMapJoinsNoRed() != null && ctx.getListMapJoinsNoRed().contains(mapjoin)) {
       ctx.getListMapJoinsNoRed().remove(mapjoin);
     }
 
     ctx.setListRejectedMapJoins(listRejectedMapJoins);
   }
 
-  private static int findGrandparentBranch(Operator <? extends Serializable> currOp, Operator <? extends Serializable> grandParent) {
+  private static int findGrandparentBranch(Operator<? extends Serializable> currOp,
+      Operator<? extends Serializable> grandParent) {
     int pos = -1;
     for (int i = 0; i < currOp.getParentOperators().size(); i++) {
       List<Operator<? extends Serializable>> parentOpList = new LinkedList<Operator<? extends Serializable>>();
@@ -592,14 +789,14 @@ public class MapJoinProcessor implements
       boolean found = false;
       while (!parentOpList.isEmpty()) {
         Operator<? extends Serializable> p = parentOpList.remove(0);
-        if(p == grandParent) {
+        if (p == grandParent) {
           found = true;
           break;
-        } else if (p.getParentOperators() != null){
+        } else if (p.getParentOperators() != null) {
           parentOpList.addAll(p.getParentOperators());
         }
       }
-      if(found) {
+      if (found) {
         pos = i;
         break;
       }
@@ -626,8 +823,7 @@ public class MapJoinProcessor implements
           .getListRejectedMapJoins();
 
       // the mapjoin has already been handled
-      if ((listRejectedMapJoins != null)
-          && (listRejectedMapJoins.contains(mapJoin))) {
+      if ((listRejectedMapJoins != null) && (listRejectedMapJoins.contains(mapJoin))) {
         return null;
       }
       addNoReducerMapJoinToCtx(ctx, mapJoin);
@@ -701,7 +897,8 @@ public class MapJoinProcessor implements
      * @param listMapJoinsNoRed
      * @param pGraphContext2
      */
-    public MapJoinWalkerCtx(List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinsNoRed, ParseContext pGraphContext) {
+    public MapJoinWalkerCtx(List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinsNoRed,
+        ParseContext pGraphContext) {
       this.listMapJoinsNoRed = listMapJoinsNoRed;
       currMapJoinOp = null;
       listRejectedMapJoins = new ArrayList<AbstractMapJoinOperator<? extends MapJoinDesc>>();
@@ -719,7 +916,8 @@ public class MapJoinProcessor implements
      * @param listMapJoinsNoRed
      *          the listMapJoins to set
      */
-    public void setListMapJoins(List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinsNoRed) {
+    public void setListMapJoins(
+        List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinsNoRed) {
       this.listMapJoinsNoRed = listMapJoinsNoRed;
     }
 

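For readers tracing the outer-join restriction that convertMapJoin and the new getSmallTableOnlySet/checkMapJoin enforce above: a FULL OUTER JOIN can never be converted to a map join, and for LEFT OUTER (or LEFT SEMI) and RIGHT OUTER joins the preserved side must stay the streamed big table, so only the other side may be cached. The following is a minimal standalone sketch of that rule on plain integer positions; the JoinType enum and Cond class are illustrative stand-ins, not Hive's JoinCondDesc.

    import java.util.HashSet;
    import java.util.Set;

    /** Simplified restatement of the big-table eligibility rule behind checkMapJoin. */
    public class MapJoinEligibilitySketch {

      enum JoinType { INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER, LEFT_SEMI }

      /** One join condition between the tables at positions 'left' and 'right'. */
      static class Cond {
        final int left;
        final int right;
        final JoinType type;
        Cond(int left, int right, JoinType type) {
          this.left = left;
          this.right = right;
          this.type = type;
        }
      }

      /**
       * Positions that may only act as small (cached) tables, or null if no
       * position can be the big (streamed) table at all.
       */
      static Set<Integer> smallTableOnlyPositions(Cond[] conds) {
        Set<Integer> smallOnly = new HashSet<Integer>();
        for (Cond c : conds) {
          switch (c.type) {
          case FULL_OUTER:
            return null;                 // neither side may be streamed
          case LEFT_OUTER:
          case LEFT_SEMI:
            smallOnly.add(c.right);      // the right side must be cached
            break;
          case RIGHT_OUTER:
            smallOnly.add(c.left);       // the left side must be cached
            break;
          default:
            break;                       // inner join: either side may stream
          }
        }
        return smallOnly;
      }

      public static void main(String[] args) {
        // a LEFT OUTER JOIN b: position 1 (b) may only be cached, so only position 0 can be the big table
        Cond[] conds = { new Cond(0, 1, JoinType.LEFT_OUTER) };
        Set<Integer> smallOnly = smallTableOnlyPositions(conds);
        int requestedBigTablePos = 1;
        boolean allowed = smallOnly != null && !smallOnly.contains(requestedBigTablePos);
        System.out.println("small-table-only positions: " + smallOnly
            + ", big table at position 1 allowed? " + allowed);
      }
    }
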
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java Wed Nov 17 17:33:06 2010
@@ -205,7 +205,7 @@ public class ReduceSinkDeDuplication imp
         Operator<? extends Serializable> input = parentOp.get(0);
         input.getChildOperators().clear();
         
-        RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRR();
+        RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRowResolver();
 
         ArrayList<ExprNodeDesc> exprs = new ArrayList<ExprNodeDesc>();
         ArrayList<String> outputs = new ArrayList<String>();

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java?rev=1036128&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java Wed Nov 17 17:33:06 2010
@@ -0,0 +1,258 @@
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker.TaskGraphWalkerContext;
+import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin;
+import org.apache.hadoop.hive.ql.plan.ConditionalWork;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
+
+
+public class CommonJoinResolver implements PhysicalPlanResolver {
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+
+    // create dispatcher and graph walker
+    Dispatcher disp = new CommonJoinTaskDispatcher(pctx);
+    TaskGraphWalker ogw = new TaskGraphWalker(disp);
+
+    // get all the tasks nodes from root task
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.rootTasks);
+
+    // begin to walk through the task tree.
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+
+
+  /**
+   * Iterator each tasks. If this task has a local work,create a new task for this local work, named
+   * MapredLocalTask. then make this new generated task depends on current task's parent task, and
+   * make current task depends on this new generated task
+   */
+  class CommonJoinTaskDispatcher implements Dispatcher {
+
+    private final PhysicalContext physicalContext;
+
+    public CommonJoinTaskDispatcher(PhysicalContext context) {
+      super();
+      physicalContext = context;
+    }
+
+    private ConditionalTask processCurrentTask(MapRedTask currTask, ConditionalTask conditionalTask)
+        throws SemanticException {
+
+      // whether it contains common join op; if contains, return this common join op
+      JoinOperator joinOp = getJoinOp(currTask);
+      if (joinOp == null) {
+        return null;
+      }
+      MapredWork currWork = currTask.getWork();
+      // create conditional work list and task list
+      List<Serializable> listWorks = new ArrayList<Serializable>();
+      List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
+
+      // create alias to task mapping and alias to input file mapping for resolver
+      HashMap<String, Task<? extends Serializable>> aliasToTask = new HashMap<String, Task<? extends Serializable>>();
+      HashMap<String, String> aliasToPath = new HashMap<String, String>();
+      HashMap<String, ArrayList<String>> pathToAliases = currTask.getWork().getPathToAliases();
+
+      // get parseCtx for this Join Operator
+      ParseContext parseCtx = physicalContext.getParseContext();
+      QBJoinTree joinTree = parseCtx.getJoinContext().get(joinOp);
+
+      // start to generate multiple map join tasks
+      JoinDesc joinDesc = joinOp.getConf();
+      Byte[] order = joinDesc.getTagOrder();
+      int numAliases = order.length;
+
+      try {
+        HashSet<Integer> smallTableOnlySet = MapJoinProcessor.getSmallTableOnlySet(joinDesc
+            .getConds());
+        // no table could be the big table; there is no need to convert
+        if (smallTableOnlySet == null) {
+          return null;
+        }
+        currWork.setOpParseCtxMap(parseCtx.getOpParseCtx());
+        currWork.setJoinTree(joinTree);
+
+        String xml = currWork.toXML();
+        String bigTableAlias = null;
+
+        if(smallTableOnlySet.size() == numAliases) {
+          return null;
+        }
+
+        for (int i = 0; i < numAliases; i++) {
+          // this table cannot be big table
+          if (smallTableOnlySet.contains(i)) {
+            continue;
+          }
+
+          // create map join task and set big table as i
+          // deep copy a new mapred work from xml
+          InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
+          MapredWork newWork = Utilities.deserializeMapRedWork(in, physicalContext.getConf());
+          // create a mapred task for this work
+          MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork, physicalContext
+              .getParseContext().getConf());
+          JoinOperator newJoinOp = getJoinOp(newTask);
+
+          // optimize this newWork and assume big table position is i
+          bigTableAlias = MapJoinProcessor.genMapJoinOpAndLocalWork(newWork, newJoinOp, i);
+
+          // add into conditional task
+          listWorks.add(newWork);
+          listTasks.add(newTask);
+
+          //set up backup task
+          newTask.setBackupTask(currTask);
+          newTask.setBackupChildrenTasks(currTask.getChildTasks());
+
+          // put the mapping alias to task
+          aliasToTask.put(bigTableAlias, newTask);
+
+          // set alias to path
+          for (Map.Entry<String, ArrayList<String>> entry : pathToAliases.entrySet()) {
+            String path = entry.getKey();
+            ArrayList<String> aliasList = entry.getValue();
+            if (aliasList.contains(bigTableAlias)) {
+              aliasToPath.put(bigTableAlias, path);
+            }
+          }
+
+        }
+
+      } catch (Exception e) {
+        e.printStackTrace();
+        throw new SemanticException("Generate Map Join Task Error: " + e.getMessage());
+      }
+
+      // insert current common join task to conditional task
+      listWorks.add(currTask.getWork());
+      listTasks.add(currTask);
+      // clear JoinTree and OP Parse Context
+      currWork.setOpParseCtxMap(null);
+      currWork.setJoinTree(null);
+
+      // create conditional task and insert conditional task into task tree
+      ConditionalWork cndWork = new ConditionalWork(listWorks);
+      ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, parseCtx.getConf());
+      cndTsk.setListTasks(listTasks);
+
+      // set resolver and resolver context
+      cndTsk.setResolver(new ConditionalResolverCommonJoin());
+      ConditionalResolverCommonJoinCtx resolverCtx = new ConditionalResolverCommonJoinCtx();
+      resolverCtx.setAliasToPath(aliasToPath);
+      resolverCtx.setAliasToTask(aliasToTask);
+      resolverCtx.setCommonJoinTask(currTask);
+      cndTsk.setResolverCtx(resolverCtx);
+
+      //replace the current task with the new generated conditional task
+      this.replaceTaskWithConditionalTask(currTask, cndTsk, physicalContext);
+      return cndTsk;
+    }
+
+    private void replaceTaskWithConditionalTask(Task<? extends Serializable> currTask, ConditionalTask cndTsk, PhysicalContext physicalContext) {
+      // add this task into task tree
+      // set all parent tasks
+      List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
+      currTask.setParentTasks(null);
+      if (parentTasks != null) {
+        for (Task<? extends Serializable> tsk : parentTasks) {
+          // make new generated task depends on all the parent tasks of current task.
+          tsk.addDependentTask(cndTsk);
+          // remove the current task from its original parent task's dependent task
+          tsk.removeDependentTask(currTask);
+        }
+      } else {
+        // remove from current root task and add conditional task to root tasks
+        physicalContext.removeFromRootTask(currTask);
+        physicalContext.addToRootTask(cndTsk);
+      }
+      // set all child tasks
+      List<Task<? extends Serializable>> oldChildTasks = currTask.getChildTasks();
+      if (oldChildTasks != null) {
+        for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
+          if (tsk.equals(currTask)) {
+            continue;
+          }
+          for (Task<? extends Serializable> oldChild : oldChildTasks) {
+            tsk.addDependentTask(oldChild);
+          }
+        }
+      }
+    }
+
+
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+        throws SemanticException {
+      if (nodeOutputs == null || nodeOutputs.length == 0) {
+        throw new SemanticException("No Dispatch Context");
+      }
+
+      TaskGraphWalkerContext walkerCtx = (TaskGraphWalkerContext) nodeOutputs[0];
+
+      Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
+      // not map reduce task or not conditional task, just skip
+      if (currTask.isMapRedTask()) {
+        if (currTask instanceof ConditionalTask) {
+          // get the list of task
+          List<Task<? extends Serializable>> taskList = ((ConditionalTask) currTask).getListTasks();
+          for (Task<? extends Serializable> tsk : taskList) {
+            if (tsk.isMapRedTask()) {
+              ConditionalTask cndTask = this.processCurrentTask((MapRedTask) tsk,
+                  ((ConditionalTask) currTask));
+              walkerCtx.addToDispatchList(cndTask);
+            }
+          }
+        } else {
+          ConditionalTask cndTask = this.processCurrentTask((MapRedTask) currTask, null);
+          walkerCtx.addToDispatchList(cndTask);
+        }
+      }
+      return null;
+    }
+
+
+    private JoinOperator getJoinOp(MapRedTask task) throws SemanticException {
+      if (task.getWork() == null) {
+        return null;
+      }
+
+      Operator<? extends Serializable> reducerOp = task.getWork().getReducer();
+      if (reducerOp instanceof JoinOperator) {
+        return (JoinOperator) reducerOp;
+      } else {
+        return null;
+      }
+
+    }
+  }
+}

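A note on the copy strategy in processCurrentTask above: the dispatcher serializes the original MapredWork to XML once (currWork.toXML()) and then deserializes a fresh, independent copy for each candidate big-table position, so every generated MapRedTask can be rewritten without disturbing the others or the original common-join plan. Below is a minimal standalone sketch of that serialize-then-deserialize deep-copy pattern using the JDK's java.beans.XMLEncoder/XMLDecoder on a toy bean; the Plan class is illustrative only and is not Hive's MapredWork.

    import java.beans.XMLDecoder;
    import java.beans.XMLEncoder;
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.util.ArrayList;
    import java.util.List;

    /** Deep copy by serializing a template plan once and deserializing one fresh copy per variant. */
    public class DeepCopySketch {

      /** Toy stand-in for a query plan; a JavaBean so XMLEncoder can persist it. */
      public static class Plan {
        private List<String> aliases = new ArrayList<String>();
        private String bigTableAlias;
        public List<String> getAliases() { return aliases; }
        public void setAliases(List<String> aliases) { this.aliases = aliases; }
        public String getBigTableAlias() { return bigTableAlias; }
        public void setBigTableAlias(String bigTableAlias) { this.bigTableAlias = bigTableAlias; }
      }

      public static void main(String[] args) {
        Plan template = new Plan();
        template.getAliases().add("a");
        template.getAliases().add("b");

        // serialize the template once
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        XMLEncoder encoder = new XMLEncoder(buffer);
        encoder.writeObject(template);
        encoder.close();
        byte[] xml = buffer.toByteArray();

        // materialize an independent copy for each candidate big table
        List<Plan> variants = new ArrayList<Plan>();
        for (String candidateBigTable : template.getAliases()) {
          XMLDecoder decoder = new XMLDecoder(new ByteArrayInputStream(xml));
          Plan copy = (Plan) decoder.readObject();   // independent of the template
          decoder.close();
          copy.setBigTableAlias(candidateBigTable);  // rewrite only this copy
          variants.add(copy);
        }

        System.out.println("template big table = " + template.getBigTableAlias()); // still null
        for (Plan p : variants) {
          System.out.println("variant big table = " + p.getBigTableAlias());
        }
      }
    }
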
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java Wed Nov 17 17:33:06 2010
@@ -273,7 +273,7 @@ public final class GenMRSkewJoinProcesso
       JoinOperator cloneJoinOp = (JoinOperator) reducer;
 
       MapJoinDesc mapJoinDescriptor = new MapJoinDesc(newJoinKeys, keyTblDesc,
-          newJoinValues, newJoinValueTblDesc, joinDescriptor
+          newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc,joinDescriptor
           .getOutputColumnNames(), i, joinDescriptor.getConds(),
           joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin());
       mapJoinDescriptor.setTagOrder(tags);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java Wed Nov 17 17:33:06 2010
@@ -42,11 +42,16 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx;
 
 /**
@@ -60,7 +65,7 @@ public class MapJoinResolver implements 
 
     //create dispatcher and graph walker
     Dispatcher disp = new LocalMapJoinTaskDispatcher(pctx);
-    GraphWalker ogw = new DefaultGraphWalker(disp);
+    TaskGraphWalker ogw = new TaskGraphWalker(disp);
 
     //get all the tasks nodes from root task
     ArrayList<Node> topNodes = new ArrayList<Node>();
@@ -106,6 +111,11 @@ public class MapJoinResolver implements 
         MapredLocalTask localTask = (MapredLocalTask) TaskFactory.get(localwork,
             physicalContext.getParseContext().getConf());
 
+        //set the backup task from curr task
+        localTask.setBackupTask(currTask.getBackupTask());
+        localTask.setBackupChildrenTasks(currTask.getBackupChildrenTasks());
+        currTask.setBackupChildrenTasks(null);
+        currTask.setBackupTask(null);
 
         //replace the map join operator to local_map_join operator in the operator tree
         //and return all the dummy parent
@@ -149,31 +159,64 @@ public class MapJoinResolver implements 
             listWork.set(index,(Serializable)localwork);
             conditionalWork.setListWorks(listWork);
 
-            //get bigKeysDirToTaskMap
-            ConditionalResolverSkewJoinCtx context  =
-              (ConditionalResolverSkewJoinCtx) conditionalTask.getResolverCtx();
-            HashMap<String, Task<? extends Serializable>> bigKeysDirToTaskMap =
-              context.getDirToTaskMap();
-
-            //to avoid concurrent modify the hashmap
-            HashMap<String, Task<? extends Serializable>> newbigKeysDirToTaskMap =
-              new HashMap<String, Task<? extends Serializable>>();
-
-
-            //reset the resolver
-            for(Map.Entry<String, Task<? extends Serializable>> entry: bigKeysDirToTaskMap.entrySet()){
-              Task<? extends Serializable> task = entry.getValue();
-              String key = entry.getKey();
-
-              if(task.equals(currTask)){
-                newbigKeysDirToTaskMap.put(key, localTask);
-              }else{
-                newbigKeysDirToTaskMap.put(key, task);
+            ConditionalResolver resolver = conditionalTask.getResolver();
+            if(resolver instanceof ConditionalResolverSkewJoin){
+              //get bigKeysDirToTaskMap
+              ConditionalResolverSkewJoinCtx context  =
+                (ConditionalResolverSkewJoinCtx) conditionalTask.getResolverCtx();
+              HashMap<String, Task<? extends Serializable>> bigKeysDirToTaskMap =
+                context.getDirToTaskMap();
+
+              //to avoid concurrent modify the hashmap
+              HashMap<String, Task<? extends Serializable>> newbigKeysDirToTaskMap =
+                new HashMap<String, Task<? extends Serializable>>();
+
+
+              //reset the resolver
+              for(Map.Entry<String, Task<? extends Serializable>> entry: bigKeysDirToTaskMap.entrySet()){
+                Task<? extends Serializable> task = entry.getValue();
+                String key = entry.getKey();
+                if(task.equals(currTask)){
+                  newbigKeysDirToTaskMap.put(key, localTask);
+                }else{
+                  newbigKeysDirToTaskMap.put(key, task);
+                }
               }
+
+              context.setDirToTaskMap(newbigKeysDirToTaskMap);
+              conditionalTask.setResolverCtx(context);
+
+            }else if(resolver instanceof ConditionalResolverCommonJoin){
+              //get bigKeysDirToTaskMap
+              ConditionalResolverCommonJoinCtx context  =
+                (ConditionalResolverCommonJoinCtx) conditionalTask.getResolverCtx();
+              HashMap<String, Task<? extends Serializable>> aliasToWork =
+                context.getAliasToTask();
+
+              //to avoid concurrent modify the hashmap
+              HashMap<String, Task<? extends Serializable>> newAliasToWork =
+                new HashMap<String, Task<? extends Serializable>>();
+
+              //reset the resolver
+              for(Map.Entry<String, Task<? extends Serializable>> entry: aliasToWork.entrySet()){
+                Task<? extends Serializable> task = entry.getValue();
+                String key = entry.getKey();
+
+                if(task.equals(currTask)){
+                  newAliasToWork.put(key, localTask);
+                }else{
+                  newAliasToWork.put(key, task);
+                }
+              }
+
+              context.setAliasToTask(newAliasToWork);
+              conditionalTask.setResolverCtx(context);
+
+            }else{
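+              //nothing to rewire for other resolver types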
+
             }
 
-            context.setDirToTaskMap(newbigKeysDirToTaskMap);
-            conditionalTask.setResolverCtx(context);
+
           }
         }
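The rewiring above follows one pattern for both resolver types: every entry in the conditional task's dir-to-task (or alias-to-task) map that still points at the original map-reduce task is redirected to the newly created MapredLocalTask, and the updated entries are written into a fresh HashMap so the map being iterated is never modified in place. A minimal, self-contained sketch of that pattern, using generic stand-ins rather than Hive's Task types:

    import java.util.HashMap;
    import java.util.Map;

    final class ResolverRemapSketch {
      // Redirect every value equal to currTask to localTask, writing the result
      // into a new map so the map being iterated is never modified.
      static <K, T> HashMap<K, T> remap(Map<K, T> taskMap, T currTask, T localTask) {
        HashMap<K, T> remapped = new HashMap<K, T>();
        for (Map.Entry<K, T> entry : taskMap.entrySet()) {
          T task = entry.getValue();
          remapped.put(entry.getKey(), task.equals(currTask) ? localTask : task);
        }
        return remapped;
      }
    }

The two branches above are this pattern specialized to ConditionalResolverSkewJoinCtx.getDirToTaskMap() and ConditionalResolverCommonJoinCtx.getAliasToTask() respectively.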
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java Wed Nov 17 17:33:06 2010
@@ -49,6 +49,9 @@ public class PhysicalOptimizer {
     if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN)) {
       resolvers.add(new SkewJoinResolver());
     }
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
+      resolvers.add(new CommonJoinResolver());
+    }
     resolvers.add(new MapJoinResolver());
   }
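With this change the CommonJoinResolver takes part in physical optimization only when the HIVECONVERTJOIN flag is on. A hedged illustration of flipping that flag programmatically through HiveConf (whether a deployment should enable it is a separate question):

    import org.apache.hadoop.hive.conf.HiveConf;

    final class EnableAutoConvertJoin {
      // Turn on the flag that makes PhysicalOptimizer register CommonJoinResolver
      // ahead of MapJoinResolver.
      static HiveConf newConf() {
        HiveConf conf = new HiveConf();
        conf.setBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN, true);
        return conf;
      }
    }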
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java Wed Nov 17 17:33:06 2010
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 
 import org.antlr.runtime.Token;
@@ -26,9 +27,10 @@ import org.apache.hadoop.hive.ql.lib.Nod
 
 /**
  * @author athusoo
- * 
+ *
  */
-public class ASTNode extends CommonTree implements Node {
+public class ASTNode extends CommonTree implements Node, Serializable {
+  private static final long serialVersionUID = 1L;
 
   private ASTNodeOrigin origin;
 
@@ -37,7 +39,7 @@ public class ASTNode extends CommonTree 
 
   /**
    * Constructor.
-   * 
+   *
    * @param t
    *          Token for the CommonTree Node
    */
@@ -47,7 +49,7 @@ public class ASTNode extends CommonTree 
 
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see org.apache.hadoop.hive.ql.lib.Node#getChildren()
    */
   public ArrayList<Node> getChildren() {
@@ -65,7 +67,7 @@ public class ASTNode extends CommonTree 
 
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see org.apache.hadoop.hive.ql.lib.Node#getName()
    */
   public String getName() {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OpParseContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OpParseContext.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OpParseContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/OpParseContext.java Wed Nov 17 17:33:06 2010
@@ -17,14 +17,15 @@
  */
 
 package org.apache.hadoop.hive.ql.parse;
-
+import java.io.Serializable;
 /**
  * Implementation of the Operator Parse Context. It maintains the parse context
  * that may be needed by an operator. Currently, it only maintains the row
  * resolver.
  **/
 
-public class OpParseContext {
+public class OpParseContext implements Serializable {
+  private static final long serialVersionUID = 1L;
   private RowResolver rr; // row resolver for the operator
 
   public OpParseContext() {
@@ -41,7 +42,7 @@ public class OpParseContext {
   /**
    * @return the row resolver
    */
-  public RowResolver getRR() {
+  public RowResolver getRowResolver() {
     return rr;
   }
 
@@ -49,7 +50,7 @@ public class OpParseContext {
    * @param rr
    *          the row resolver to set
    */
-  public void setRR(RowResolver rr) {
+  public void setRowResolver(RowResolver rr) {
     this.rr = rr;
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBJoinTree.java Wed Nov 17 17:33:06 2010
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -25,9 +26,10 @@ import java.util.Map.Entry;
 
 /**
  * Internal representation of the join tree.
- * 
+ *
  */
-public class QBJoinTree {
+public class QBJoinTree implements Serializable {
+  private static final long serialVersionUID = 1L;
   private String leftAlias;
   private String[] rightAliases;
   private String[] leftAliases;
@@ -70,7 +72,7 @@ public class QBJoinTree {
 
   /**
    * returns left alias if any - this is used for merging later on.
-   * 
+   *
    * @return left alias if any
    */
   public String getLeftAlias() {
@@ -79,7 +81,7 @@ public class QBJoinTree {
 
   /**
    * set left alias for the join expression.
-   * 
+   *
    * @param leftAlias
    *          String
    */
@@ -229,7 +231,7 @@ public class QBJoinTree {
 
   /**
    * Insert only a key to the semijoin table name to column names map.
-   * 
+   *
    * @param alias
    *          table name alias.
    */
@@ -241,7 +243,7 @@ public class QBJoinTree {
 
   /**
    * Remember the mapping of table alias to set of columns.
-   * 
+   *
    * @param alias
    * @param columns
    */
@@ -256,7 +258,7 @@ public class QBJoinTree {
 
   /**
    * Remember the mapping of table alias to set of columns.
-   * 
+   *
    * @param alias
    * @param columns
    */
@@ -277,7 +279,7 @@ public class QBJoinTree {
 
   /**
    * Merge the rhs tables from another join tree.
-   * 
+   *
    * @param src
    *          the source join tree
    */

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java Wed Nov 17 17:33:06 2010
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -31,15 +32,15 @@ import org.apache.hadoop.hive.ql.exec.Ro
 
 /**
  * Implementation of the Row Resolver.
- * 
+ *
  */
-public class RowResolver {
+public class RowResolver implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private RowSchema rowSchema;
+  private HashMap<String, LinkedHashMap<String, ColumnInfo>> rslvMap;
 
-  private final RowSchema rowSchema;
-  private final HashMap<String, LinkedHashMap<String, ColumnInfo>> rslvMap;
-
-  private final HashMap<String, String[]> invRslvMap;
-  private final Map<String, ASTNode> expressionMap;
+  private HashMap<String, String[]> invRslvMap;
+  private Map<String, ASTNode> expressionMap;
 
   // TODO: Refactor this and do in a more object oriented manner
   private boolean isExprResolver;
@@ -119,11 +120,11 @@ public class RowResolver {
    * row resolver and returns the match. It also throws an exception if the
    * column is found in multiple table aliases. If no match is found, a null
    * value is returned.
-   * 
+   *
    * This allows us to interpret both select t.c1 type of references and select
    * c1 kind of references. The latter kind is what we call non-aliased column
    * references in the query.
-   * 
+   *
    * @param tab_alias
    *          The table alias to match (this is null if the column reference is
    *          non aliased)
@@ -222,4 +223,42 @@ public class RowResolver {
     }
     return sb.toString();
   }
+
+  public RowSchema getRowSchema() {
+    return rowSchema;
+  }
+
+  public HashMap<String, LinkedHashMap<String, ColumnInfo>> getRslvMap() {
+    return rslvMap;
+  }
+
+  public HashMap<String, String[]> getInvRslvMap() {
+    return invRslvMap;
+  }
+
+  public Map<String, ASTNode> getExpressionMap() {
+    return expressionMap;
+  }
+
+  public void setExprResolver(boolean isExprResolver) {
+    this.isExprResolver = isExprResolver;
+  }
+
+
+  public void setRowSchema(RowSchema rowSchema) {
+    this.rowSchema = rowSchema;
+  }
+
+  public void setRslvMap(HashMap<String, LinkedHashMap<String, ColumnInfo>> rslvMap) {
+    this.rslvMap = rslvMap;
+  }
+
+  public void setInvRslvMap(HashMap<String, String[]> invRslvMap) {
+    this.invRslvMap = invRslvMap;
+  }
+
+  public void setExpressionMap(Map<String, ASTNode> expressionMap) {
+    this.expressionMap = expressionMap;
+  }
+
 }
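RowResolver, like OpParseContext, QBJoinTree and ASTNode above, now declares Serializable with a serialVersionUID, and its final fields become mutable with getters and setters, presumably so these parse-time objects can be deep-copied or carried inside a serialized plan. A minimal sketch of such a round trip through standard Java serialization (the copy helper below is illustrative and not part of this commit):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    final class SerializationRoundTrip {
      // Copy a Serializable object by writing it out and reading it back;
      // every field reachable from it must itself be serializable.
      @SuppressWarnings("unchecked")
      static <T extends Serializable> T copy(T value) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(value);
        out.close();
        ObjectInputStream in = new ObjectInputStream(
            new ByteArrayInputStream(bytes.toByteArray()));
        return (T) in.readObject();
      }
    }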

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Nov 17 17:33:06 2010
@@ -1297,7 +1297,7 @@ public class SemanticAnalyzer extends Ba
       throws SemanticException {
 
     OpParseContext inputCtx = opParseCtx.get(input);
-    RowResolver inputRR = inputCtx.getRR();
+    RowResolver inputRR = inputCtx.getRowResolver();
     Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new FilterDesc(genExprNodeDesc(condn, inputRR), false), new RowSchema(
         inputRR.getColumnInfos()), input), inputRR);
@@ -1599,7 +1599,7 @@ public class SemanticAnalyzer extends Ba
 
     StringBuilder inpColumns = new StringBuilder();
     StringBuilder inpColumnTypes = new StringBuilder();
-    ArrayList<ColumnInfo> inputSchema = opParseCtx.get(input).getRR()
+    ArrayList<ColumnInfo> inputSchema = opParseCtx.get(input).getRowResolver()
         .getColumnInfos();
     for (int i = 0; i < inputSchema.size(); ++i) {
       if (i != 0) {
@@ -1838,7 +1838,7 @@ public class SemanticAnalyzer extends Ba
     ASTNode trfm = null;
     String alias = qb.getParseInfo().getAlias();
     Integer pos = Integer.valueOf(0);
-    RowResolver inputRR = opParseCtx.get(input).getRR();
+    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
     // SELECT * or SELECT TRANSFORM(*)
     boolean selectStar = false;
     int posn = 0;
@@ -2178,7 +2178,7 @@ public class SemanticAnalyzer extends Ba
       Map<String, GenericUDAFEvaluator> genericUDAFEvaluators)
       throws SemanticException {
     RowResolver groupByInputRowResolver = opParseCtx
-        .get(reduceSinkOperatorInfo).getRR();
+        .get(reduceSinkOperatorInfo).getRowResolver();
     RowResolver groupByOutputRowResolver = new RowResolver();
     groupByOutputRowResolver.setIsExprResolver(true);
     ArrayList<ExprNodeDesc> groupByKeys = new ArrayList<ExprNodeDesc>();
@@ -2303,7 +2303,7 @@ public class SemanticAnalyzer extends Ba
       boolean distPartAgg) throws SemanticException {
     ArrayList<String> outputColumnNames = new ArrayList<String>();
     RowResolver groupByInputRowResolver = opParseCtx
-        .get(reduceSinkOperatorInfo).getRR();
+        .get(reduceSinkOperatorInfo).getRowResolver();
     RowResolver groupByOutputRowResolver = new RowResolver();
     groupByOutputRowResolver.setIsExprResolver(true);
     ArrayList<ExprNodeDesc> groupByKeys = new ArrayList<ExprNodeDesc>();
@@ -2452,7 +2452,7 @@ public class SemanticAnalyzer extends Ba
       throws SemanticException {
 
     RowResolver groupByInputRowResolver = opParseCtx.get(inputOperatorInfo)
-        .getRR();
+        .getRowResolver();
     QBParseInfo parseInfo = qb.getParseInfo();
     RowResolver groupByOutputRowResolver = new RowResolver();
     groupByOutputRowResolver.setIsExprResolver(true);
@@ -2569,7 +2569,7 @@ public class SemanticAnalyzer extends Ba
       boolean mapAggrDone) throws SemanticException {
 
     RowResolver reduceSinkInputRowResolver = opParseCtx.get(inputOperatorInfo)
-        .getRR();
+        .getRowResolver();
     QBParseInfo parseInfo = qb.getParseInfo();
     RowResolver reduceSinkOutputRowResolver = new RowResolver();
     reduceSinkOutputRowResolver.setIsExprResolver(true);
@@ -2713,7 +2713,7 @@ public class SemanticAnalyzer extends Ba
       String dest, Operator groupByOperatorInfo, int numPartitionFields,
       int numReducers) throws SemanticException {
     RowResolver reduceSinkInputRowResolver2 = opParseCtx.get(
-        groupByOperatorInfo).getRR();
+        groupByOperatorInfo).getRowResolver();
     RowResolver reduceSinkOutputRowResolver2 = new RowResolver();
     reduceSinkOutputRowResolver2.setIsExprResolver(true);
     Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
@@ -2784,7 +2784,7 @@ public class SemanticAnalyzer extends Ba
       Map<String, GenericUDAFEvaluator> genericUDAFEvaluators)
       throws SemanticException {
     RowResolver groupByInputRowResolver2 = opParseCtx.get(
-        reduceSinkOperatorInfo2).getRR();
+        reduceSinkOperatorInfo2).getRowResolver();
     RowResolver groupByOutputRowResolver2 = new RowResolver();
     groupByOutputRowResolver2.setIsExprResolver(true);
     ArrayList<ExprNodeDesc> groupByKeys = new ArrayList<ExprNodeDesc>();
@@ -3098,7 +3098,7 @@ public class SemanticAnalyzer extends Ba
         genericUDAFEvaluators);
 
     groupOpToInputTables.put(groupByOperatorInfo, opParseCtx.get(
-        inputOperatorInfo).getRR().getTableNames());
+        inputOperatorInfo).getRowResolver().getTableNames());
     int numReducers = -1;
 
     // Optimize the scenario when there are no grouping keys - only 1 reducer is
@@ -3169,7 +3169,7 @@ public class SemanticAnalyzer extends Ba
         genericUDAFEvaluators);
 
     groupOpToInputTables.put(groupByOperatorInfo, opParseCtx.get(
-        inputOperatorInfo).getRR().getTableNames());
+        inputOperatorInfo).getRowResolver().getTableNames());
     // Optimize the scenario when there are no grouping keys and no distinct - 2
     // map-reduce jobs are not needed
     // For eg: select count(1) from T where t.ds = ....
@@ -3405,7 +3405,7 @@ public class SemanticAnalyzer extends Ba
   private Operator genFileSinkPlan(String dest, QB qb, Operator input)
       throws SemanticException {
 
-    RowResolver inputRR = opParseCtx.get(input).getRR();
+    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
     QBMetaData qbm = qb.getMetaData();
     Integer dest_type = qbm.getDestTypeForAlias(dest);
 
@@ -3705,7 +3705,7 @@ public class SemanticAnalyzer extends Ba
     }
 
     input = genConversionSelectOperator(dest, qb, input, table_desc, dpCtx);
-    inputRR = opParseCtx.get(input).getRR();
+    inputRR = opParseCtx.get(input).getRowResolver();
 
     ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>();
 
@@ -3789,7 +3789,7 @@ public class SemanticAnalyzer extends Ba
     // Check column number
     List<? extends StructField> tableFields = oi.getAllStructFieldRefs();
     boolean dynPart = HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING);
-    ArrayList<ColumnInfo> rowFields = opParseCtx.get(input).getRR()
+    ArrayList<ColumnInfo> rowFields = opParseCtx.get(input).getRowResolver()
         .getColumnInfos();
     int inColumnCnt = rowFields.size();
     int outColumnCnt = tableFields.size();
@@ -3899,7 +3899,7 @@ public class SemanticAnalyzer extends Ba
     // write into a local file and then have a map-only job.
     // Add the limit operator to get the value fields
 
-    RowResolver inputRR = opParseCtx.get(input).getRR();
+    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
     Operator limitMap = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new LimitDesc(limit), new RowSchema(inputRR.getColumnInfos()), input),
         inputRR);
@@ -3944,7 +3944,7 @@ public class SemanticAnalyzer extends Ba
 
     // resulting output object inspector can be used to make the RowResolver
     // for the UDTF operator
-    RowResolver selectRR = opParseCtx.get(input).getRR();
+    RowResolver selectRR = opParseCtx.get(input).getRowResolver();
     ArrayList<ColumnInfo> inputCols = selectRR.getColumnInfos();
 
     // Create the object inspector for the input columns and initialize the UDTF
@@ -4023,7 +4023,7 @@ public class SemanticAnalyzer extends Ba
   private ArrayList<ExprNodeDesc> getParitionColsFromBucketCols(String dest, QB qb, Table tab,
                                                                 TableDesc table_desc, Operator input, boolean convert)
     throws SemanticException {
-    RowResolver inputRR = opParseCtx.get(input).getRR();
+    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
     List<String> tabBucketCols = tab.getBucketCols();
     List<FieldSchema> tabCols  = tab.getCols();
 
@@ -4057,7 +4057,7 @@ public class SemanticAnalyzer extends Ba
     }
 
     List<? extends StructField> tableFields = oi.getAllStructFieldRefs();
-    ArrayList<ColumnInfo> rowFields = opParseCtx.get(input).getRR()
+    ArrayList<ColumnInfo> rowFields = opParseCtx.get(input).getRowResolver()
         .getColumnInfos();
 
     // Check column type
@@ -4095,7 +4095,7 @@ public class SemanticAnalyzer extends Ba
 
   private ArrayList<ExprNodeDesc> getSortCols(String dest, QB qb, Table tab, TableDesc table_desc, Operator input, boolean convert)
     throws SemanticException {
-    RowResolver inputRR = opParseCtx.get(input).getRR();
+    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
     List<Order> tabSortCols = tab.getSortCols();
     List<FieldSchema> tabCols  = tab.getCols();
 
@@ -4122,7 +4122,7 @@ public class SemanticAnalyzer extends Ba
                                                         ArrayList<ExprNodeDesc> partitionCols,
                                                         int numReducers)
     throws SemanticException {
-    RowResolver inputRR = opParseCtx.get(input).getRR();
+    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
 
     // For the generation of the values expression just get the inputs
     // signature and generate field expressions for those
@@ -4181,7 +4181,7 @@ public class SemanticAnalyzer extends Ba
   private Operator genReduceSinkPlan(String dest, QB qb, Operator input,
       int numReducers) throws SemanticException {
 
-    RowResolver inputRR = opParseCtx.get(input).getRR();
+    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
 
     // First generate the expression for the partition and sort keys
     // The cluster by clause / distribute by clause has the aliases for
@@ -4320,7 +4320,7 @@ public class SemanticAnalyzer extends Ba
       // check whether this input operator produces output
       if (omitOpts == null || !omitOpts.contains(pos)) {
         // prepare output descriptors for the input opt
-        RowResolver inputRS = opParseCtx.get(input).getRR();
+        RowResolver inputRS = opParseCtx.get(input).getRowResolver();
         Iterator<String> keysIter = inputRS.getTableNames().iterator();
         Set<String> aliases = posToAliasMap.get(pos);
         if (aliases == null) {
@@ -4379,7 +4379,7 @@ public class SemanticAnalyzer extends Ba
   @SuppressWarnings("nls")
   private Operator genJoinReduceSinkChild(QB qb, QBJoinTree joinTree,
       Operator child, String srcName, int pos) throws SemanticException {
-    RowResolver inputRS = opParseCtx.get(child).getRR();
+    RowResolver inputRS = opParseCtx.get(child).getRowResolver();
     RowResolver outputRS = new RowResolver();
     ArrayList<String> outputColumns = new ArrayList<String>();
     ArrayList<ExprNodeDesc> reduceKeys = new ArrayList<ExprNodeDesc>();
@@ -4515,7 +4515,7 @@ public class SemanticAnalyzer extends Ba
   private Operator insertSelectForSemijoin(ArrayList<ASTNode> fields,
       Operator input) throws SemanticException {
 
-    RowResolver inputRR = opParseCtx.get(input).getRR();
+    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
     ArrayList<ExprNodeDesc> colList = new ArrayList<ExprNodeDesc>();
     ArrayList<String> columnNames = new ArrayList<String>();
 
@@ -4547,7 +4547,7 @@ public class SemanticAnalyzer extends Ba
       throws SemanticException {
 
     RowResolver groupByInputRowResolver = opParseCtx.get(inputOperatorInfo)
-        .getRR();
+        .getRowResolver();
     RowResolver groupByOutputRowResolver = new RowResolver();
     ArrayList<ExprNodeDesc> groupByKeys = new ArrayList<ExprNodeDesc>();
     ArrayList<String> outputColumnNames = new ArrayList<String>();
@@ -5121,7 +5121,7 @@ public class SemanticAnalyzer extends Ba
   private Operator insertSelectAllPlanForGroupBy(String dest, Operator input)
       throws SemanticException {
     OpParseContext inputCtx = opParseCtx.get(input);
-    RowResolver inputRR = inputCtx.getRR();
+    RowResolver inputRR = inputCtx.getRowResolver();
     ArrayList<ColumnInfo> columns = inputRR.getColumnInfos();
     ArrayList<ExprNodeDesc> colList = new ArrayList<ExprNodeDesc>();
     ArrayList<String> columnNames = new ArrayList<String>();
@@ -5141,7 +5141,7 @@ public class SemanticAnalyzer extends Ba
   // Return the common distinct expression
   // There should be more than 1 destination, with group bys in all of them.
   private List<ASTNode> getCommonDistinctExprs(QB qb, Operator input) {
-    RowResolver inputRR = opParseCtx.get(input).getRR();
+    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
     QBParseInfo qbp = qb.getParseInfo();
 
     TreeSet<String> ks = new TreeSet<String>();
@@ -5214,7 +5214,7 @@ public class SemanticAnalyzer extends Ba
     ks.addAll(qbp.getClauseNames());
 
     // Pass the entire row
-    RowResolver inputRR = opParseCtx.get(input).getRR();
+    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
     RowResolver reduceSinkOutputRowResolver = new RowResolver();
     reduceSinkOutputRowResolver.setIsExprResolver(true);
     ArrayList<ExprNodeDesc> reduceKeys = new ArrayList<ExprNodeDesc>();
@@ -5318,7 +5318,7 @@ public class SemanticAnalyzer extends Ba
     if (optimizeMultiGroupBy) {
       curr = createCommonReduceSink(qb, input);
 
-      RowResolver currRR = opParseCtx.get(curr).getRR();
+      RowResolver currRR = opParseCtx.get(curr).getRowResolver();
       // create a forward operator
       input = putOpInsertMap(OperatorFactory.getAndMakeChild(new ForwardDesc(),
           new RowSchema(currRR.getColumnInfos()), curr), currRR);
@@ -5417,7 +5417,7 @@ public class SemanticAnalyzer extends Ba
         // change curr ops row resolver's tab aliases to query alias if it
         // exists
         if (qb.getParseInfo().getAlias() != null) {
-          RowResolver rr = opParseCtx.get(curr).getRR();
+          RowResolver rr = opParseCtx.get(curr).getRowResolver();
           RowResolver newRR = new RowResolver();
           String alias = qb.getParseInfo().getAlias();
           for (ColumnInfo colInfo : rr.getColumnInfos()) {
@@ -5425,7 +5425,7 @@ public class SemanticAnalyzer extends Ba
             String[] tmp = rr.reverseLookup(name);
             newRR.put(alias, tmp[1], colInfo);
           }
-          opParseCtx.get(curr).setRR(newRR);
+          opParseCtx.get(curr).setRowResolver(newRR);
         }
       }
     }
@@ -5445,8 +5445,8 @@ public class SemanticAnalyzer extends Ba
     // Currently, the unions are not merged - each union has only 2 parents. So,
     // a n-way union will lead to (n-1) union operators.
     // This can be easily merged into 1 union
-    RowResolver leftRR = opParseCtx.get(leftOp).getRR();
-    RowResolver rightRR = opParseCtx.get(rightOp).getRR();
+    RowResolver leftRR = opParseCtx.get(leftOp).getRowResolver();
+    RowResolver rightRR = opParseCtx.get(rightOp).getRowResolver();
     HashMap<String, ColumnInfo> leftmap = leftRR.getFieldMap(leftalias);
     HashMap<String, ColumnInfo> rightmap = rightRR.getFieldMap(rightalias);
     // make sure the schemas of both sides are the same
@@ -5698,7 +5698,7 @@ public class SemanticAnalyzer extends Ba
       // Add a mapping from the table scan operator to Table
       topToTable.put((TableScanOperator) top, tab);
     } else {
-      rwsch = opParseCtx.get(top).getRR();
+      rwsch = opParseCtx.get(top).getRowResolver();
       top.setChildOperators(null);
     }
 
@@ -5989,7 +5989,7 @@ public class SemanticAnalyzer extends Ba
           //
 
           RowResolver lvForwardRR = new RowResolver();
-          RowResolver source = opParseCtx.get(op).getRR();
+          RowResolver source = opParseCtx.get(op).getRowResolver();
           for (ColumnInfo col : source.getColumnInfos()) {
             if(col.getIsVirtualCol() && col.isHiddenVirtualCol()) {
               continue;
@@ -6007,7 +6007,7 @@ public class SemanticAnalyzer extends Ba
           // give it the row first.
 
           // Get the all path by making a select(*).
-          RowResolver allPathRR = opParseCtx.get(lvForward).getRR();
+          RowResolver allPathRR = opParseCtx.get(lvForward).getRowResolver();
           //Operator allPath = op;
           Operator allPath = putOpInsertMap(OperatorFactory.getAndMakeChild(
                             new SelectDesc(true), new RowSchema(allPathRR.getColumnInfos()),
@@ -6020,7 +6020,7 @@ public class SemanticAnalyzer extends Ba
           for (String udtfAlias : blankQb.getAliases()) {
             qb.addAlias(udtfAlias);
           }
-          RowResolver udtfPathRR = opParseCtx.get(udtfPath).getRR();
+          RowResolver udtfPathRR = opParseCtx.get(udtfPath).getRowResolver();
 
           // Merge the two into the lateral view join
           // The cols of the merged result will be the combination of both the
@@ -6522,7 +6522,7 @@ public class SemanticAnalyzer extends Ba
     // up with later.
     Operator sinkOp = genPlan(qb);
     resultSchema =
-        convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRR());
+        convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver());
 
     if (createVwDesc != null) {
       saveViewDefinition();
@@ -6850,7 +6850,7 @@ public class SemanticAnalyzer extends Ba
    * Get the row resolver given an operator.
    */
   public RowResolver getRowResolver(Operator opt) {
-    return opParseCtx.get(opt).getRR();
+    return opParseCtx.get(opt).getRowResolver();
   }
 
   /**

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolver.java?rev=1036128&r1=1036127&r2=1036128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolver.java Wed Nov 17 17:33:06 2010
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
 public interface ConditionalResolver {
   /**
    * All conditional resolvers implement this interface.
-   * 
+   *
    * @param conf
    *          configuration
    * @param ctx
@@ -39,4 +39,5 @@ public interface ConditionalResolver {
    * @return the list of tasks to be executed
    */
   List<Task<? extends Serializable>> getTasks(HiveConf conf, Object ctx);
+
 }
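For orientation, a minimal sketch of implementing the interface above. PassThroughResolver and its behaviour are illustrative only; the real resolvers (skew join, common join) inspect their own context objects to decide which task branch to run.

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.plan.ConditionalResolver;

    final class PassThroughResolver implements ConditionalResolver {
      // Expects ctx to already be the list of tasks to run and returns it unchanged;
      // otherwise returns an empty list.
      @SuppressWarnings("unchecked")
      public List<Task<? extends Serializable>> getTasks(HiveConf conf, Object ctx) {
        if (ctx instanceof List) {
          return (List<Task<? extends Serializable>>) ctx;
        }
        return new ArrayList<Task<? extends Serializable>>();
      }
    }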


