hive-commits mailing list archives

From na...@apache.org
Subject svn commit: r919252 [1/6] - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ data/files/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/...
Date Fri, 05 Mar 2010 00:53:44 GMT
Author: namit
Date: Fri Mar  5 00:53:43 2010
New Revision: 919252

URL: http://svn.apache.org/viewvc?rev=919252&view=rev
Log:
HIVE-1194. Add sort merge join
(He Yongqiang via namit)
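
The new optimization is off by default. Below is a minimal, hypothetical sketch (not part of this patch; the class name is illustrative) of enabling it programmatically through HiveConf. The sorted-merge join is layered on the existing bucket map join optimization, so both flags are shown; the .q tests presumably achieve the same effect with "set" commands.

    import org.apache.hadoop.hive.conf.HiveConf;

    public class EnableSmbJoinSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Both flags default to false (see the HiveConf hunk below).
        conf.setBoolean(HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN.varname, true);
        conf.setBoolean(HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN.varname, true);
        System.out.println(conf.getBoolean(
            HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN.varname, false));
      }
    }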


Added:
    hadoop/hive/trunk/data/files/smbbucket_1.txt
    hadoop/hive/trunk/data/files/smbbucket_2.txt
    hadoop/hive/trunk/data/files/smbbucket_3.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedMergeBucketMapJoinOptimizer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/SMBJoinDesc.java
    hadoop/hive/trunk/ql/src/test/queries/clientnegative/smb_bucketmapjoin.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_1.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_2.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_3.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_4.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_5.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_6.q
    hadoop/hive/trunk/ql/src/test/results/clientnegative/smb_bucketmapjoin.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_4.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_5.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_6.q.out
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Fri Mar  5 00:53:43 2010
@@ -45,6 +45,9 @@
     HIVE-1197. Add BucketizedHiveInputFormat
     (Siying Dong via namit)
 
+    HIVE-1194. Add sort merge join
+    (He Yongqiang via namit)
+
   IMPROVEMENTS
     HIVE-983. Function from_unixtime takes long.
     (Ning Zhang via zshao)

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Fri Mar  5 00:53:43 2010
@@ -200,6 +200,7 @@
     HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown
     HIVEOPTGROUPBY("hive.optimize.groupby", true), // optimize group by
     HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
+    HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
     ;
 
     public final String varname;
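
The optimizer added by this patch (SortedMergeBucketMapJoinOptimizer, later in the changeset) presumably consults this flag before converting a bucket map join into a sort-merge join. A minimal sketch of reading it; the helper class and method names are illustrative, not from the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class SmbFlagSketch {
      // Returns true only when the user has opted in to the sorted-merge conversion.
      public static boolean sortedMergeEnabled(Configuration conf) {
        return HiveConf.getBoolVar(conf,
            HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN);
      }
    }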

Added: hadoop/hive/trunk/data/files/smbbucket_1.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/data/files/smbbucket_1.txt?rev=919252&view=auto
==============================================================================
--- hadoop/hive/trunk/data/files/smbbucket_1.txt (added)
+++ hadoop/hive/trunk/data/files/smbbucket_1.txt Fri Mar  5 00:53:43 2010
@@ -0,0 +1,5 @@
+1val_1
+3val_3
+4val_4
+5val_5
+10val_10

Added: hadoop/hive/trunk/data/files/smbbucket_2.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/data/files/smbbucket_2.txt?rev=919252&view=auto
==============================================================================
--- hadoop/hive/trunk/data/files/smbbucket_2.txt (added)
+++ hadoop/hive/trunk/data/files/smbbucket_2.txt Fri Mar  5 00:53:43 2010
@@ -0,0 +1,4 @@
+20val_20
+23val_23
+25val_25
+30val_30

Added: hadoop/hive/trunk/data/files/smbbucket_3.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/data/files/smbbucket_3.txt?rev=919252&view=auto
==============================================================================
--- hadoop/hive/trunk/data/files/smbbucket_3.txt (added)
+++ hadoop/hive/trunk/data/files/smbbucket_3.txt Fri Mar  5 00:53:43 2010
@@ -0,0 +1,6 @@
+4val_4
+10val_10
+17val_17
+19val_19
+20val_20
+23val_23

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=919252&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Fri Mar  5 00:53:43 2010
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator.MapJoinObjectCtx;
+import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.util.ReflectionUtils;
+
+
+public abstract class AbstractMapJoinOperator <T extends MapJoinDesc> extends CommonJoinOperator<T> implements
+    Serializable {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The expressions for the join inputs' join keys.
+   */
+  protected transient Map<Byte, List<ExprNodeEvaluator>> joinKeys;
+  /**
+   * The ObjectInspectors for the join inputs' join keys.
+   */
+  protected transient Map<Byte, List<ObjectInspector>> joinKeysObjectInspectors;
+  /**
+   * The standard ObjectInspectors for the join inputs' join keys.
+   */
+  protected transient Map<Byte, List<ObjectInspector>> joinKeysStandardObjectInspectors;
+
+  protected transient int posBigTable = -1; // one of the tables that is not in memory
+  transient int mapJoinRowsKey; // rows for a given key
+
+  protected transient RowContainer<ArrayList<Object>> emptyList = null;
+  
+  transient int numMapRowsRead;
+
+  private static final transient String[] FATAL_ERR_MSG = {
+      null, // counter value 0 means no error
+      "Mapside join size exceeds hive.mapjoin.maxsize. "
+          + "Please increase that or remove the mapjoin hint."
+      };
+
+  transient boolean firstRow;
+  transient int heartbeatInterval;
+  
+  public AbstractMapJoinOperator() {
+  }
+
+  public AbstractMapJoinOperator(AbstractMapJoinOperator<? extends MapJoinDesc> mjop) {
+    super((CommonJoinOperator)mjop);
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+
+    numMapRowsRead = 0;
+    firstRow = true;
+    heartbeatInterval = HiveConf.getIntVar(hconf,
+        HiveConf.ConfVars.HIVESENDHEARTBEAT);
+
+    joinKeys = new HashMap<Byte, List<ExprNodeEvaluator>>();
+
+    populateJoinKeyValue(joinKeys, conf.getKeys());
+    joinKeysObjectInspectors = getObjectInspectorsFromEvaluators(joinKeys,
+        inputObjInspectors);
+    joinKeysStandardObjectInspectors = getStandardObjectInspectors(joinKeysObjectInspectors);
+
+    // all other tables are small, and are cached in the hash table
+    posBigTable = conf.getPosBigTable();
+
+    emptyList = new RowContainer<ArrayList<Object>>(1, hconf);
+    RowContainer bigPosRC = getRowContainer(hconf, (byte) posBigTable,
+        order[posBigTable], joinCacheSize);
+    storage.put((byte) posBigTable, bigPosRC);
+
+    mapJoinRowsKey = HiveConf.getIntVar(hconf,
+        HiveConf.ConfVars.HIVEMAPJOINROWSIZE);
+
+    List<? extends StructField> structFields = ((StructObjectInspector) outputObjInspector)
+        .getAllStructFieldRefs();
+    if (conf.getOutputColumnNames().size() < structFields.size()) {
+      List<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>();
+      for (Byte alias : order) {
+        int sz = conf.getExprs().get(alias).size();
+        List<Integer> retained = conf.getRetainList().get(alias);
+        for (int i = 0; i < sz; i++) {
+          int pos = retained.get(i);
+          structFieldObjectInspectors.add(structFields.get(pos)
+              .getFieldObjectInspector());
+        }
+      }
+      outputObjInspector = ObjectInspectorFactory
+          .getStandardStructObjectInspector(conf.getOutputColumnNames(),
+          structFieldObjectInspectors);
+    }
+    initializeChildren(hconf);
+  }
+
+  @Override
+  protected void fatalErrorMessage(StringBuilder errMsg, long counterCode) {
+    errMsg.append("Operator " + getOperatorId() + " (id=" + id + "): "
+        + FATAL_ERR_MSG[(int) counterCode]);
+  }
+  
+  protected void reportProgress() {
+    // Send some status periodically
+    numMapRowsRead++;
+    if (((numMapRowsRead % heartbeatInterval) == 0) && (reporter != null)) {
+      reporter.progress();
+    }
+  }
+
+  @Override
+  public int getType() {
+    return OperatorType.MAPJOIN;
+  }
+}

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Fri Mar  5 00:53:43 2010
@@ -110,7 +110,7 @@
   // be output
   protected transient JoinCondDesc[] condn;
   protected transient boolean noOuterJoin;
-  private transient Object[] dummyObj; // for outer joins, contains the
+  protected transient Object[] dummyObj; // for outer joins, contains the
   // potential nulls for the concerned
   // aliases
   protected transient RowContainer<ArrayList<Object>>[] dummyObjVectors; // empty
@@ -140,6 +140,48 @@
 
   transient boolean handleSkewJoin = false;
 
+  public CommonJoinOperator() {
+  }
+  
+  public CommonJoinOperator(CommonJoinOperator<T> clone) {
+    this.joinEmitInterval = clone.joinEmitInterval;
+    this.joinCacheSize = clone.joinCacheSize;
+    this.nextSz = clone.nextSz;
+    this.childOperators = clone.childOperators;
+    this.parentOperators = clone.parentOperators;
+    this.counterNames = clone.counterNames;
+    this.counterNameToEnum = clone.counterNameToEnum;
+    this.done = clone.done;
+    this.operatorId = clone.operatorId;
+    this.storage = clone.storage;
+    this.condn = clone.condn;
+    
+    this.setSchema(clone.getSchema());
+    
+    this.alias = clone.alias;
+    this.beginTime = clone.beginTime;
+    this.inputRows = clone.inputRows;
+    this.childOperatorsArray = clone.childOperatorsArray;
+    this.childOperatorsTag = clone.childOperatorsTag;
+    this.colExprMap = clone.colExprMap;
+    this.counters = clone.counters;
+    this.dummyObj = clone.dummyObj;
+    this.dummyObjVectors = clone.dummyObjVectors;
+    this.forwardCache = clone.forwardCache;
+    this.groupKeyObject = clone.groupKeyObject;
+    this.handleSkewJoin = clone.handleSkewJoin;
+    this.hconf = clone.hconf;
+    this.id = clone.id;
+    this.inputObjInspectors = clone.inputObjInspectors;
+    this.inputRows = clone.inputRows;
+    this.noOuterJoin = clone.noOuterJoin;
+    this.numAliases = clone.numAliases;
+    this.operatorId = clone.operatorId;
+    this.posToAliasMap = clone.posToAliasMap;
+    this.spillTableDesc = clone.spillTableDesc;
+    this.statsMap = clone.statsMap;
+  }
+
   protected int populateJoinKeyValue(Map<Byte, List<ExprNodeEvaluator>> outMap,
       Map<Byte, List<ExprNodeDesc>> inputMap) {
 
@@ -224,8 +266,6 @@
   protected void initializeOp(Configuration hconf) throws HiveException {
     this.handleSkewJoin = conf.getHandleSkewJoin();
     this.hconf = hconf;
-    LOG.info("COMMONJOIN "
-        + ((StructObjectInspector) inputObjInspectors[0]).getTypeName());
     totalSz = 0;
     // Map that contains the rows for each alias
     storage = new HashMap<Byte, RowContainer<ArrayList<Object>>>();
@@ -699,7 +739,7 @@
    * maintained (inputNulls) where each entry denotes whether the element is to
    * be used or not (whether it is null or not). The size of the bitvector is
    * same as the number of inputs under consideration currently. When all inputs
-   * are accounted for, the output is forwared appropriately.
+   * are accounted for, the output is forwarded appropriately.
    */
   private void genObject(ArrayList<boolean[]> inputNulls, int aliasNum,
       IntermediateObject intObj, boolean firstRow) throws HiveException {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java Fri Mar  5 00:53:43 2010
@@ -68,6 +68,32 @@
   private long nextCntr = 1;
   private String lastInputFile = null;
   private MapredLocalWork localWork = null;
+  
+  private ExecMapperContext execContext = new ExecMapperContext();
+  
+  public static class ExecMapperContext {
+    boolean inputFileChanged = false;
+    String currentInputFile;
+    JobConf jc;
+    public boolean isInputFileChanged() {
+      return inputFileChanged;
+    }
+    public void setInputFileChanged(boolean inputFileChanged) {
+      this.inputFileChanged = inputFileChanged;
+    }
+    public String getCurrentInputFile() {
+      return currentInputFile;
+    }
+    public void setCurrentInputFile(String currentInputFile) {
+      this.currentInputFile = currentInputFile;
+    }
+    public JobConf getJc() {
+      return jc;
+    }
+    public void setJc(JobConf jc) {
+      this.jc = jc;
+    }
+  }
 
   @Override
   public void configure(JobConf job) {
@@ -86,6 +112,7 @@
     }
     try {
       jc = job;
+      execContext.jc = jc;
       // create map and fetch operators
       MapredWork mrwork = Utilities.getMapRedWork(job);
       mo = new MapOperator();
@@ -93,8 +120,10 @@
       // initialize map operator
       mo.setChildren(job);
       l4j.info(mo.dump(0));
+      mo.setExecContext(execContext);
+      mo.initializeLocalWork(jc);
       mo.initialize(jc, null);
-
+      
       // initialize map local work
       localWork = mrwork.getMapLocalWork();
       if (localWork == null) {
@@ -112,6 +141,7 @@
       for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
         Operator<? extends Serializable> forwardOp = localWork.getAliasToWork()
             .get(entry.getKey());
+        forwardOp.setExecContext(execContext);
         // All the operators need to be initialized before process
         forwardOp.initialize(jc, new ObjectInspector[] {entry.getValue()
             .getOutputObjectInspector()});
@@ -141,11 +171,12 @@
       mo.setReporter(rp);
     }
     
-    if (localWork != null
-        && (this.lastInputFile == null || 
-            (localWork.getInputFileChangeSensitive() && inputFileChanged()))) {
+    if(inputFileChanged()) {
+      if (this.localWork != null
+          && (localWork.getInputFileChangeSensitive() || this.lastInputFile == null)) {
+        processMapLocalWork(localWork.getInputFileChangeSensitive());
+      }
       this.lastInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME);
-      processMapLocalWork(localWork.getInputFileChangeSensitive());      
     }
     
     try {
@@ -188,10 +219,13 @@
    */
   private boolean inputFileChanged() {
     String currentInputFile = HiveConf.getVar(jc, HiveConf.ConfVars.HADOOPMAPFILENAME);
+    execContext.currentInputFile = currentInputFile;
     if (this.lastInputFile == null
         || !this.lastInputFile.equals(currentInputFile)) {
+      execContext.inputFileChanged = true;
       return true;
     }
+    execContext.inputFileChanged = false;
     return false;
   }
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Mar  5 00:53:43 2010
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.File;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -50,32 +49,14 @@
 /**
  * Map side Join operator implementation.
  */
-public class MapJoinOperator extends CommonJoinOperator<MapJoinDesc> implements
+public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implements
     Serializable {
   private static final long serialVersionUID = 1L;
   private static final Log LOG = LogFactory.getLog(MapJoinOperator.class
       .getName());
 
-  /**
-   * The expressions for join inputs's join keys.
-   */
-  protected transient Map<Byte, List<ExprNodeEvaluator>> joinKeys;
-  /**
-   * The ObjectInspectors for the join inputs's join keys.
-   */
-  protected transient Map<Byte, List<ObjectInspector>> joinKeysObjectInspectors;
-  /**
-   * The standard ObjectInspectors for the join inputs's join keys.
-   */
-  protected transient Map<Byte, List<ObjectInspector>> joinKeysStandardObjectInspectors;
-
-  private transient int posBigTable; // one of the tables that is not in memory
-  transient int mapJoinRowsKey; // rows for a given key
-
   protected transient Map<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>> mapJoinTables;
 
-  protected transient RowContainer<ArrayList<Object>> emptyList = null;
-
   private static final transient String[] FATAL_ERR_MSG = {
       null, // counter value 0 means no error
       "Mapside join size exceeds hive.mapjoin.maxsize. "
@@ -135,43 +116,30 @@
     return mapMetadata;
   }
 
-  transient boolean firstRow;
-
   transient int metadataKeyTag;
   transient int[] metadataValueTag;
-  transient List<File> hTables;
-  transient int numMapRowsRead;
-  transient int heartbeatInterval;
   transient int maxMapJoinSize;
+  
+  public MapJoinOperator() {
+  }
+
+  public MapJoinOperator(AbstractMapJoinOperator<? extends MapJoinDesc> mjop) {
+    super(mjop);
+  }
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
-    numMapRowsRead = 0;
-
-    firstRow = true;
-    heartbeatInterval = HiveConf.getIntVar(hconf,
-        HiveConf.ConfVars.HIVESENDHEARTBEAT);
+    
     maxMapJoinSize = HiveConf.getIntVar(hconf,
         HiveConf.ConfVars.HIVEMAXMAPJOINSIZE);
 
-    joinKeys = new HashMap<Byte, List<ExprNodeEvaluator>>();
-
-    populateJoinKeyValue(joinKeys, conf.getKeys());
-    joinKeysObjectInspectors = getObjectInspectorsFromEvaluators(joinKeys,
-        inputObjInspectors);
-    joinKeysStandardObjectInspectors = getStandardObjectInspectors(joinKeysObjectInspectors);
-
-    // all other tables are small, and are cached in the hash table
-    posBigTable = conf.getPosBigTable();
-
     metadataValueTag = new int[numAliases];
     for (int pos = 0; pos < numAliases; pos++) {
       metadataValueTag[pos] = -1;
     }
 
     mapJoinTables = new HashMap<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>>();
-    hTables = new ArrayList<File>();
 
     // initialize the hash tables for other tables
     for (int pos = 0; pos < numAliases; pos++) {
@@ -186,33 +154,6 @@
 
       mapJoinTables.put(Byte.valueOf((byte) pos), hashTable);
     }
-
-    emptyList = new RowContainer<ArrayList<Object>>(1, hconf);
-    RowContainer bigPosRC = getRowContainer(hconf, (byte) posBigTable,
-        order[posBigTable], joinCacheSize);
-    storage.put((byte) posBigTable, bigPosRC);
-
-    mapJoinRowsKey = HiveConf.getIntVar(hconf,
-        HiveConf.ConfVars.HIVEMAPJOINROWSIZE);
-
-    List<? extends StructField> structFields = ((StructObjectInspector) outputObjInspector)
-        .getAllStructFieldRefs();
-    if (conf.getOutputColumnNames().size() < structFields.size()) {
-      List<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>();
-      for (Byte alias : order) {
-        int sz = conf.getExprs().get(alias).size();
-        List<Integer> retained = conf.getRetainList().get(alias);
-        for (int i = 0; i < sz; i++) {
-          int pos = retained.get(i);
-          structFieldObjectInspectors.add(structFields.get(pos)
-              .getFieldObjectInspector());
-        }
-      }
-      outputObjInspector = ObjectInspectorFactory
-          .getStandardStructObjectInspector(conf.getOutputColumnNames(),
-          structFieldObjectInspectors);
-    }
-    initializeChildren(hconf);
   }
 
   @Override
@@ -258,11 +199,7 @@
           firstRow = false;
         }
 
-        // Send some status periodically
-        numMapRowsRead++;
-        if (((numMapRowsRead % heartbeatInterval) == 0) && (reporter != null)) {
-          reporter.progress();
-        }
+        reportProgress();
 
         if ((numMapRowsRead > maxMapJoinSize) && (reporter != null)
             && (counterNameToEnum != null)) {
@@ -380,7 +317,7 @@
     }
     super.closeOp(abort);
   }
-
+  
   /**
    * Implements the getName function for the Node Interface.
    * 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Fri Mar  5 00:53:43 2010
@@ -29,6 +29,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.ExecMapper.ExecMapperContext;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.Explain;
@@ -67,6 +68,8 @@
    * run-time while extracting the operator specific counts.
    */
   protected HashMap<String, ProgressCounter> counterNameToEnum;
+  
+  private transient ExecMapperContext execContext;
 
   private static int seqId;
 
@@ -284,7 +287,7 @@
     }
     return true;
   }
-
+  
   /**
    * Initializes operators only if all parents have been initialized. Calls
    * operator specific initializer which then initializes child ops.
@@ -340,11 +343,23 @@
     }
     // derived classes can set this to different object if needed
     outputObjInspector = inputObjInspectors[0];
-
+    
+    //pass the exec context to child operators
+    passExecContext(this.execContext);
+    
     initializeOp(hconf);
     LOG.info("Initialization Done " + id + " " + getName());
   }
-
+  
+  public void initializeLocalWork(Configuration hconf) throws HiveException {
+    if (childOperators != null) {
+      for (int i =0; i<childOperators.size();i++) {
+        Operator<? extends Serializable> childOp = this.childOperators.get(i);
+        childOp.initializeLocalWork(hconf);
+      }
+    }
+  }
+  
   /**
    * Operator specific initialization.
    */
@@ -371,6 +386,18 @@
       }
     }
   }
+  
+  /**
+   * Pass the execContext reference to every child operator 
+   */
+  public void passExecContext(ExecMapperContext execContext) {
+    this.setExecContext(execContext);
+    if(childOperators != null) {
+      for (int i = 0; i < childOperators.size(); i++) {
+        childOperators.get(i).passExecContext(execContext);
+      }
+    }
+  }
 
   /**
    * Collects all the parent's output object inspectors and calls actual
@@ -398,7 +425,7 @@
     // call the actual operator initialization function
     initialize(hconf, null);
   }
-
+  
   /**
    * Process the row.
    * 
@@ -470,7 +497,7 @@
     LOG.debug("End group Done");
   }
 
-  private boolean allInitializedParentsAreClosed() {
+  protected boolean allInitializedParentsAreClosed() {
     if (parentOperators != null) {
       for (Operator<? extends Serializable> parent : parentOperators) {
         if (!(parent.state == State.CLOSE || parent.state == State.UNINIT)) {
@@ -1117,4 +1144,18 @@
   public Object getGroupKeyObject() {
     return groupKeyObject;
   }
+
+  public ExecMapperContext getExecContext() {
+    return execContext;
+  }
+
+  public void setExecContext(ExecMapperContext execContext) {
+    this.execContext = execContext;
+    if(this.childOperators != null) {
+      for (int i = 0; i<this.childOperators.size();i++) {
+        Operator<? extends Serializable> op = this.childOperators.get(i);
+        op.setExecContext(execContext);
+      }
+    }
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Fri Mar  5 00:53:43 2010
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hive.ql.plan.LimitDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.ScriptDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -74,6 +75,7 @@
     opvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, GroupByOperator.class));
     opvec.add(new OpTuple<JoinDesc>(JoinDesc.class, JoinOperator.class));
     opvec.add(new OpTuple<MapJoinDesc>(MapJoinDesc.class, MapJoinOperator.class));
+    opvec.add(new OpTuple<SMBJoinDesc>(SMBJoinDesc.class, SMBMapJoinOperator.class));
     opvec.add(new OpTuple<LimitDesc>(LimitDesc.class, LimitOperator.class));
     opvec.add(new OpTuple<TableScanDesc>(TableScanDesc.class, TableScanOperator.class));
     opvec.add(new OpTuple<UnionDesc>(UnionDesc.class, UnionOperator.class));
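
With the new OpTuple registered, the factory can map an SMBJoinDesc in a plan to its runtime operator the same way it does for the other descriptors. A hedged sketch, assuming OperatorFactory.get(desc) is the lookup used for the other descriptors and that SMBJoinDesc carries the usual no-argument constructor of plan descriptors:

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.OperatorFactory;
    import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;

    public class SmbFactorySketch {
      public static void main(String[] args) {
        SMBJoinDesc desc = new SMBJoinDesc();
        // The registration above makes this resolve to SMBMapJoinOperator.
        Operator<SMBJoinDesc> op = OperatorFactory.get(desc);
        System.out.println(op.getClass().getSimpleName());
      }
    }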

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=919252&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Fri Mar  5 00:53:43 2010
@@ -0,0 +1,519 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork.BucketMapJoinContext;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Sorted Merge Map Join Operator.
+ */
+public class SMBMapJoinOperator extends AbstractMapJoinOperator<SMBJoinDesc> implements
+    Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Log LOG = LogFactory.getLog(SMBMapJoinOperator.class
+      .getName());
+
+  private MapredLocalWork localWork = null;
+  private Map<String, FetchOperator> fetchOperators;
+  transient Map<Byte, ArrayList<Object>> keyWritables;
+  transient Map<Byte, ArrayList<Object>> nextKeyWritables;
+  HashMap<Byte, RowContainer<ArrayList<Object>>> nextGroupStorage;
+  HashMap<Byte, RowContainer<ArrayList<Object>>> candidateStorage;
+  
+  transient HashMap<Byte, String> tagToAlias;
+  private transient HashMap<Byte, Boolean> fetchOpDone = new HashMap<Byte, Boolean>();
+  private transient HashMap<Byte, Boolean> foundNextKeyGroup = new HashMap<Byte, Boolean>();
+  transient boolean firstFetchHappened = false;
+  transient boolean localWorkInited = false;
+
+  public SMBMapJoinOperator() {
+  }
+  
+  public SMBMapJoinOperator(AbstractMapJoinOperator<? extends MapJoinDesc> mapJoinOp) {
+    super(mapJoinOp);
+  }
+  
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+    
+    firstRow = true;
+    
+    closeCalled = false;
+
+    nextGroupStorage = new HashMap<Byte, RowContainer<ArrayList<Object>>>();
+    candidateStorage = new HashMap<Byte, RowContainer<ArrayList<Object>>>();
+    int bucketSize = HiveConf.getIntVar(hconf,
+        HiveConf.ConfVars.HIVEMAPJOINBUCKETCACHESIZE);
+    byte storePos = (byte) 0;
+    for (Byte alias : order) {
+      RowContainer rc = getRowContainer(hconf, storePos, alias, bucketSize);
+      nextGroupStorage.put((byte) storePos, rc);
+      RowContainer candidateRC = getRowContainer(hconf, storePos, alias,
+          bucketSize);
+      candidateStorage.put(alias, candidateRC);
+      storePos++;
+    }
+    tagToAlias = conf.getTagToAlias();
+    keyWritables = new HashMap<Byte, ArrayList<Object>>();
+    nextKeyWritables = new HashMap<Byte, ArrayList<Object>>();
+    
+    for (Byte alias : order) {
+      if(alias != (byte) posBigTable) {
+        fetchOpDone.put(alias, Boolean.FALSE);
+      }
+      foundNextKeyGroup.put(alias, Boolean.FALSE);
+    }
+  }
+  
+  @Override
+  public void initializeLocalWork(Configuration hconf) throws HiveException {
+    initializeMapredLocalWork(this.getConf(), hconf, this.getConf().getLocalWork(), LOG);
+    super.initializeLocalWork(hconf);
+  }
+
+  public void initializeMapredLocalWork(MapJoinDesc conf, Configuration hconf,
+      MapredLocalWork localWork, Log l4j) throws HiveException {
+    if (localWork == null || localWorkInited) {
+      return;
+    }
+    localWorkInited = true;
+    this.localWork = localWork;
+    fetchOperators = new HashMap<String, FetchOperator>();
+    // create map local operators
+    for (Map.Entry<String, FetchWork> entry : localWork.getAliasToFetchWork()
+        .entrySet()) {
+      fetchOperators.put(entry.getKey(), new FetchOperator(entry.getValue(),
+          new JobConf(hconf)));
+      if (l4j != null) {
+        l4j.info("fetchoperator for " + entry.getKey() + " created");
+      }
+    }
+    
+    for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+      Operator<? extends Serializable> forwardOp = localWork.getAliasToWork()
+          .get(entry.getKey());
+      // All the operators need to be initialized before process
+      forwardOp.setExecContext(this.getExecContext());
+      forwardOp.initialize(this.getExecContext().jc, new ObjectInspector[] {entry.getValue()
+          .getOutputObjectInspector()});
+      l4j.info("fetchoperator for " + entry.getKey() + " initialized");
+    }
+  }
+  
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
+
+    if (this.getExecContext().inputFileChanged) {
+      if(firstFetchHappened) {
+        // we first need to join and flush out the data left over from the previous file.
+        joinFinalLeftData();        
+      }
+      //set up the fetch operator for the new input file.
+      for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+        String alias = entry.getKey();
+        FetchOperator fetchOp = entry.getValue();
+        fetchOp.clearFetchContext();
+        setUpFetchOpContext(fetchOp, alias);
+      }
+      this.getExecContext().inputFileChanged = false;
+      firstFetchHappened = false;
+    }
+    
+    if (!firstFetchHappened) {
+      firstFetchHappened = true;
+      // fetch the first group for all small table aliases
+      for (Byte t : order) {
+        if(t != (byte)posBigTable) {
+          fetchNextGroup(t);            
+        }
+      }
+    }
+
+    byte alias = (byte) tag;
+    // compute keys and values as StandardObjects
+    ArrayList<Object> key = computeValues(row, joinKeys.get(alias),
+        joinKeysObjectInspectors.get(alias));
+    ArrayList<Object> value = computeValues(row, joinValues.get(alias),
+        joinValuesObjectInspectors.get(alias));
+
+    //have we reached a new key group?
+    boolean nextKeyGroup = processKey(alias, key);
+    if (nextKeyGroup) {
+      //assert this.nextGroupStorage.get(alias).size() == 0;
+      this.nextGroupStorage.get(alias).add(value);
+      foundNextKeyGroup.put((byte) tag, Boolean.TRUE);
+      if (tag != posBigTable) {
+        return;
+      }
+    }
+    
+    reportProgress();
+
+    // the big table has reached a new key group. try to let the small tables
+    // catch up with the big table.
+    if (nextKeyGroup) {
+      assert tag == (byte)posBigTable;
+      List<Byte> smallestPos = null;
+      do {
+        smallestPos = joinOneGroup();
+        // jump out of the loop if we need input from the big table
+      } while (smallestPos != null && smallestPos.size() > 0
+          && !smallestPos.contains((byte)this.posBigTable));
+      
+      return;
+    }
+    
+    assert !nextKeyGroup;
+    candidateStorage.get((byte) tag).add(value);
+  }
+
+  /*
+   * This happens either when the input file of the big table changes or in
+   * closeOp. It fetches all the remaining data from the small tables and
+   * tries to join it.
+   */
+  private void joinFinalLeftData() throws HiveException {
+    RowContainer bigTblRowContainer = this.candidateStorage.get((byte)this.posBigTable);
+    
+    boolean allFetchOpDone = allFetchOpDone();
+    // if all remaining data in the small tables is less than or equal to the
+    // remaining data in the big table, let them catch up
+    while (bigTblRowContainer != null && bigTblRowContainer.size() > 0
+        && !allFetchOpDone) {
+      joinOneGroup();
+      bigTblRowContainer = this.candidateStorage.get((byte)this.posBigTable);
+      allFetchOpDone = allFetchOpDone();
+    }
+    
+    if (allFetchOpDone
+        && this.candidateStorage.get((byte) this.posBigTable).size() > 0) {
+      // if all fetch operators for the small tables are done and there is
+      // data left in the big table
+      for (byte t : order) {
+        if(this.foundNextKeyGroup.get(t) && this.nextKeyWritables.get(t) != null) {
+          promoteNextGroupToCandidate(t);
+        }
+      }
+      joinOneGroup();
+    } else {
+      while (!allFetchOpDone) {
+        List<Byte> ret = joinOneGroup();
+        if (ret == null || ret.size() == 0) {
+          break;
+        }
+        
+        reportProgress();
+        
+        allFetchOpDone = allFetchOpDone();
+      }
+      //one final table left
+      for (byte t : order) {
+        if(this.foundNextKeyGroup.get(t) && this.nextKeyWritables.get(t) != null) {
+          promoteNextGroupToCandidate(t);
+        }
+      }
+      joinOneGroup();
+    }
+  }
+
+  private boolean allFetchOpDone() {
+    boolean allFetchOpDone = true;
+    for (Byte tag : order) {
+      if(tag == (byte) posBigTable) {
+        continue;
+      }
+      allFetchOpDone = allFetchOpDone && fetchOpDone.get(tag);
+    }
+    return allFetchOpDone;
+  }
+
+  private List<Byte> joinOneGroup() throws HiveException {
+    int smallestPos = -1;
+    smallestPos = findMostSmallKey();
+    List<Byte> listOfNeedFetchNext = null;
+    if(smallestPos >= 0) {
+      listOfNeedFetchNext = joinObject(smallestPos);
+      if (listOfNeedFetchNext.size() > 0) {
+        // listOfNeedFetchNext contains all tables whose candidateStorage was just
+        // joined; clear their candidate storage, promote their nextGroupStorage
+        // to candidateStorage, and fetch data until we reach a new group.
+        for (Byte b : listOfNeedFetchNext) {
+          fetchNextGroup(b);
+        }
+      }
+    }
+    return listOfNeedFetchNext;
+  }
+
+  private List<Byte> joinObject(int smallestPos) throws HiveException {
+    List<Byte> needFetchList = new ArrayList<Byte>();
+    ArrayList<Object> smallKey = keyWritables.get((byte) smallestPos);
+    needFetchList.add((byte)smallestPos);
+    this.storage.put((byte) smallestPos, this.candidateStorage.get((byte) smallestPos));
+    for (Byte i : order) {
+      if ((byte) smallestPos == i) {
+        continue;
+      }
+      ArrayList<Object> key = keyWritables.get(i);
+      if (key == null) {
+        putDummyOrEmpty(i);
+      } else {
+        int cmp = compareKeys(key, smallKey);
+        if (cmp == 0) {
+          this.storage.put((byte) i, this.candidateStorage
+              .get((byte) i));
+          needFetchList.add(i);
+          continue;
+        } else {
+          putDummyOrEmpty(i);
+        }
+      }
+    }
+    checkAndGenObject();
+    for (Byte pos : needFetchList) {
+      this.candidateStorage.get(pos).clear();
+      this.keyWritables.remove(pos);
+    }
+    return needFetchList;
+  }
+  
+  private void fetchNextGroup(Byte t) throws HiveException {
+    if (foundNextKeyGroup.get(t)) {
+      // first promote the next group to be the current group if we reached a
+      // new group in the previous fetch
+      if (this.nextKeyWritables.get(t) != null) {
+        promoteNextGroupToCandidate(t);
+      } else {
+        this.keyWritables.remove(t);
+        this.candidateStorage.remove(t);
+        this.nextGroupStorage.remove(t);
+      }
+      foundNextKeyGroup.put(t, Boolean.FALSE);
+    }
+    //for the big table, we only need to promote the next group to the current group.
+    if(t == (byte)posBigTable) {
+      return;
+    }
+    
+    // for tables other than the big table, keep fetching data until we reach a new group or the fetch operator is done.
+    while (!foundNextKeyGroup.get(t)) {
+      if (fetchOpDone.get(t)) {
+        break;
+      }
+      fetchOneRow(t);
+    }
+    if (!foundNextKeyGroup.get(t) && fetchOpDone.get(t)) {
+      this.nextKeyWritables.remove(t);
+    }
+  }
+
+  private void promoteNextGroupToCandidate(Byte t) throws HiveException {
+    this.keyWritables.put(t, this.nextKeyWritables.get(t));
+    this.nextKeyWritables.remove(t);
+    RowContainer<ArrayList<Object>> oldRowContainer = this.candidateStorage.get(t);
+    oldRowContainer.clear();
+    this.candidateStorage.put(t, this.nextGroupStorage.get(t));
+    this.nextGroupStorage.put(t, oldRowContainer);
+  }
+  
+  private int compareKeys (ArrayList<Object> k1, ArrayList<Object> k2) {
+    int ret = 0;
+    for (int i = 0; i < k1.size() && i < k2.size(); i++) {
+      WritableComparable key_1 = (WritableComparable) k1.get(i);
+      WritableComparable key_2 = (WritableComparable) k2.get(i);
+      ret = WritableComparator.get(key_1.getClass()).compare(key_1, key_2);
+      if(ret != 0) {
+        return ret;
+      }
+    }
+    return k1.size() - k2.size();
+  }
+
+  private void putDummyOrEmpty(Byte i) {
+    // put an empty list, or the dummy null rows for outer joins
+    if (noOuterJoin) {
+      storage.put(i, emptyList);
+    } else {
+      storage.put(i, dummyObjVectors[i.intValue()]);
+    }
+  }
+
+  private int findMostSmallKey() {
+    byte index = -1;
+    ArrayList<Object> mostSmallOne = null;
+
+    for (byte i : order) {
+      ArrayList<Object> key = keyWritables.get(i);
+      if (key == null) {
+        continue;
+      }
+      if (mostSmallOne == null) {
+        mostSmallOne = key;
+        index = i;
+        continue;
+      }
+      int cmp = compareKeys(key, mostSmallOne);
+      if (cmp < 0) {
+        mostSmallOne = key;
+        index = i;
+        continue;
+      }
+    }
+    return index;
+  }
+
+  private boolean processKey(byte alias, ArrayList<Object> key)
+      throws HiveException {
+    ArrayList<Object> keyWritable = keyWritables.get(alias);
+    if (keyWritable == null) {
+      //the first group.
+      keyWritables.put(alias, key);
+      return false;
+    } else {
+      int cmp = compareKeys(key, keyWritable);
+      if (cmp != 0) {
+        nextKeyWritables.put(alias, key);
+        return true;
+      }
+      return false;
+    }
+  }
+
+  private void setUpFetchOpContext(FetchOperator fetchOp, String alias) {
+    String currentInputFile = this.getExecContext().currentInputFile;
+    BucketMapJoinContext bucketMatcherCxt = this.localWork
+        .getBucketMapjoinContext();
+    Class<? extends BucketMatcher> bucketMatcherCls = bucketMatcherCxt
+        .getBucketMatcherClass();
+    BucketMatcher bucketMatcher = (BucketMatcher) ReflectionUtils.newInstance(
+        bucketMatcherCls, null);
+    bucketMatcher.setAliasBucketFileNameMapping(bucketMatcherCxt
+        .getAliasBucketFileNameMapping());
+    List<Path> aliasFiles = bucketMatcher.getAliasBucketFiles(currentInputFile,
+        bucketMatcherCxt.getMapJoinBigTableAlias(), alias);
+    Iterator<Path> iter = aliasFiles.iterator();
+    fetchOp.setupContext(iter, null);
+  }
+
+  private void fetchOneRow(byte tag) {
+    if (fetchOperators != null) {
+      String tble = this.tagToAlias.get(tag);
+      FetchOperator fetchOp = fetchOperators.get(tble);
+      
+      Operator<? extends Serializable> forwardOp = localWork.getAliasToWork()
+          .get(tble);
+      try {
+        InspectableObject row = fetchOp.getNextRow();
+        if (row == null) {
+          this.fetchOpDone.put(tag, Boolean.TRUE);
+          return;
+        }
+        forwardOp.process(row.o, 0);
+        // check if any operator had a fatal error or early exit during
+        // execution
+        if (forwardOp.getDone()) {
+          this.fetchOpDone.put(tag, Boolean.TRUE);
+        }
+      } catch (Throwable e) {
+        if (e instanceof OutOfMemoryError) {
+          // Don't create a new object if we are already out of memory
+          throw (OutOfMemoryError) e;
+        } else {
+          throw new RuntimeException("Map local work failed", e);
+        }
+      }
+    }
+  }
+  
+  transient boolean closeCalled = false;
+  @Override
+  public void closeOp(boolean abort) throws HiveException {
+    if(closeCalled) {
+      return;
+    }
+    closeCalled = true;
+    joinFinalLeftData();
+    this.firstFetchHappened = false;
+    //clean up 
+    for (Byte alias : order) {
+      if(alias != (byte) posBigTable) {
+        fetchOpDone.put(alias, Boolean.FALSE);
+      }
+      foundNextKeyGroup.put(alias, Boolean.FALSE);
+    }
+    
+    localWorkInited = false;
+    
+    super.closeOp(abort);
+    if (fetchOperators != null) {
+      for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+        Operator<? extends Serializable> forwardOp = localWork
+            .getAliasToWork().get(entry.getKey());
+        forwardOp.close(abort);
+      }
+    }
+  }
+  
+  protected boolean allInitializedParentsAreClosed() {
+    return true;
+  }
+
+  /**
+   * Implements the getName function for the Node Interface.
+   * 
+   * @return the name of the operator
+   */
+  @Override
+  public String getName() {
+    return "MAPJOIN";
+  }
+
+  @Override
+  public int getType() {
+    return OperatorType.MAPJOIN;
+  }
+}
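
The heart of the operator above is the group-at-a-time merge: compareKeys() walks the composite join key column by column (through WritableComparator in the real code) and findMostSmallKey() picks the input whose current key group sorts lowest, which is then joined and advanced. A stripped-down sketch of those two pieces using plain strings instead of Writables; the names and types here are illustrative, not from the patch.

    import java.util.List;

    public class SortedMergeKeySketch {
      // Compare composite keys column by column; if all shared columns are
      // equal, the shorter key sorts first (as in compareKeys above).
      static int compareKeys(List<String> k1, List<String> k2) {
        int n = Math.min(k1.size(), k2.size());
        for (int i = 0; i < n; i++) {
          int cmp = k1.get(i).compareTo(k2.get(i));
          if (cmp != 0) {
            return cmp;
          }
        }
        return k1.size() - k2.size();
      }

      // Mirrors findMostSmallKey(): index of the input holding the smallest
      // current key group, or -1 if every input is exhausted (null key).
      static int smallestKeyIndex(List<List<String>> currentKeys) {
        int index = -1;
        List<String> smallest = null;
        for (int i = 0; i < currentKeys.size(); i++) {
          List<String> key = currentKeys.get(i);
          if (key == null) {
            continue;
          }
          if (smallest == null || compareKeys(key, smallest) < 0) {
            smallest = key;
            index = i;
          }
        }
        return index;
      }
    }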

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketMapJoinOptimizer.java Fri Mar  5 00:53:43 2010
@@ -192,6 +192,8 @@
       for (int index = 0; index < joinAliases.size(); index++) {
         String alias = joinAliases.get(index);
         TableScanOperator tso = (TableScanOperator) topOps.get(alias);
+        if (tso == null)
+          return null;
         Table tbl = topToTable.get(tso);
         if(tbl.isPartitioned()) {
           PrunedPartitionList prunedParts = null;

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Fri Mar  5 00:53:43 2010
@@ -26,6 +26,7 @@
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -54,6 +55,7 @@
 import org.apache.hadoop.hive.ql.plan.ExtractDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -342,7 +344,7 @@
       return dest;
     }
 
-    MapJoinOperator currMapJoinOp = ctx.getCurrMapJoinOp();
+    AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp = ctx.getCurrMapJoinOp();
 
     if (currMapJoinOp != null) {
       opTaskMap.put(null, currTask);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java Fri Mar  5 00:53:43 2010
@@ -26,6 +26,7 @@
 import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -35,6 +36,7 @@
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 
 /**
@@ -140,7 +142,7 @@
     String taskTmpDir;
     TableDesc tt_desc;
     Operator<? extends Serializable> rootMapJoinOp;
-    MapJoinOperator oldMapJoin;
+    AbstractMapJoinOperator<? extends MapJoinDesc> oldMapJoin;
 
     public GenMRMapJoinCtx() {
       taskTmpDir = null;
@@ -157,7 +159,7 @@
      */
     public GenMRMapJoinCtx(String taskTmpDir, TableDesc tt_desc,
         Operator<? extends Serializable> rootMapJoinOp,
-        MapJoinOperator oldMapJoin) {
+        AbstractMapJoinOperator<? extends MapJoinDesc> oldMapJoin) {
       this.taskTmpDir = taskTmpDir;
       this.tt_desc = tt_desc;
       this.rootMapJoinOp = rootMapJoinOp;
@@ -198,7 +200,7 @@
     /**
      * @return the oldMapJoin
      */
-    public MapJoinOperator getOldMapJoin() {
+    public AbstractMapJoinOperator<? extends MapJoinDesc> getOldMapJoin() {
       return oldMapJoin;
     }
 
@@ -206,7 +208,7 @@
      * @param oldMapJoin
      *          the oldMapJoin to set
      */
-    public void setOldMapJoin(MapJoinOperator oldMapJoin) {
+    public void setOldMapJoin(AbstractMapJoinOperator<? extends MapJoinDesc> oldMapJoin) {
       this.oldMapJoin = oldMapJoin;
     }
   }
@@ -214,7 +216,7 @@
   private HiveConf conf;
   private HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap;
   private HashMap<UnionOperator, GenMRUnionCtx> unionTaskMap;
-  private HashMap<MapJoinOperator, GenMRMapJoinCtx> mapJoinTaskMap;
+  private HashMap<AbstractMapJoinOperator<? extends MapJoinDesc>, GenMRMapJoinCtx> mapJoinTaskMap;
   private List<Operator<? extends Serializable>> seenOps;
   private List<FileSinkOperator> seenFileSinkOps;
 
@@ -226,7 +228,7 @@
   private Task<? extends Serializable> currTask;
   private Operator<? extends Serializable> currTopOp;
   private UnionOperator currUnionOp;
-  private MapJoinOperator currMapJoinOp;
+  private AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp;
   private String currAliasId;
   private List<Operator<? extends Serializable>> rootOps;
 
@@ -289,7 +291,7 @@
     rootOps = new ArrayList<Operator<? extends Serializable>>();
     rootOps.addAll(parseCtx.getTopOps().values());
     unionTaskMap = new HashMap<UnionOperator, GenMRUnionCtx>();
-    mapJoinTaskMap = new HashMap<MapJoinOperator, GenMRMapJoinCtx>();
+    mapJoinTaskMap = new HashMap<AbstractMapJoinOperator<? extends MapJoinDesc>, GenMRMapJoinCtx>();
   }
 
   /**
@@ -456,7 +458,7 @@
     this.currUnionOp = currUnionOp;
   }
 
-  public MapJoinOperator getCurrMapJoinOp() {
+  public AbstractMapJoinOperator<? extends MapJoinDesc> getCurrMapJoinOp() {
     return currMapJoinOp;
   }
 
@@ -464,7 +466,7 @@
    * @param currMapJoinOp
    *          current map join operator
    */
-  public void setCurrMapJoinOp(MapJoinOperator currMapJoinOp) {
+  public void setCurrMapJoinOp(AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp) {
     this.currMapJoinOp = currMapJoinOp;
   }
 
@@ -491,11 +493,11 @@
     unionTaskMap.put(op, uTask);
   }
 
-  public GenMRMapJoinCtx getMapJoinCtx(MapJoinOperator op) {
+  public GenMRMapJoinCtx getMapJoinCtx(AbstractMapJoinOperator<? extends MapJoinDesc> op) {
     return mapJoinTaskMap.get(op);
   }
 
-  public void setMapJoinCtx(MapJoinOperator op, GenMRMapJoinCtx mjCtx) {
+  public void setMapJoinCtx(AbstractMapJoinOperator<? extends MapJoinDesc> op, GenMRMapJoinCtx mjCtx) {
     mapJoinTaskMap.put(op, mjCtx);
   }
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Fri Mar  5 00:53:43 2010
@@ -27,6 +27,7 @@
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -45,6 +46,7 @@
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -141,7 +143,7 @@
 
     // If there is a mapjoin at position 'pos'
     if (uPrsCtx.getMapJoinSubq(pos)) {
-      MapJoinOperator mjOp = ctx.getCurrMapJoinOp();
+      AbstractMapJoinOperator<? extends MapJoinDesc> mjOp = ctx.getCurrMapJoinOp();
       assert mjOp != null;
       GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(mjOp);
       assert mjCtx != null;

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Fri Mar  5 00:53:43 2010
@@ -33,11 +33,13 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
@@ -157,7 +159,7 @@
     // The mapjoin has already been encountered. Some context must be stored
     // about that
     if (readInputMapJoin) {
-      MapJoinOperator currMapJoinOp = opProcCtx.getCurrMapJoinOp();
+      AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp = (AbstractMapJoinOperator<? extends MapJoinDesc>) opProcCtx.getCurrMapJoinOp();
       assert currMapJoinOp != null;
       boolean local = ((pos == -1) || (pos == (currMapJoinOp.getConf())
           .getPosBigTable())) ? false : true;
@@ -217,7 +219,7 @@
       seenOps.add(currTopOp);
       boolean local = (pos == desc.getPosBigTable()) ? false : true;
       setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
-      setupBucketMapJoinInfo(plan, (MapJoinOperator)op);
+      setupBucketMapJoinInfo(plan, (AbstractMapJoinOperator<? extends MapJoinDesc>)op);
     }
 
     opProcCtx.setCurrTask(currTask);
@@ -226,16 +228,35 @@
   }
 
   private static void setupBucketMapJoinInfo(MapredWork plan,
-      MapJoinOperator currMapJoinOp) {
+      AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp) {
     if (currMapJoinOp != null) {
       LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> aliasBucketFileNameMapping = 
         currMapJoinOp.getConf().getAliasBucketFileNameMapping();
       if(aliasBucketFileNameMapping!= null) {
         MapredLocalWork localPlan = plan.getMapLocalWork();
-        if (localPlan == null) {
-          localPlan = new MapredLocalWork(
-              new LinkedHashMap<String, Operator<? extends Serializable>>(),
-              new LinkedHashMap<String, FetchWork>());
+        if(localPlan == null) {
+          if(currMapJoinOp instanceof SMBMapJoinOperator) {
+            localPlan = ((SMBMapJoinOperator)currMapJoinOp).getConf().getLocalWork();
+          }
+          if (localPlan == null) {
+            localPlan = new MapredLocalWork(
+                new LinkedHashMap<String, Operator<? extends Serializable>>(),
+                new LinkedHashMap<String, FetchWork>());
+          }
+        } else {
+          //local plan is not null, we want to merge it into SMBMapJoinOperator's local work
+          if(currMapJoinOp instanceof SMBMapJoinOperator) {
+            MapredLocalWork smbLocalWork = ((SMBMapJoinOperator)currMapJoinOp).getConf().getLocalWork();
+            if(smbLocalWork != null) {
+              localPlan.getAliasToFetchWork().putAll(smbLocalWork.getAliasToFetchWork());
+              localPlan.getAliasToWork().putAll(smbLocalWork.getAliasToWork());
+            }
+          }
+        }
+        if(currMapJoinOp instanceof SMBMapJoinOperator) {
+          plan.setMapLocalWork(null);
+          ((SMBMapJoinOperator)currMapJoinOp).getConf().setLocalWork(localPlan);
+        } else {
           plan.setMapLocalWork(localPlan);
         }
         BucketMapJoinContext bucketMJCxt = new BucketMapJoinContext();
@@ -364,11 +385,14 @@
               : true;
         }
         setTaskPlan(currAliasId, currTopOp, plan, local, opProcCtx);
+        if(op instanceof AbstractMapJoinOperator) {
+          setupBucketMapJoinInfo(plan, (AbstractMapJoinOperator<? extends MapJoinDesc>)op);          
+        }
       }
       currTopOp = null;
       opProcCtx.setCurrTopOp(currTopOp);
     } else if (opProcCtx.getCurrMapJoinOp() != null) {
-      MapJoinOperator mjOp = opProcCtx.getCurrMapJoinOp();
+      AbstractMapJoinOperator<? extends MapJoinDesc> mjOp = (AbstractMapJoinOperator<? extends MapJoinDesc>) opProcCtx.getCurrMapJoinOp();
       if (readUnionData) {
         initUnionPlan(opProcCtx, currTask, false);
       } else {
@@ -376,7 +400,7 @@
 
         // In case of map-join followed by map-join, the file needs to be
         // obtained from the old map join
-        MapJoinOperator oldMapJoin = mjCtx.getOldMapJoin();
+        AbstractMapJoinOperator<? extends MapJoinDesc> oldMapJoin = (AbstractMapJoinOperator<? extends MapJoinDesc>) mjCtx.getOldMapJoin();
         String taskTmpDir = null;
         TableDesc tt_desc = null;
         Operator<? extends Serializable> rootOp = null;
@@ -819,8 +843,8 @@
     setTaskPlan(taskTmpDir, streamDesc, ts_op, cplan, local, tt_desc);
 
     // This can be cleaned up as a function table in future
-    if (op instanceof MapJoinOperator) {
-      MapJoinOperator mjOp = (MapJoinOperator) op;
+    if (op instanceof AbstractMapJoinOperator<?>) {
+      AbstractMapJoinOperator<? extends MapJoinDesc> mjOp = (AbstractMapJoinOperator<? extends MapJoinDesc>) op;
       opProcCtx.setCurrMapJoinOp(mjOp);
       GenMRMapJoinCtx mjCtx = opProcCtx.getMapJoinCtx(mjOp);
       if (mjCtx == null) {

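The reworked setupBucketMapJoinInfo above routes the bucket-join local work differently for the two operator kinds: a plain map join keeps it on the MapredWork as before, while a sort-merge bucket map join merges any existing map-local work into its own descriptor and clears the plan-level local work. A distilled sketch of that routing (hypothetical helper name; the patch performs this inline):

    // Hypothetical helper, distilled from the inline logic in setupBucketMapJoinInfo above.
    static void attachLocalWork(MapredWork plan,
        AbstractMapJoinOperator<? extends MapJoinDesc> op, MapredLocalWork merged) {
      if (op instanceof SMBMapJoinOperator) {
        // The sort-merge operator fetches its small-table buckets itself,
        // so the local work travels on its descriptor, not on the plan.
        plan.setMapLocalWork(null);
        ((SMBMapJoinOperator) op).getConf().setLocalWork(merged);
      } else {
        plan.setMapLocalWork(merged);
      }
    }
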
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Fri Mar  5 00:53:43 2010
@@ -26,6 +26,7 @@
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -52,7 +54,7 @@
  */
 public final class MapJoinFactory {
 
-  public static int getPositionParent(MapJoinOperator op, Stack<Node> stack) {
+  public static int getPositionParent(AbstractMapJoinOperator<? extends MapJoinDesc> op, Stack<Node> stack) {
     int pos = 0;
     int size = stack.size();
     assert size >= 2 && stack.get(size - 1) == op;
@@ -72,7 +74,7 @@
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      MapJoinOperator mapJoin = (MapJoinOperator) nd;
+      AbstractMapJoinOperator<MapJoinDesc> mapJoin = (AbstractMapJoinOperator<MapJoinDesc>) nd;
       GenMRProcContext ctx = (GenMRProcContext) procCtx;
 
       // find the branch on which this processor was invoked
@@ -122,7 +124,7 @@
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      MapJoinOperator mapJoin = (MapJoinOperator) nd;
+      AbstractMapJoinOperator<MapJoinDesc> mapJoin = (AbstractMapJoinOperator<MapJoinDesc>) nd;
       GenMRProcContext opProcCtx = (GenMRProcContext) procCtx;
 
       MapredWork cplan = GenMapRedUtils.getMapRedWork();
@@ -133,7 +135,7 @@
 
       // find the branch on which this processor was invoked
       int pos = getPositionParent(mapJoin, stack);
-      boolean local = (pos == (mapJoin.getConf()).getPosBigTable()) ? false
+      boolean local = (pos == ((MapJoinDesc)(mapJoin.getConf())).getPosBigTable()) ? false
           : true;
 
       GenMapRedUtils.splitTasks(mapJoin, currTask, redTask, opProcCtx, false,
@@ -180,7 +182,7 @@
         Object... nodeOutputs) throws SemanticException {
 
       SelectOperator sel = (SelectOperator) nd;
-      MapJoinOperator mapJoin = (MapJoinOperator) sel.getParentOperators().get(
+      AbstractMapJoinOperator<MapJoinDesc> mapJoin = (AbstractMapJoinOperator<MapJoinDesc>) sel.getParentOperators().get(
           0);
       assert sel.getParentOperators().size() == 1;
 
@@ -188,7 +190,7 @@
       ParseContext parseCtx = ctx.getParseCtx();
 
       // is the mapjoin followed by a reducer
-      List<MapJoinOperator> listMapJoinOps = parseCtx
+      List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinOps = parseCtx
           .getListMapJoinOpsNoReducer();
 
       if (listMapJoinOps.contains(mapJoin)) {
@@ -263,11 +265,11 @@
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      MapJoinOperator mapJoin = (MapJoinOperator) nd;
+      AbstractMapJoinOperator<? extends MapJoinDesc> mapJoin = (AbstractMapJoinOperator<? extends MapJoinDesc>) nd;
       GenMRProcContext ctx = (GenMRProcContext) procCtx;
 
       ctx.getParseCtx();
-      MapJoinOperator oldMapJoin = ctx.getCurrMapJoinOp();
+      AbstractMapJoinOperator<? extends MapJoinDesc> oldMapJoin = ctx.getCurrMapJoinOp();
       assert oldMapJoin != null;
       GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(mapJoin);
       if (mjCtx != null) {
@@ -335,7 +337,7 @@
       UnionOperator currUnion = ctx.getCurrUnionOp();
       assert currUnion != null;
       ctx.getUnionTask(currUnion);
-      MapJoinOperator mapJoin = (MapJoinOperator) nd;
+      AbstractMapJoinOperator<MapJoinDesc> mapJoin = (AbstractMapJoinOperator<MapJoinDesc>) nd;
 
       // find the branch on which this processor was invoked
       int pos = getPositionParent(mapJoin, stack);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Fri Mar  5 00:53:43 2010
@@ -28,7 +28,13 @@
 import java.util.Set;
 import java.util.Stack;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -36,6 +42,7 @@
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
@@ -44,21 +51,29 @@
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ErrorMsg;
 import org.apache.hadoop.hive.ql.parse.GenMapRedWalker;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 
 /**
  * Implementation of one of the rule-based map join optimization. User passes
@@ -68,6 +83,9 @@
  * implemented, this transformation can also be done based on costs.
  */
 public class MapJoinProcessor implements Transform {
+  
+  private static final Log LOG = LogFactory.getLog(MapJoinProcessor.class.getName());
+  
   private ParseContext pGraphContext;
 
   /**
@@ -84,7 +102,7 @@
     pGraphContext.getOpParseCtx().put(op, ctx);
     return op;
   }
-
+  
   /**
     * convert a regular join to a map-side join.
    * 
@@ -101,18 +119,12 @@
     // outer join cannot be performed on a table which is being cached
     JoinDesc desc = op.getConf();
     org.apache.hadoop.hive.ql.plan.JoinCondDesc[] condns = desc.getConds();
-    for (org.apache.hadoop.hive.ql.plan.JoinCondDesc condn : condns) {
-      if (condn.getType() == JoinDesc.FULL_OUTER_JOIN) {
-        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
-      }
-      if ((condn.getType() == JoinDesc.LEFT_OUTER_JOIN)
-          && (condn.getLeft() != mapJoinPos)) {
-        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
-      }
-      if ((condn.getType() == JoinDesc.RIGHT_OUTER_JOIN)
-          && (condn.getRight() != mapJoinPos)) {
-        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
-      }
+    HiveConf hiveConf = pGraphContext.getConf();
+    boolean noCheckOuterJoin = HiveConf.getBoolVar(hiveConf,
+        HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)
+        && HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN);
+    if (!noCheckOuterJoin) {
+      checkMapJoin(mapJoinPos, condns);
     }
 
     RowResolver oldOutputRS = pctx.getOpParseCtx().get(op).getRR();
@@ -243,7 +255,7 @@
         keyTableDesc, valueExprMap, valueTableDescs, outputColumnNames,
         mapJoinPos, joinCondns), new RowSchema(outputRS.getColumnInfos()),
         newPar), outputRS);
-
+    
     mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs());
     mapJoinOp.setColumnExprMap(colExprMap);
 
@@ -264,6 +276,24 @@
     return mapJoinOp;
   }
 
+  public static void checkMapJoin(int mapJoinPos,
+      org.apache.hadoop.hive.ql.plan.JoinCondDesc[] condns)
+      throws SemanticException {
+    for (org.apache.hadoop.hive.ql.plan.JoinCondDesc condn : condns) {
+      if (condn.getType() == JoinDesc.FULL_OUTER_JOIN) {
+        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+      }
+      if ((condn.getType() == JoinDesc.LEFT_OUTER_JOIN)
+          && (condn.getLeft() != mapJoinPos)) {
+        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+      }
+      if ((condn.getType() == JoinDesc.RIGHT_OUTER_JOIN)
+          && (condn.getRight() != mapJoinPos)) {
+        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+      }
+    }
+  }
+
   private void genSelectPlan(ParseContext pctx, MapJoinOperator input)
       throws SemanticException {
     List<Operator<? extends Serializable>> childOps = input.getChildOperators();
@@ -396,7 +426,7 @@
     }
 
     // Go over the list and find if a reducer is not needed
-    List<MapJoinOperator> listMapJoinOpsNoRed = new ArrayList<MapJoinOperator>();
+    List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinOpsNoRed = new ArrayList<AbstractMapJoinOperator<? extends MapJoinDesc>>();
 
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack.
@@ -461,8 +491,8 @@
         Object... nodeOutputs) throws SemanticException {
 
       MapJoinWalkerCtx ctx = (MapJoinWalkerCtx) procCtx;
-      MapJoinOperator mapJoin = ctx.getCurrMapJoinOp();
-      List<MapJoinOperator> listRejectedMapJoins = ctx
+      AbstractMapJoinOperator<? extends MapJoinDesc> mapJoin = ctx.getCurrMapJoinOp();
+      List<AbstractMapJoinOperator<? extends MapJoinDesc>> listRejectedMapJoins = ctx
           .getListRejectedMapJoins();
 
       // the mapjoin has already been handled
@@ -471,9 +501,9 @@
         return null;
       }
 
-      List<MapJoinOperator> listMapJoinsNoRed = ctx.getListMapJoinsNoRed();
+      List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinsNoRed = ctx.getListMapJoinsNoRed();
       if (listMapJoinsNoRed == null) {
-        listMapJoinsNoRed = new ArrayList<MapJoinOperator>();
+        listMapJoinsNoRed = new ArrayList<AbstractMapJoinOperator<? extends MapJoinDesc>>();
       }
       listMapJoinsNoRed.add(mapJoin);
       ctx.setListMapJoins(listMapJoinsNoRed);
@@ -494,11 +524,11 @@
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
       MapJoinWalkerCtx ctx = (MapJoinWalkerCtx) procCtx;
-      MapJoinOperator mapJoin = ctx.getCurrMapJoinOp();
-      List<MapJoinOperator> listRejectedMapJoins = ctx
+      AbstractMapJoinOperator<? extends MapJoinDesc> mapJoin = ctx.getCurrMapJoinOp();
+      List<AbstractMapJoinOperator<? extends MapJoinDesc>> listRejectedMapJoins = ctx
           .getListRejectedMapJoins();
       if (listRejectedMapJoins == null) {
-        listRejectedMapJoins = new ArrayList<MapJoinOperator>();
+        listRejectedMapJoins = new ArrayList<AbstractMapJoinOperator<? extends MapJoinDesc>>();
       }
       listRejectedMapJoins.add(mapJoin);
       ctx.setListRejectedMapJoins(listRejectedMapJoins);
@@ -543,23 +573,23 @@
    *
    */
   public static class MapJoinWalkerCtx implements NodeProcessorCtx {
-    private List<MapJoinOperator> listMapJoinsNoRed;
-    private List<MapJoinOperator> listRejectedMapJoins;
-    private MapJoinOperator currMapJoinOp;
+    private List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinsNoRed;
+    private List<AbstractMapJoinOperator<? extends MapJoinDesc>> listRejectedMapJoins;
+    private AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp;
 
     /**
      * @param listMapJoinsNoRed
      */
-    public MapJoinWalkerCtx(List<MapJoinOperator> listMapJoinsNoRed) {
+    public MapJoinWalkerCtx(List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinsNoRed) {
       this.listMapJoinsNoRed = listMapJoinsNoRed;
       currMapJoinOp = null;
-      listRejectedMapJoins = new ArrayList<MapJoinOperator>();
+      listRejectedMapJoins = new ArrayList<AbstractMapJoinOperator<? extends MapJoinDesc>>();
     }
 
     /**
      * @return the listMapJoins
      */
-    public List<MapJoinOperator> getListMapJoinsNoRed() {
+    public List<AbstractMapJoinOperator<? extends MapJoinDesc>> getListMapJoinsNoRed() {
       return listMapJoinsNoRed;
     }
 
@@ -567,14 +597,14 @@
      * @param listMapJoinsNoRed
      *          the listMapJoins to set
      */
-    public void setListMapJoins(List<MapJoinOperator> listMapJoinsNoRed) {
+    public void setListMapJoins(List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinsNoRed) {
       this.listMapJoinsNoRed = listMapJoinsNoRed;
     }
 
     /**
      * @return the currMapJoinOp
      */
-    public MapJoinOperator getCurrMapJoinOp() {
+    public AbstractMapJoinOperator<? extends MapJoinDesc> getCurrMapJoinOp() {
       return currMapJoinOp;
     }
 
@@ -582,14 +612,14 @@
      * @param currMapJoinOp
      *          the currMapJoinOp to set
      */
-    public void setCurrMapJoinOp(MapJoinOperator currMapJoinOp) {
+    public void setCurrMapJoinOp(AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp) {
       this.currMapJoinOp = currMapJoinOp;
     }
 
     /**
      * @return the listRejectedMapJoins
      */
-    public List<MapJoinOperator> getListRejectedMapJoins() {
+    public List<AbstractMapJoinOperator<? extends MapJoinDesc>> getListRejectedMapJoins() {
       return listRejectedMapJoins;
     }
 
@@ -598,7 +628,7 @@
      *          the listRejectedMapJoins to set
      */
     public void setListRejectedMapJoins(
-        List<MapJoinOperator> listRejectedMapJoins) {
+        List<AbstractMapJoinOperator<? extends MapJoinDesc>> listRejectedMapJoins) {
       this.listRejectedMapJoins = listRejectedMapJoins;
     }
   }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=919252&r1=919251&r2=919252&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Fri Mar  5 00:53:43 2010
@@ -58,6 +58,9 @@
     transformations.add(new MapJoinProcessor());
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) {
       transformations.add(new BucketMapJoinOptimizer());
+      if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
+        transformations.add(new SortedMergeBucketMapJoinOptimizer());
+      }
     }
     transformations.add(new UnionProcessor());
     transformations.add(new JoinReorder());

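As the hunk above shows, SortedMergeBucketMapJoinOptimizer is only registered when the bucket map join optimizer itself is enabled. A minimal sketch of turning both on programmatically (assuming HiveConf's standard static setBoolVar; in practice the corresponding hive.optimize.* properties would be set in the session or in hive-site.xml):

    import org.apache.hadoop.hive.conf.HiveConf;

    public final class EnableSmbJoin {
      // Minimal sketch: enable bucket map join, then sort-merge bucket map join on top of it.
      public static void enable(HiveConf conf) {
        HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN, true);
        HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN, true);
      }
    }

With both flags set, the optimizer list includes SortedMergeBucketMapJoinOptimizer immediately after BucketMapJoinOptimizer, so eligible bucketed and sorted tables can be joined with the sort-merge bucket map join operator.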

