hadoop-hive-commits mailing list archives

From: zs...@apache.org
Subject: svn commit: r781633 [2/13] - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ data/scripts/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ...
Date: Thu, 04 Jun 2009 01:21:35 GMT
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Thu Jun  4 01:21:30 2009
@@ -20,30 +20,14 @@
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
-import java.util.Vector;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.joinCond;
 import org.apache.hadoop.hive.ql.plan.joinDesc;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
@@ -54,114 +38,17 @@
 /**
  * Join operator implementation.
  */
-public class JoinOperator extends Operator<joinDesc> implements Serializable {
-
-  static final private Log LOG = LogFactory.getLog(JoinOperator.class.getName());
-  
-  // a list of value expressions for each alias are maintained
-  public static class JoinExprMap {
-    ExprNodeEvaluator[] valueFields;
-
-    public JoinExprMap(ExprNodeEvaluator[] valueFields) {
-      this.valueFields = valueFields;
-    }
-
-    public ExprNodeEvaluator[] getValueFields() {
-      return valueFields;
-    }
-
-  }
-
-  public static class IntermediateObject {
-    ArrayList<Object>[] objs;
-    int curSize;
-
-    public IntermediateObject(ArrayList<Object>[] objs, int curSize) {
-      this.objs = objs;
-      this.curSize = curSize;
-    }
-
-    public ArrayList<Object>[] getObjs() {
-      return objs;
-    }
-
-    public int getCurSize() {
-      return curSize;
-    }
-
-    public void pushObj(ArrayList<Object> obj) {
-      objs[curSize++] = obj;
-    }
-
-    public void popObj() {
-      curSize--;
-    }
-  }
-
-  transient protected int numValues; // number of aliases
-  transient protected HashMap<Byte, JoinExprMap> joinExprs;
-  transient protected HashMap<Byte, ObjectInspector[]> joinExprsObjectInspectors;
-  transient static protected Byte[] order; // order in which the results should
-                                           // be outputted
-  transient protected joinCond[] condn;
-  transient protected boolean noOuterJoin;
-  transient private Object[] dummyObj; // for outer joins, contains the
-                                       // potential nulls for the concerned
-                                       // aliases
-  transient private Vector<ArrayList<Object>>[] dummyObjVectors;
-  transient private Stack<Iterator<ArrayList<Object>>> iterators;
-  transient private int totalSz; // total size of the composite object
-  transient ObjectInspector joinOutputObjectInspector;
-  
-  // keys are the column names. basically this maps the position of the column in 
-  // the output of the JoinOperator to the input columnInfo.
-  transient private Map<Integer, Set<String>> posToAliasMap;
-
-
-  HashMap<Byte, Vector<ArrayList<Object>>> storage;
-  int joinEmitInterval = -1;
-  int nextSz = 0;
-  transient Byte lastAlias = null;
+public class JoinOperator extends CommonJoinOperator<joinDesc> implements Serializable {
+  private static final long serialVersionUID = 1L;
   
-  public void initialize(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
-    super.initialize(hconf, reporter, inputObjInspector);
-    
-    totalSz = 0;
-    // Map that contains the rows for each alias
-    storage = new HashMap<Byte, Vector<ArrayList<Object>>>();
-
-    numValues = conf.getExprs().size();
-    joinExprs = new HashMap<Byte, JoinExprMap>();
-    joinExprsObjectInspectors = new HashMap<Byte, ObjectInspector[]>();
-    if (order == null) {
-      order = new Byte[numValues];
-      for (int i = 0; i < numValues; i++)
-        order[i] = (byte) i;
-    }
-    condn = conf.getConds();
-    noOuterJoin = conf.getNoOuterJoin();
-    Map<Byte, ArrayList<exprNodeDesc>> map = conf.getExprs();
-    Iterator entryIter = map.entrySet().iterator();
-    while (entryIter.hasNext()) {
-      Map.Entry e = (Map.Entry) entryIter.next();
-      Byte key = (Byte) e.getKey();
-      ArrayList<exprNodeDesc> expr = (ArrayList<exprNodeDesc>) e.getValue();
-      int sz = expr.size();
-      totalSz += sz;
-
-      ExprNodeEvaluator[] valueFields = new ExprNodeEvaluator[sz];
-
-      for (int j = 0; j < sz; j++)
-        valueFields[j] = ExprNodeEvaluatorFactory.get(expr.get(j));
-
-      joinExprs.put(key, new JoinExprMap(valueFields));
-    }
+  @Override
+  public void initializeOp(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
+    super.initializeOp(hconf, reporter, inputObjInspector);
 
-    ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>(
-        totalSz);
+    ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>(totalSz);
 
     for (Byte alias : order) {
-      int sz = map.get(alias).size();
+      int sz = conf.getExprs().get(alias).size();
       StructObjectInspector fldObjIns = (StructObjectInspector)((StructObjectInspector)inputObjInspector[alias.intValue()]).getStructFieldRef("VALUE").getFieldObjectInspector();
       for (int i = 0; i < sz; i++) {
         structFieldObjectInspectors.add(
@@ -170,440 +57,13 @@
                 ObjectInspectorCopyOption.KEEP));
       }
     }
-    
+
     joinOutputObjectInspector = ObjectInspectorFactory
         .getStandardStructObjectInspector(ObjectInspectorUtils
             .getIntegerArray(totalSz), structFieldObjectInspectors);
+    LOG.info("JOIN " + ((StructObjectInspector)joinOutputObjectInspector).getTypeName() + " totalsz = " + totalSz);
 
-    dummyObj = new Object[numValues];
-    dummyObjVectors = new Vector[numValues];
-
-    int pos = 0;
-    for (Byte alias : order) {
-      int sz = map.get(alias).size();
-      ArrayList<Object> nr = new ArrayList<Object>(sz);
-
-      for (int j = 0; j < sz; j++)
-        nr.add(null);
-
-      dummyObj[pos] = nr;
-      Vector<ArrayList<Object>> values = new Vector<ArrayList<Object>>();
-      values.add((ArrayList<Object>) dummyObj[pos]);
-      dummyObjVectors[pos] = values;
-      pos++;
-    }
-
-    iterators = new Stack<Iterator<ArrayList<Object>>>();
-    
-    joinEmitInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEJOINEMITINTERVAL);
-    
-    forwardCache = new Object[totalSz];
-  }
-
-  public void startGroup() throws HiveException {
-    LOG.trace("Join: Starting new group");
-    storage.clear();
-    for (Byte alias : order)
-      storage.put(alias, new Vector<ArrayList<Object>>());
-  }
-
-  private int getNextSize(int sz) {
-    // A very simple counter to keep track of join entries for a key
-    if (sz >= 100000)
-      return sz + 100000;
-    
-    return 2 * sz;
-  }
-
-  public void process(Object row, ObjectInspector rowInspector, int tag)
-      throws HiveException {
-    try {
-      // get alias
-      Byte alias = (byte)tag;
-
-      if ((lastAlias == null) || (!lastAlias.equals(alias)))
-        nextSz = joinEmitInterval;
-
-      // get the expressions for that alias
-      JoinExprMap exmap = joinExprs.get(alias);
-      ExprNodeEvaluator[] valueFields = exmap.getValueFields();
-
-      // Get the valueFields Object Inspectors
-      ObjectInspector[] valueFieldOI = joinExprsObjectInspectors.get(alias);
-      if (valueFieldOI == null) {
-        // Initialize the ExprEvaluator if necessary
-        valueFieldOI = new ObjectInspector[valueFields.length];
-        for (int i=0; i<valueFields.length; i++) {
-          valueFieldOI[i] = valueFields[i].initialize(rowInspector);
-        }
-        joinExprsObjectInspectors.put(alias, valueFieldOI);
-      }
-      
-      // Compute the values
-      ArrayList<Object> nr = new ArrayList<Object>(valueFields.length);
-      for (int i=0; i<valueFields.length; i++) {
-        nr.add(ObjectInspectorUtils.copyToStandardObject(
-            valueFields[i].evaluate(row),
-            valueFieldOI[i]));
-      }
-      
-      // number of rows for the key in the given table
-      int sz = storage.get(alias).size();
-
-      // Are we consuming too much memory
-      if (alias == numValues - 1) {
-        if (sz == joinEmitInterval) {
-          // The input is sorted by alias, so if we are already in the last join operand,
-          // we can emit some results now.
-          // Note this has to be done before adding the current row to the storage,
-          // to preserve the correctness for outer joins.
-          checkAndGenObject();
-          storage.get(alias).clear();
-        }
-      } else {
-        if (sz == nextSz) {
-          // Output a warning if we reached at least 1000 rows for a join operand
-          // We won't output a warning for the last join operand since the size
-          // will never goes to joinEmitInterval.
-          StructObjectInspector soi = (StructObjectInspector)rowInspector;
-          StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY.toString());
-          Object keyObject = soi.getStructFieldData(row, sf);
-          LOG.warn("table " + alias + " has " + sz + " rows for join key " + keyObject);
-          nextSz = getNextSize(nextSz);
-        }
-      }
-
-      // Add the value to the vector
-      storage.get(alias).add(nr);
-
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new HiveException(e);
-    }
-  }
-
-  transient Object[] forwardCache;
-  
-  private void createForwardJoinObject(IntermediateObject intObj,
-      boolean[] nullsArr) throws HiveException {
-    int p = 0;
-    for (int i = 0; i < numValues; i++) {
-      Byte alias = order[i];
-      int sz = joinExprs.get(alias).getValueFields().length;
-      if (nullsArr[i]) {
-        for (int j = 0; j < sz; j++) {
-          forwardCache[p++] = null;
-        }
-      } else {
-        ArrayList<Object> obj = intObj.getObjs()[i];
-        for (int j = 0; j < sz; j++) {
-          forwardCache[p++] = obj.get(j);
-        }
-      }
-    }
-    forward(forwardCache, joinOutputObjectInspector);
-  }
-
-  private void copyOldArray(boolean[] src, boolean[] dest) {
-    for (int i = 0; i < src.length; i++)
-      dest[i] = src[i];
-  }
-
-  private Vector<boolean[]> joinObjectsInnerJoin(Vector<boolean[]> resNulls,
-      Vector<boolean[]> inputNulls, ArrayList<Object> newObj,
-      IntermediateObject intObj, int left, boolean newObjNull) {
-    if (newObjNull)
-      return resNulls;
-    Iterator<boolean[]> nullsIter = inputNulls.iterator();
-    while (nullsIter.hasNext()) {
-      boolean[] oldNulls = nullsIter.next();
-      boolean oldObjNull = oldNulls[left];
-      if (!oldObjNull) {
-        boolean[] newNulls = new boolean[intObj.getCurSize()];
-        copyOldArray(oldNulls, newNulls);
-        newNulls[oldNulls.length] = false;
-        resNulls.add(newNulls);
-      }
-    }
-    return resNulls;
-  }
-
-  private Vector<boolean[]> joinObjectsLeftOuterJoin(
-      Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls,
-      ArrayList<Object> newObj, IntermediateObject intObj, int left,
-      boolean newObjNull) {
-    Iterator<boolean[]> nullsIter = inputNulls.iterator();
-    while (nullsIter.hasNext()) {
-      boolean[] oldNulls = nullsIter.next();
-      boolean oldObjNull = oldNulls[left];
-      boolean[] newNulls = new boolean[intObj.getCurSize()];
-      copyOldArray(oldNulls, newNulls);
-      if (oldObjNull)
-        newNulls[oldNulls.length] = true;
-      else
-        newNulls[oldNulls.length] = newObjNull;
-      resNulls.add(newNulls);
-    }
-    return resNulls;
-  }
-
-  private Vector<boolean[]> joinObjectsRightOuterJoin(
-      Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls,
-      ArrayList<Object> newObj, IntermediateObject intObj, int left,
-      boolean newObjNull, boolean firstRow) {
-    if (newObjNull)
-      return resNulls;
-
-    if (inputNulls.isEmpty() && firstRow) {
-      boolean[] newNulls = new boolean[intObj.getCurSize()];
-      for (int i = 0; i < intObj.getCurSize() - 1; i++)
-        newNulls[i] = true;
-      newNulls[intObj.getCurSize()-1] = newObjNull;
-      resNulls.add(newNulls);
-      return resNulls;
-    }
-
-    boolean allOldObjsNull = firstRow;
-
-    Iterator<boolean[]> nullsIter = inputNulls.iterator();
-    while (nullsIter.hasNext()) {
-      boolean[] oldNulls = nullsIter.next();
-      if (!oldNulls[left]) {
-        allOldObjsNull = false;
-        break;
-      }
-    }
-
-    nullsIter = inputNulls.iterator();
-    while (nullsIter.hasNext()) {
-      boolean[] oldNulls = nullsIter.next();
-      boolean oldObjNull = oldNulls[left];
-
-      if (!oldObjNull) {
-        boolean[] newNulls = new boolean[intObj.getCurSize()];
-        copyOldArray(oldNulls, newNulls);
-        newNulls[oldNulls.length] = newObjNull;
-        resNulls.add(newNulls);
-      } else if (allOldObjsNull) {
-        boolean[] newNulls = new boolean[intObj.getCurSize()];
-        for (int i = 0; i < intObj.getCurSize() - 1; i++)
-          newNulls[i] = true;
-        newNulls[oldNulls.length] = newObjNull;
-        resNulls.add(newNulls);
-        return resNulls;
-      }
-    }
-    return resNulls;
-  }
-
-  private Vector<boolean[]> joinObjectsFullOuterJoin(
-      Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls,
-      ArrayList<Object> newObj, IntermediateObject intObj, int left,
-      boolean newObjNull, boolean firstRow) {
-    if (newObjNull) {
-      Iterator<boolean[]> nullsIter = inputNulls.iterator();
-      while (nullsIter.hasNext()) {
-        boolean[] oldNulls = nullsIter.next();
-        boolean[] newNulls = new boolean[intObj.getCurSize()];
-        copyOldArray(oldNulls, newNulls);
-        newNulls[oldNulls.length] = newObjNull;
-        resNulls.add(newNulls);
-      }
-      return resNulls;
-    }
-
-    if (inputNulls.isEmpty() && firstRow) {
-      boolean[] newNulls = new boolean[intObj.getCurSize()];
-      for (int i = 0; i < intObj.getCurSize() - 1; i++)
-        newNulls[i] = true;
-      newNulls[intObj.getCurSize()-1] = newObjNull;
-      resNulls.add(newNulls);
-      return resNulls;
-    }
-
-    boolean allOldObjsNull = firstRow;
-
-    Iterator<boolean[]> nullsIter = inputNulls.iterator();
-    while (nullsIter.hasNext()) {
-      boolean[] oldNulls = nullsIter.next();
-      if (!oldNulls[left]) {
-        allOldObjsNull = false;
-        break;
-      }
-    }
-    boolean rhsPreserved = false;
-
-    nullsIter = inputNulls.iterator();
-    while (nullsIter.hasNext()) {
-      boolean[] oldNulls = nullsIter.next();
-      boolean oldObjNull = oldNulls[left];
-
-      if (!oldObjNull) {
-        boolean[] newNulls = new boolean[intObj.getCurSize()];
-        copyOldArray(oldNulls, newNulls);
-        newNulls[oldNulls.length] = newObjNull;
-        resNulls.add(newNulls);
-      } else if (oldObjNull) {
-        boolean[] newNulls = new boolean[intObj.getCurSize()];
-        copyOldArray(oldNulls, newNulls);
-        newNulls[oldNulls.length] = true;
-        resNulls.add(newNulls);
-
-        if (allOldObjsNull && !rhsPreserved) {
-          newNulls = new boolean[intObj.getCurSize()];
-          for (int i = 0; i < oldNulls.length; i++)
-            newNulls[i] = true;
-          newNulls[oldNulls.length] = false;
-          resNulls.add(newNulls);
-          rhsPreserved = true;
-        }
-      }
-    }
-    return resNulls;
+    initializeChildren(hconf, reporter, new ObjectInspector[]{joinOutputObjectInspector});
   }
-
-  /*
-   * The new input is added to the list of existing inputs. Each entry in the
-   * array of inputNulls denotes the entries in the intermediate object to be
-   * used. The intermediate object is augmented with the new object, and list of
-   * nulls is changed appropriately. The list will contain all non-nulls for a
-   * inner join. The outer joins are processed appropriately.
-   */
-  private Vector<boolean[]> joinObjects(Vector<boolean[]> inputNulls,
-                                        ArrayList<Object> newObj, IntermediateObject intObj, 
-                                        int joinPos, boolean firstRow) {
-    Vector<boolean[]> resNulls = new Vector<boolean[]>();
-    boolean newObjNull = newObj == dummyObj[joinPos] ? true : false;
-    if (joinPos == 0) {
-      if (newObjNull)
-        return null;
-      boolean[] nulls = new boolean[1];
-      nulls[0] = newObjNull;
-      resNulls.add(nulls);
-      return resNulls;
-    }
-
-    int left = condn[joinPos - 1].getLeft();
-    int type = condn[joinPos - 1].getType();
-
-    // process all nulls for RIGHT and FULL OUTER JOINS
-    if (((type == joinDesc.RIGHT_OUTER_JOIN) || (type == joinDesc.FULL_OUTER_JOIN))
-        && !newObjNull && (inputNulls == null) && firstRow) {
-      boolean[] newNulls = new boolean[intObj.getCurSize()];
-      for (int i = 0; i < newNulls.length - 1; i++)
-        newNulls[i] = true;
-      newNulls[newNulls.length - 1] = false;
-      resNulls.add(newNulls);
-      return resNulls;
-    }
-
-    if (inputNulls == null)
-      return null;
-
-    if (type == joinDesc.INNER_JOIN)
-      return joinObjectsInnerJoin(resNulls, inputNulls, newObj, intObj, left,
-          newObjNull);
-    else if (type == joinDesc.LEFT_OUTER_JOIN)
-      return joinObjectsLeftOuterJoin(resNulls, inputNulls, newObj, intObj,
-          left, newObjNull);
-    else if (type == joinDesc.RIGHT_OUTER_JOIN)
-      return joinObjectsRightOuterJoin(resNulls, inputNulls, newObj, intObj,
-                                       left, newObjNull, firstRow);
-    assert (type == joinDesc.FULL_OUTER_JOIN);
-    return joinObjectsFullOuterJoin(resNulls, inputNulls, newObj, intObj, left,
-                                    newObjNull, firstRow);
-  }
-
-  /*
-   * genObject is a recursive function. For the inputs, a array of bitvectors is
-   * maintained (inputNulls) where each entry denotes whether the element is to
-   * be used or not (whether it is null or not). The size of the bitvector is
-   * same as the number of inputs under consideration currently. When all inputs
-   * are accounted for, the output is forwared appropriately.
-   */
-  private void genObject(Vector<boolean[]> inputNulls, int aliasNum,
-                         IntermediateObject intObj, boolean firstRow) throws HiveException {
-    boolean childFirstRow = firstRow;
-    if (aliasNum < numValues) {
-      Iterator<ArrayList<Object>> aliasRes = storage.get(order[aliasNum])
-          .iterator();
-      iterators.push(aliasRes);
-      while (aliasRes.hasNext()) {
-        ArrayList<Object> newObj = aliasRes.next();
-        intObj.pushObj(newObj);
-        Vector<boolean[]> newNulls = joinObjects(inputNulls, newObj, intObj,
-                                                 aliasNum, childFirstRow);
-        genObject(newNulls, aliasNum + 1, intObj, firstRow);
-        intObj.popObj();
-        firstRow = false;
-      }
-      iterators.pop();
-    } else {
-      if (inputNulls == null)
-        return;
-      Iterator<boolean[]> nullsIter = inputNulls.iterator();
-      while (nullsIter.hasNext()) {
-        boolean[] nullsVec = nullsIter.next();
-        createForwardJoinObject(intObj, nullsVec);
-      }
-    }
-  }
-
-  /**
-   * Forward a record of join results.
-   * 
-   * @throws HiveException
-   */
-  public void endGroup() throws HiveException {
-    LOG.trace("Join Op: endGroup called: numValues=" + numValues);
-    checkAndGenObject();
-  }
-
-  private void checkAndGenObject() throws HiveException {
-    // does any result need to be emitted
-    for (int i = 0; i < numValues; i++) {
-      Byte alias = order[i];
-      if (storage.get(alias).iterator().hasNext() == false) {
-        if (noOuterJoin) {
-          LOG.trace("No data for alias=" + i);
-          return;
-        } else {
-          storage.put(alias, dummyObjVectors[i]);
-        }
-      }
-    }
-
-    LOG.trace("calling genObject");
-    genObject(null, 0, new IntermediateObject(new ArrayList[numValues], 0), true);
-    LOG.trace("called genObject");
-  }
-
-  /**
-   * All done
-   * 
-   */
-  public void close(boolean abort) throws HiveException {
-    LOG.trace("Join Op close");
-    super.close(abort);
-  }
-
-  @Override
-  public String getName() {
-    return "JOIN";
-  }
-
-  /**
-   * @return the posToAliasMap
-   */
-  public Map<Integer, Set<String>> getPosToAliasMap() {
-    return posToAliasMap;
-  }
-
-  /**
-   * @param posToAliasMap the posToAliasMap to set
-   */
-  public void setPosToAliasMap(Map<Integer, Set<String>> posToAliasMap) {
-    this.posToAliasMap = posToAliasMap;
-  }
-  
 }
+
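The JoinOperator rewrite above moves the shared join machinery (per-alias storage, dummy rows for outer joins, the recursive genObject walk) into the new CommonJoinOperator base class defined elsewhere in this commit, and switches from overriding initialize() to overriding initializeOp(): the operator first builds its output ObjectInspector and only then calls initializeChildren() with that new row shape. Below is a minimal, hypothetical sketch of that initialization split; the class names and signatures are simplified stand-ins, not Hive's actual Operator API.

// Hypothetical, simplified sketch of the initialize()/initializeOp()/initializeChildren()
// split used by the operators in this commit; illustrative stand-ins only,
// not Hive's real Operator hierarchy.
import java.util.ArrayList;
import java.util.List;

abstract class SketchOperator {
  protected List<SketchOperator> childOperators = new ArrayList<SketchOperator>();

  // Framework entry point: common bookkeeping, then operator-specific setup.
  public final void initialize(Object conf) {
    // counters, reporters, etc. would be wired up here
    initializeOp(conf);
  }

  // Operator-specific setup. A subclass decides when (and with what output
  // shape) its children get initialized, e.g. only after its own output
  // inspector has been built.
  protected abstract void initializeOp(Object conf);

  protected void initializeChildren(Object conf) {
    for (SketchOperator child : childOperators) {
      child.initialize(conf);
    }
  }
}

class SketchJoinOperator extends SketchOperator {
  @Override
  protected void initializeOp(Object conf) {
    System.out.println("build join output inspector, then initialize children with it");
    initializeChildren(conf);
  }

  public static void main(String[] args) {
    new SketchJoinOperator().initialize("conf");
  }
}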

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java Thu Jun  4 01:21:30 2009
@@ -39,10 +39,10 @@
   transient protected int limit;
   transient protected int currCount;
 
-  public void initialize(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
-    super.initialize(hconf, reporter, inputObjInspector);
+  public void initializeOp(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
     limit = conf.getLimit();
     currCount = 0;
+    initializeChildren(hconf, reporter, inputObjInspector);
   }
 
   public void process(Object row, ObjectInspector rowInspector, int tag) throws HiveException {

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObject.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObject.java?rev=781633&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObject.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinObject.java Thu Jun  4 01:21:30 2009
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Vector;
+
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator.MapJoinObjectCtx;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+/**
+ * Map Join Object used for both key and value
+ */
+public class MapJoinObject implements Externalizable {
+
+  transient protected int     metadataTag;
+  transient protected int     objectTypeTag;
+  transient protected Object  obj;
+  
+  public MapJoinObject() {
+  }
+
+  /**
+   * @param metadataTag
+   * @param objectTypeTag
+   * @param obj
+   */
+  public MapJoinObject(int metadataTag, int objectTypeTag, Object obj) {
+    this.metadataTag = metadataTag;
+    this.objectTypeTag = objectTypeTag;
+    this.obj = obj;
+  }
+  
+  public boolean equals(Object o) {
+    if (o instanceof MapJoinObject) {
+      MapJoinObject mObj = (MapJoinObject)o;
+      if ((mObj.getMetadataTag() == metadataTag) && (mObj.getObjectTypeTag() == objectTypeTag)) {
+        if ((obj == null) && (mObj.getObj() == null))
+          return true;
+        if ((obj != null) && (mObj.getObj() != null) && (mObj.getObj().equals(obj)))
+          return true;
+      }
+    }
+
+    return false;
+  }
+  
+  public int hashCode() {
+    return (obj == null) ? 0 : obj.hashCode();
+  }
+  
+  @Override
+  public void readExternal(ObjectInput in) throws IOException,
+      ClassNotFoundException {
+    try {
+      metadataTag   = in.readInt();
+      objectTypeTag = in.readInt();
+
+      // get the tableDesc from the map stored in the mapjoin operator
+      MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(new Integer(metadataTag));
+      Writable val = null;
+    
+      assert ((objectTypeTag == 1) || (objectTypeTag == 2));
+      if (objectTypeTag == 1) {
+        val = new BytesWritable();
+        val.readFields(in);      
+        obj = (ArrayList<Object>)ctx.getDeserializer().deserialize(val);
+      }
+      else if (objectTypeTag == 2) {
+        int sz = in.readInt();
+
+        Vector<ArrayList<Object>> res = new Vector<ArrayList<Object>>();
+        for (int pos = 0; pos < sz; pos++) {
+          ArrayList<Object> memObj = new ArrayList<Object>();
+          val = new Text();
+          val.readFields(in);
+          StructObjectInspector objIns = (StructObjectInspector)ctx.getDeserObjInspector();
+          LazyStruct lazyObj = (LazyStruct)(((LazyObject)ctx.getDeserializer().deserialize(val)).getObject());
+          List<? extends StructField> listFields = objIns.getAllStructFieldRefs();
+          int k = 0;
+          for (StructField fld : listFields) {
+            memObj.add(objIns.getStructFieldData(lazyObj, fld));
+          }
+          
+          res.add(memObj);
+        }
+        obj = res;
+      }
+    } catch (Exception e) {
+      throw new IOException(e.getMessage());
+    }
+  }
+  
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    try {
+      
+      out.writeInt(metadataTag);
+      out.writeInt(objectTypeTag);
+
+      // get the tableDesc from the map stored in the mapjoin operator
+      MapJoinObjectCtx ctx = MapJoinOperator.getMapMetadata().get(new Integer(metadataTag));
+
+      // Different processing for key and value
+      if (objectTypeTag == 1) {
+        Writable val = ctx.getSerializer().serialize(obj, ctx.getSerObjInspector());
+        val.write(out);
+      }
+      else if (objectTypeTag == 2) {
+        Vector<Object> v = (Vector<Object>)obj;
+        out.writeInt(v.size());
+
+        for (int pos = 0; pos < v.size(); pos++) {
+          Writable val = ctx.getSerializer().serialize(v.get(pos), ctx.getSerObjInspector());
+          val.write(out);
+        }
+      }
+    }
+    catch (Exception e) {
+      throw new IOException(e.getMessage());
+    }
+  }
+
+  /**
+   * @return the metadataTag
+   */
+  public int getMetadataTag() {
+    return metadataTag;
+  }
+
+  /**
+   * @param metadataTag the metadataTag to set
+   */
+  public void setMetadataTag(int metadataTag) {
+    this.metadataTag = metadataTag;
+  }
+
+  /**
+   * @return the objectTypeTag
+   */
+  public int getObjectTypeTag() {
+    return objectTypeTag;
+  }
+
+  /**
+   * @param objectTypeTag the objectTypeTag to set
+   */
+  public void setObjectTypeTag(int objectTypeTag) {
+    this.objectTypeTag = objectTypeTag;
+  }
+
+  /**
+   * @return the obj
+   */
+  public Object getObj() {
+    return obj;
+  }
+
+  /**
+   * @param obj the obj to set
+   */
+  public void setObj(Object obj) {
+    this.obj = obj;
+  }
+
+}
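MapJoinObject above implements java.io.Externalizable and hand-rolls its wire format: two int tags (metadata tag and object-type tag) followed by either a single serialized key or a counted list of serialized value rows. The snippet below illustrates that general pattern with plain JDK types only; the field names and payload layout are simplified, hypothetical stand-ins, not the exact format MapJoinObject writes.

// Simplified illustration of the Externalizable pattern used above: write small
// int tags first, then a counted payload. JDK classes only; the concrete fields
// are hypothetical stand-ins.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TaggedRows implements Externalizable {
  private int metadataTag;                       // which per-table metadata entry to use
  private List<String> rows = new ArrayList<String>();

  public TaggedRows() { }                        // Externalizable requires a public no-arg constructor

  public TaggedRows(int metadataTag, List<String> rows) {
    this.metadataTag = metadataTag;
    this.rows = rows;
  }

  @Override
  public void writeExternal(ObjectOutput out) throws IOException {
    out.writeInt(metadataTag);                   // tag first, so readExternal knows how to decode
    out.writeInt(rows.size());                   // counted payload
    for (String r : rows) {
      out.writeUTF(r);
    }
  }

  @Override
  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
    metadataTag = in.readInt();
    int sz = in.readInt();
    rows = new ArrayList<String>(sz);
    for (int i = 0; i < sz; i++) {
      rows.add(in.readUTF());
    }
  }

  // Round-trip demo.
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(new TaggedRows(7, Arrays.asList("a,1", "b,2")));
    oos.close();
    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
    TaggedRows back = (TaggedRows) ois.readObject();
    System.out.println(back.metadataTag + " rows=" + back.rows);
  }
}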

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=781633&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Thu Jun  4 01:21:30 2009
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.Random;
+import java.util.Vector;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.hive.ql.util.jdbm.htree.HTree;
+import org.apache.hadoop.hive.ql.util.jdbm.RecordManager;
+import org.apache.hadoop.hive.ql.util.jdbm.RecordManagerFactory;
+import org.apache.hadoop.hive.ql.util.jdbm.RecordManagerOptions;
+
+/**
+ * Map side Join operator implementation.
+ */
+public class MapJoinOperator extends CommonJoinOperator<mapJoinDesc> implements Serializable {
+  private static final long serialVersionUID = 1L;
+  static final private Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
+
+  transient protected Map<Byte, List<ExprNodeEvaluator>> joinKeys;
+  transient protected Map<Byte, List<ObjectInspector>> joinKeysObjectInspectors;
+
+  transient private int posBigTable;       // one of the tables that is not in memory
+  transient int mapJoinRowsKey;            // rows for a given key
+  
+  transient protected Map<Byte, HTree> mapJoinTables;
+
+  public static class MapJoinObjectCtx {
+    ObjectInspector serObjInspector;
+    Serializer      serializer;
+    Deserializer    deserializer;
+    ObjectInspector deserObjInspector;
+    
+    /**
+     * @param serObjInspector
+     * @param serializer
+     * @param deserializer
+     * @param deserObjInspector
+     */
+    public MapJoinObjectCtx(ObjectInspector serObjInspector,
+        Serializer serializer, ObjectInspector deserObjInspector, Deserializer deserializer) {
+      this.serObjInspector = serObjInspector;
+      this.serializer = serializer;
+      this.deserializer = deserializer;
+      this.deserObjInspector = deserObjInspector;
+    }
+    
+    /**
+     * @return the objInspector
+     */
+    public ObjectInspector getSerObjInspector() {
+      return serObjInspector;
+    }
+
+    /**
+     * @return the objInspector
+     */
+    public ObjectInspector getDeserObjInspector() {
+      return deserObjInspector;
+    }
+    
+    /**
+     * @return the serializer
+     */
+    public Serializer getSerializer() {
+      return serializer;
+    }
+
+    /**
+     * @return the deserializer
+     */
+    public Deserializer getDeserializer() {
+      return deserializer;
+    }
+
+  }
+
+  transient static Map<Integer, MapJoinObjectCtx> mapMetadata = new HashMap<Integer, MapJoinObjectCtx>();
+  transient static int nextVal = 0;
+  
+  static public Map<Integer, MapJoinObjectCtx> getMapMetadata() {
+    return mapMetadata;
+  }
+  
+  transient boolean firstRow;
+  
+  transient int   metadataKeyTag;
+  transient int[] metadataValueTag;
+  transient List<File> hTables;
+  
+  @Override
+  public void initializeOp(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
+    super.initializeOp(hconf, reporter, inputObjInspector);
+    firstRow = true;
+    try {
+      joinKeys  = new HashMap<Byte, List<ExprNodeEvaluator>>();
+      joinKeysObjectInspectors = new HashMap<Byte, List<ObjectInspector>>();
+      
+      populateJoinKeyValue(joinKeys, conf.getKeys());
+      
+      // all other tables are small, and are cached in the hash table
+      posBigTable = conf.getPosBigTable();
+
+      metadataValueTag = new int[numValues];
+      for (int pos = 0; pos < numValues; pos++)
+        metadataValueTag[pos] = -1;
+      
+      mapJoinTables = new HashMap<Byte, HTree>();
+      hTables = new ArrayList<File>();
+      
+      // initialize the hash tables for other tables
+      for (int pos = 0; pos < numValues; pos++) {
+        if (pos == posBigTable)
+          continue;
+        
+        Properties props = new Properties();
+        props.setProperty(RecordManagerOptions.CACHE_SIZE, 
+          String.valueOf(HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINCACHEROWS)));
+        
+        Random rand = new Random();
+        File newDir = new File("/tmp/" + rand.nextInt());
+        String newDirName = null;
+        while (true) {
+          if (newDir.mkdir()) {
+            newDirName = newDir.getAbsolutePath();
+            hTables.add(newDir);
+            break;
+          }
+          newDir = new File("/tmp" + rand.nextInt());
+        }
+        
+        RecordManager recman = RecordManagerFactory.createRecordManager(newDirName + "/" + pos, props );
+        HTree hashTable = HTree.createInstance(recman);
+        
+        mapJoinTables.put(new Byte((byte)pos), hashTable);
+      }
+
+      storage.put((byte)posBigTable, new Vector<ArrayList<Object>>());
+      
+      mapJoinRowsKey = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINROWSIZE);
+      
+      // initialize the join output object inspectors
+      ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>(totalSz);
+
+      for (Byte alias : order) {
+        int sz = conf.getExprs().get(alias).size();
+        List<? extends StructField> listFlds = ((StructObjectInspector)inputObjInspector[alias.intValue()]).getAllStructFieldRefs();
+        assert listFlds.size() == sz;
+        for (StructField fld: listFlds) {
+          structFieldObjectInspectors.add(fld.getFieldObjectInspector());
+        }
+      }
+
+      joinOutputObjectInspector = ObjectInspectorFactory
+          .getStandardStructObjectInspector(ObjectInspectorUtils
+              .getIntegerArray(totalSz), structFieldObjectInspectors);
+      
+      initializeChildren(hconf, reporter, new ObjectInspector[]{joinOutputObjectInspector});
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new HiveException(e);
+    }
+  }
+
+  @Override
+  public void process(Object row, ObjectInspector rowInspector, int tag) throws HiveException {
+    try {
+
+      // get alias
+      alias = (byte)tag;
+
+      if ((lastAlias == null) || (!lastAlias.equals(alias)))
+        nextSz = joinEmitInterval;
+      
+      // compute keys and values     
+      ArrayList<Object> key   = computeValues(row, rowInspector, joinKeys.get(alias), joinKeysObjectInspectors);
+      ArrayList<Object> value = computeValues(row, rowInspector, joinValues.get(alias), joinValuesObjectInspectors);
+
+      // Until there is one representation for the keys, convert explicitly
+      int keyPos = 0;
+      // TODO: use keyPos instead
+      for (Object keyElem : key) {
+        PrimitiveObjectInspector poi = (PrimitiveObjectInspector)joinKeysObjectInspectors.get(alias).get(keyPos);
+        if (!poi.isWritable()) {
+          // convert o to writable
+          key.set(keyPos, ObjectInspectorUtils.copyToStandardObject(key.get(keyPos), poi, ObjectInspectorCopyOption.WRITABLE));
+        }
+        keyPos++;
+      }
+
+      // does this source need to be stored in the hash map
+      if (tag != posBigTable) {
+        if (firstRow) {
+          metadataKeyTag = nextVal++;
+          
+          tableDesc keyTableDesc = conf.getKeyTblDesc();
+          Serializer keySerializer = (Serializer)keyTableDesc.getDeserializerClass().newInstance();
+          keySerializer.initialize(null, keyTableDesc.getProperties());
+
+          ExprNodeEvaluator[] keyEval = new ExprNodeEvaluator[conf.getKeys().get(new Byte((byte)tag)).size()];
+          int i=0;
+          for (exprNodeDesc e: conf.getKeys().get(new Byte((byte)tag))) {
+            keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
+          }
+
+          ObjectInspector keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, rowInspector);
+
+          Deserializer deserializer = (Deserializer)ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);
+          deserializer.initialize(null, keyTableDesc.getProperties());
+          
+          mapMetadata.put(new Integer(metadataKeyTag), new MapJoinObjectCtx(keyObjectInspector, keySerializer, deserializer.getObjectInspector(), deserializer));
+          
+          firstRow = false;
+        }
+        
+        HTree hashTable = mapJoinTables.get(alias);
+        MapJoinObject keyMap = new MapJoinObject(metadataKeyTag, 1, key);
+        MapJoinObject o = (MapJoinObject)hashTable.get(keyMap);
+        Vector<ArrayList<Object>> res = null;
+        
+        if (o == null) {
+          res = new Vector<ArrayList<Object>>();
+        }
+        else {
+          res = (Vector<ArrayList<Object>>)o.getObj();
+        }
+        
+        res.add(value);
+
+        // TODO: put some warning if the size of res exceeds a given threshold
+  
+
+        if (metadataValueTag[tag] == -1) {
+          metadataValueTag[tag] = nextVal++;
+                    
+          tableDesc valueTableDesc = conf.getValueTblDescs().get(tag);
+          Serializer valueSerializer = (Serializer)valueTableDesc.getDeserializerClass().newInstance();
+          valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+          ExprNodeEvaluator[] valueEval = new ExprNodeEvaluator[conf.getExprs().get(new Byte((byte)tag)).size()];
+          int i=0;
+          for (exprNodeDesc e: conf.getExprs().get(new Byte((byte)tag))) {
+            valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
+          }
+
+          ObjectInspector valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, rowInspector);
+ 
+          Deserializer deserializer = (Deserializer)ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null);
+          deserializer.initialize(null, valueTableDesc.getProperties());
+          
+          mapMetadata.put(new Integer((byte)metadataValueTag[tag]), new MapJoinObjectCtx(valueObjectInspector, valueSerializer, deserializer.getObjectInspector(), deserializer));
+        }
+        
+        // Construct externalizable objects for key and value
+        MapJoinObject keyObj = new MapJoinObject();
+        
+        // currently, key is always byteswritable and value is text - TODO: generalize this
+        keyObj.setMetadataTag(metadataKeyTag);
+        keyObj.setObjectTypeTag(1);
+        keyObj.setObj(key);
+        
+        MapJoinObject valueObj = new MapJoinObject();
+        
+        valueObj.setMetadataTag(metadataValueTag[tag]);
+        valueObj.setObjectTypeTag(2);
+        valueObj.setObj(res);
+
+        if (res.size() > 1)
+          hashTable.remove(keyObj);
+
+        hashTable.put(keyObj, valueObj);
+        return;
+      }
+
+
+      // Add the value to the vector
+      storage.get(alias).add(value);
+
+      for (Byte pos : order) {
+        if (pos.intValue() != tag) {
+          MapJoinObject keyMap = new MapJoinObject(metadataKeyTag, 1, key);
+          MapJoinObject o = (MapJoinObject)mapJoinTables.get(pos).get(keyMap);
+
+          if (o == null) {
+            storage.put(pos, new Vector<ArrayList<Object>>());
+          }
+          else {
+            storage.put(pos, (Vector<ArrayList<Object>>)o.getObj());
+          }
+        }
+      }
+      
+      // generate the output records
+      checkAndGenObject();
+    
+      // done with the row
+      storage.get(alias).clear();
+
+      for (Byte pos : order)
+        if (pos.intValue() != tag)
+          storage.put(pos, null);
+    
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new HiveException(e);
+    }
+  }
+  
+  /**
+   * Implements the getName function for the Node Interface.
+   * @return the name of the operator
+   */
+  public String getName() {
+    return new String("MAPJOIN");
+  }
+  
+  public void close(boolean abort) throws HiveException {
+    for (File hTbl : hTables) {
+      deleteDir(hTbl);
+    }
+    super.close(abort);
+  }
+  
+  public static void deleteDir(File dir) {
+    if (dir.isDirectory()) {
+      String[] children = dir.list();
+      for (int i = 0; i < children.length; i++) {
+        deleteDir(new File(dir, children[i]));
+      }
+    }
+
+    dir.delete();
+  }
+}
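MapJoinOperator above keeps every table except the one at posBigTable in a JDBM HTree hash table keyed by the join key, then streams the big table through process(), probing those tables and calling checkAndGenObject() to emit joined rows. The fragment below sketches that probe flow using plain in-memory java.util collections in place of the on-disk HTree; the String keys and rows are hypothetical simplifications of the real ArrayList<Object> values, and only inner-join semantics are shown.

// Conceptual sketch of the map-side join flow: small tables are loaded into
// hash maps keyed by the join key, then each big-table row probes them.
// Plain java.util collections stand in for the JDBM HTree used above.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapJoinSketch {
  public static void main(String[] args) {
    // small table: join key -> all rows with that key (built up front,
    // analogous to the hash tables populated for the non-big aliases above)
    Map<String, List<String>> smallTable = new HashMap<String, List<String>>();
    addRow(smallTable, "k1", "small-row-a");
    addRow(smallTable, "k1", "small-row-b");
    addRow(smallTable, "k2", "small-row-c");

    // big table: streamed row by row, never fully materialized
    String[][] bigTable = { { "k1", "big-row-1" }, { "k3", "big-row-2" } };

    for (String[] bigRow : bigTable) {
      List<String> matches = smallTable.get(bigRow[0]);   // probe the small table
      if (matches == null) {
        continue;                                          // inner join: no match, no output
      }
      for (String smallRow : matches) {
        System.out.println(bigRow[1] + " JOIN " + smallRow);
      }
    }
  }

  private static void addRow(Map<String, List<String>> table, String key, String row) {
    List<String> rows = table.get(key);
    if (rows == null) {
      rows = new ArrayList<String>();
      table.put(key, rows);
    }
    rows.add(row);
  }
}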

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Thu Jun  4 01:21:30 2009
@@ -34,14 +34,12 @@
 import org.apache.hadoop.hive.ql.plan.partitionDesc;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 
-
 /**
  * Map operator. This triggers overall map side processing.
  * This is a little different from regular operators in that
@@ -55,130 +53,233 @@
   transient private LongWritable deserialize_error_count = new LongWritable ();
   transient private Deserializer deserializer;
   
-  transient private Object row;
   transient private Object[] rowWithPart;
   transient private StructObjectInspector rowObjectInspector;
+  transient private boolean isPartitioned;
+  
+  private static class MapInputPath {
+    String path;
+    String alias;
+    Operator<? extends Serializable> op;
+    
+    /**
+     * @param path
+     * @param alias
+     * @param op
+     */
+    public MapInputPath(String path, String alias,
+        Operator<? extends Serializable> op) {
+      this.path = path;
+      this.alias = alias;
+      this.op = op;
+    }
+
+    public boolean equals(Object o) {
+      if (o instanceof MapInputPath) {
+        MapInputPath mObj = (MapInputPath)o;
+        if (mObj == null)
+          return false;
+        return path.equals(mObj.path) && alias.equals(mObj.alias) && op.equals(mObj.op);
+      }
+      
+      return false;
+    }
 
-  transient private List<String> partNames;
-  transient private Object[] partValues;
-  transient private List<ObjectInspector> partObjectInspectors;
+    public int hashCode() {
+      return (op == null) ? 0 : op.hashCode();
+    }
+  }
+  
+  private static class MapOpCtx {
+    boolean               isPartitioned;
+    StructObjectInspector rowObjectInspector;
+    Object[]              rowWithPart;
+    Deserializer          deserializer;
+    
+    /**
+     * @param isPartitioned
+     * @param rowObjectInspector
+     * @param rowWithPart
+     */
+    public MapOpCtx(boolean isPartitioned,
+        StructObjectInspector rowObjectInspector, Object[] rowWithPart, Deserializer deserializer) {
+      this.isPartitioned = isPartitioned;
+      this.rowObjectInspector = rowObjectInspector;
+      this.rowWithPart = rowWithPart;
+      this.deserializer = deserializer;
+    }
+
+    /**
+     * @return the isPartitioned
+     */
+    public boolean isPartitioned() {
+      return isPartitioned;
+    }
+
+    /**
+     * @return the rowObjectInspector
+     */
+    public StructObjectInspector getRowObjectInspector() {
+      return rowObjectInspector;
+    }
+
+    /**
+     * @return the rowWithPart
+     */
+    public Object[] getRowWithPart() {
+      return rowWithPart;
+    }
+
+    /**
+     * @return the deserializer
+     */
+    public Deserializer getDeserializer() {
+      return deserializer;
+    }
+  }
   
+  private MapOpCtx initObjectInspector(Configuration hconf, String onefile) throws HiveException, ClassNotFoundException, InstantiationException, IllegalAccessException, SerDeException {
+    partitionDesc pd = conf.getPathToPartitionInfo().get(onefile);
+    LinkedHashMap<String, String> partSpec = pd.getPartSpec();
+    tableDesc td = pd.getTableDesc();
+    Properties p = td.getProperties();
+
+    // Add alias, table name, and partitions to hadoop conf
+    HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, String.valueOf(p.getProperty("name")));
+    HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, String.valueOf(partSpec));
+    Class sdclass = td.getDeserializerClass();
+    if(sdclass == null) {
+      String className = td.getSerdeClassName();
+      if ((className == "") || (className == null)) {
+        throw new HiveException("SerDe class or the SerDe class name is not set for table: " + td.getProperties().getProperty("name"));
+      }
+      sdclass = MapOperator.class.getClassLoader().loadClass(className);
+    }
+    
+    deserializer = (Deserializer) sdclass.newInstance();
+    deserializer.initialize(hconf, p);
+    rowObjectInspector = (StructObjectInspector)deserializer.getObjectInspector();
+    
+    // Next check if this table has partitions and if so
+    // get the list of partition names as well as allocate
+    // the serdes for the partition columns
+    String pcols = p.getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
+    if (pcols != null && pcols.length() > 0) {
+      String[] partKeys = pcols.trim().split("/");
+      List<String> partNames = new ArrayList<String>(partKeys.length);
+      Object[] partValues = new Object[partKeys.length];
+      List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>(partKeys.length);
+      for(int i = 0; i < partKeys.length; i++ ) {
+        String key = partKeys[i];
+        partNames.add(key);
+        partValues[i] = new Text(partSpec.get(key));
+        partObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
+      }
+      StructObjectInspector partObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(partNames, partObjectInspectors);
+      
+      rowWithPart = new Object[2];
+      rowWithPart[1] = partValues;
+      rowObjectInspector = ObjectInspectorFactory.getUnionStructObjectInspector(Arrays.asList(new StructObjectInspector[]{
+                                                                                                rowObjectInspector, 
+                                                                                                partObjectInspector}));
+      return new MapOpCtx(true, rowObjectInspector, rowWithPart, deserializer);
+    }
+    else {
+      return new MapOpCtx(false, rowObjectInspector, null, deserializer);
+    }
+  }  
 
-  public void initialize(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
-    super.initialize(hconf, reporter, inputObjInspector);
-    Path fpath = new Path((new Path (HiveConf.getVar(hconf, HiveConf.ConfVars.HADOOPMAPFILENAME))).toUri().getPath());
-    ArrayList<Operator<? extends Serializable>> todo = new ArrayList<Operator<? extends Serializable>> ();
+  public void initializeOp(Configuration hconf, Reporter reporter,
+      ObjectInspector[] inputObjInspector) throws HiveException {
+    Path fpath = new Path((new Path(HiveConf.getVar(hconf,
+        HiveConf.ConfVars.HADOOPMAPFILENAME))).toUri().getPath());
+    ArrayList<Operator<? extends Serializable>> todo = new ArrayList<Operator<? extends Serializable>>();
+    Map<MapInputPath, MapOpCtx> opCtx = new HashMap<MapInputPath, MapOpCtx>();
     statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
 
-    // for each configuration path that fpath can be relativized against ..
-    for(String onefile: conf.getPathToAliases().keySet()) {
-      Path onepath = new Path(new Path(onefile).toUri().getPath());
-      if(!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
+    try {
+      // initialize the complete subtree
+      for (String onefile : conf.getPathToAliases().keySet()) {
+        MapOpCtx ctx = initObjectInspector(hconf, onefile);
 
-        // pick up work corresponding to this configuration path
         List<String> aliases = conf.getPathToAliases().get(onefile);
-        for(String onealias: aliases) {
-          LOG.info("Adding alias " + onealias + " to work list for file " + fpath.toUri().getPath());
-          Operator<? extends Serializable> op = conf.getAliasToWork().get(onealias);
-          List<Operator<? extends Serializable>> parents = new ArrayList<Operator<? extends Serializable>>();
-          parents.add(this);
-          op.setParentOperators(parents);
-          todo.add(op);
-        }
-
-        // initialize decoder once based on what table we are processing
-        if(deserializer != null) {
-          continue;
+        for (String onealias : aliases) {
+          Operator<? extends Serializable> op = conf.getAliasToWork().get(
+              onealias);
+          opCtx.put(new MapInputPath(onefile, onealias, op), ctx);
         }
+      }
 
-        partitionDesc pd = conf.getPathToPartitionInfo().get(onefile);
-        LinkedHashMap<String, String> partSpec = pd.getPartSpec();
-        tableDesc td = pd.getTableDesc();
-        Properties p = td.getProperties();
-        // Add alias, table name, and partitions to hadoop conf
-        HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, String.valueOf(p.getProperty("name")));
-        HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, String.valueOf(partSpec));
-        try {
-          Class sdclass = td.getDeserializerClass();
-          if(sdclass == null) {
-            String className = td.getSerdeClassName();
-            if ((className == "") || (className == null)) {
-              throw new HiveException("SerDe class or the SerDe class name is not set for table: " + td.getProperties().getProperty("name"));
+      boolean done = false;
+      // for each configuration path that fpath can be relativized against ..
+      for (String onefile : conf.getPathToAliases().keySet()) {
+        Path onepath = new Path(new Path(onefile).toUri().getPath());
+        if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
+
+          // pick up work corresponding to this configuration path
+          List<String> aliases = conf.getPathToAliases().get(onefile);
+          for (String onealias : aliases) {
+            LOG.info("Adding alias " + onealias + " to work list for file "
+                + fpath.toUri().getPath());
+            Operator<? extends Serializable> op = conf.getAliasToWork().get(
+                onealias);
+            List<Operator<? extends Serializable>> parents = new ArrayList<Operator<? extends Serializable>>();
+            parents.add(this);
+            op.setParentOperators(parents);
+            todo.add(op);
+            MapInputPath inp = new MapInputPath(onefile, onealias, op);
+            LOG.info("dump " + opCtx.get(inp).getRowObjectInspector().getTypeName());
+            op.initialize(hconf, reporter, new ObjectInspector[] { opCtx.get(inp).getRowObjectInspector() });
+
+            if (!done) {
+              deserializer = opCtx.get(inp).getDeserializer();
+              isPartitioned = opCtx.get(inp).isPartitioned();
+              rowWithPart = opCtx.get(inp).getRowWithPart();
+              rowObjectInspector = opCtx.get(inp).getRowObjectInspector();
+              done = true;
             }
-            sdclass = MapOperator.class.getClassLoader().loadClass(className);
           }
-          deserializer = (Deserializer) sdclass.newInstance();
-          deserializer.initialize(hconf, p);
-          rowObjectInspector = (StructObjectInspector)deserializer.getObjectInspector();
-          
-          // Next check if this table has partitions and if so
-          // get the list of partition names as well as allocate
-          // the serdes for the partition columns
-          String pcols = p.getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
-          if (pcols != null && pcols.length() > 0) {
-            String[] partKeys = pcols.trim().split("/");
-            partNames = new ArrayList<String>(partKeys.length);
-            partValues = new Object[partKeys.length];
-            partObjectInspectors = new ArrayList<ObjectInspector>(partKeys.length);
-            for(int i = 0; i < partKeys.length; i++ ) {
-              String key = partKeys[i];
-              partNames.add(key);
-              partValues[i] = new Text(partSpec.get(key));
-              partObjectInspectors.add(
-                  PrimitiveObjectInspectorFactory.writableStringObjectInspector);
-            }
-            StructObjectInspector partObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(partNames, partObjectInspectors);
-            
-            rowWithPart = new Object[2];
-            rowWithPart[1] = partValues;
-            rowObjectInspector = ObjectInspectorFactory.getUnionStructObjectInspector(
-                Arrays.asList(new StructObjectInspector[]{
-                    rowObjectInspector, 
-                    partObjectInspector}));
-          }
-          else {
-            partNames = null;
-            partValues = null;
-          }
-
-          LOG.info("Got partitions: " + pcols);
-        } catch (SerDeException e) {
-          e.printStackTrace();
-          throw new HiveException (e);
-        } catch (InstantiationException e) {
-          throw new HiveException (e);
-        } catch (IllegalAccessException e) {
-          throw new HiveException (e);
-        } catch (ClassNotFoundException e) {
-          throw new HiveException (e);
         }
       }
-    }
 
-    if(todo.size() == 0) {
-      // didn't find match for input file path in configuration!
-      // serious problem ..
-      LOG.error("Configuration does not have any alias for path: " + fpath.toUri().getPath());
-      throw new HiveException("Configuration and input path are inconsistent");
-    }
-
-    // we found all the operators that we are supposed to process. now bootstrap
-    this.setChildOperators(todo);
-    // the child operators may need the global mr configuration. set it now so
-    // that they can get access during initiaize.
-    this.setMapredWork(conf);
-    // way hacky - need to inform child operators about output collector
-    this.setOutputCollector(out);
+      for (MapInputPath input : opCtx.keySet()) {
+        Operator<? extends Serializable> op = input.op;
+        op.initialize(hconf, reporter, new ObjectInspector[] { opCtx.get(input).getRowObjectInspector() });
+      }
 
-    for(Operator op: todo) {
-      op.initialize(hconf, reporter, inputObjInspector);
+      if (todo.size() == 0) {
+        // didn't find match for input file path in configuration!
+        // serious problem ..
+        LOG.error("Configuration does not have any alias for path: "
+            + fpath.toUri().getPath());
+        throw new HiveException("Configuration and input path are inconsistent");
+      }
+
+      // we found all the operators that we are supposed to process. now
+      // bootstrap
+      this.setChildOperators(todo);
+      // the child operators may need the global mr configuration. set it now so
+      // that they can get access during initialize.
+      this.setMapredWork(conf);
+      // way hacky - need to inform child operators about output collector
+      this.setOutputCollector(out);
+
+    } catch (SerDeException e) {
+      e.printStackTrace();
+      throw new HiveException(e);
+    } catch (InstantiationException e) {
+      throw new HiveException(e);
+    } catch (IllegalAccessException e) {
+      throw new HiveException(e);
+    } catch (ClassNotFoundException e) {
+      throw new HiveException(e);
     }
   }
 
   public void process(Writable value) throws HiveException {
     try {
-      if (partNames == null) {
-        row = deserializer.deserialize(value);
+      if (!isPartitioned) {
+        Object row = deserializer.deserialize(value);
         forward(row, rowObjectInspector);
       } else {
         rowWithPart[0] = deserializer.deserialize(value);

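The path-to-alias matching above relies on java.net.URI.relativize: if relativizing the input file's URI against a configured path yields something other than the file URI itself, the file lies under that path and its aliases are picked up for the work list. A minimal standalone sketch of that check, using plain java.net.URI and made-up warehouse paths rather than the Hadoop Path class:

import java.net.URI;

public class RelativizeCheck {
  // Returns true when 'file' lies under 'configuredDir': relativize only
  // rewrites the URI when the directory path is a prefix of the file path.
  static boolean matches(String configuredDir, String file) {
    URI dir = URI.create(configuredDir);
    URI f = URI.create(file);
    return !dir.relativize(f).equals(f);
  }

  public static void main(String[] args) {
    // Hypothetical warehouse layout, for illustration only.
    System.out.println(matches("/user/hive/warehouse/src",
        "/user/hive/warehouse/src/part-00000"));   // true
    System.out.println(matches("/user/hive/warehouse/other",
        "/user/hive/warehouse/src/part-00000"));   // false
  }
}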
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Thu Jun  4 01:21:30 2009
@@ -60,7 +60,7 @@
   // initializing the whole tree in all the mappers (which might be required for mappers
   // spanning multiple files anyway, in future)
   public static enum State { UNINIT, INIT, CLOSE };
-  transient private State state = State.UNINIT;
+  transient protected State state = State.UNINIT;
 
   static {
     seqId = 0;
@@ -112,6 +112,16 @@
     return parentOperators;
   }
 
+  public void allocateParentOperatorsInitArray() {
+    if ((parentOperators == null) || (parentsObjectInspector != null))
+      return;
+    parentsObjectInspector = new ParentInit[parentOperators.size()];
+    for (int pos = 0; pos < parentOperators.size(); pos++) {
+      parentsObjectInspector[pos] = new ParentInit();
+      parentsObjectInspector[pos].done = false;
+    }
+  }
+  
   protected T conf;
   protected boolean done;
 
@@ -240,26 +250,86 @@
     return(ret);
   }
 
-  public void initialize (Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
+  public abstract void initializeOp (Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException;
+
+  public void initialize(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
     if (state == state.INIT) {
       LOG.info("Already Initialized");
       return;
     }
 
-    LOG.info("Initializing Self");
+    LOG.info("Initializing Self " + id);
     this.reporter = reporter;
     
-    if(childOperators == null) {
+    initializeOp(hconf, reporter, inputObjInspector);
+    state = State.INIT;
+
+    LOG.info("Initialization Done " + id);
+  }
+
+  /** 
+   * The default implementation assumes that the first inspector in the array is the output inspector as well. Specific operators can override this
+   * to pass their own output object inspectors. 
+   */
+  public void initializeChildren (Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
+    if (childOperators == null) {
       return;
     }
+
     LOG.info("Initializing children:");
-    for(Operator<? extends Serializable> op: childOperators) {
-      op.initialize(hconf, reporter, inputObjInspector);
+    // Copy operators from List to Array for faster access
+    if (childOperatorsArray == null && childOperators != null) {
+      childOperatorsArray = new Operator[childOperators.size()];
+      for (int i=0; i<childOperatorsArray.length; i++) {
+        childOperatorsArray[i] = childOperators.get(i); 
+        childOperatorsArray[i].allocateParentOperatorsInitArray();
+      }
+      childOperatorsTag = new int[childOperatorsArray.length];
+      for (int i=0; i<childOperatorsArray.length; i++) {
+        List<Operator<? extends Serializable>> parentOperators = 
+            childOperatorsArray[i].getParentOperators();
+        if (parentOperators == null) {
+          throw new HiveException("Hive internal error: parent is null in " 
+              + childOperatorsArray[i].getClass() + "!");
+        }
+        childOperatorsTag[i] = parentOperators.indexOf(this);
+        if (childOperatorsTag[i] == -1) {
+          throw new HiveException("Hive internal error: cannot find parent in the child operator!");
+        }
+      }
+    }
+    
+    for (int i = 0; i < childOperatorsArray.length; i++) {
+      Operator<? extends Serializable> op = childOperatorsArray[i];
+      op.initialize(hconf, reporter, inputObjInspector == null ? null : inputObjInspector[0], childOperatorsTag[i]);
     }    
+  }
 
-    state = State.INIT;
+  static private class ParentInit {
+    boolean           done;
+    ObjectInspector   parIns;
+  }
+  
+  transient protected ParentInit[] parentsObjectInspector = null; 
+
+  public void initialize(Configuration hconf, Reporter reporter, ObjectInspector inputObjInspector, int parentId) throws HiveException {
+    parentsObjectInspector[parentId].parIns = inputObjInspector;
+    parentsObjectInspector[parentId].done = true;
 
-    LOG.info("Initialization Done");
+    LOG.info("parent " + parentId + " initialized");
+    
+    // If all the parents have been initialized, go ahead
+    for (ParentInit par : parentsObjectInspector)
+      if (par.done == false)
+        return;
+
+    LOG.info("start Initializing " + id);
+    
+    ObjectInspector[] par = new ObjectInspector[parentsObjectInspector.length];
+    for (int pos = 0; pos < par.length; pos++)
+      par[pos] = parentsObjectInspector[pos].parIns;
+    initialize(hconf, reporter, par);    
+    LOG.info("done Initializing " + id);
   }
 
   /**
@@ -312,8 +382,9 @@
       for(Operator<? extends Serializable> op: childOperators) {
         op.close(abort);
       }
-
       state = State.CLOSE;
+
+      LOG.info("Close done");
     } catch (HiveException e) {
       e.printStackTrace();
       throw e;
@@ -342,7 +413,40 @@
    */
   transient protected Operator<? extends Serializable>[] childOperatorsArray;
   transient protected int[] childOperatorsTag; 
-                          
+
+   /**
+   * Replace one child with another at the same position.
+   * @param child     the old child
+   * @param newChild  the new child
+   */
+  public void  replaceChild(Operator<? extends Serializable> child, Operator<? extends Serializable> newChild) {
+    int childIndex = childOperators.indexOf(child);
+    assert childIndex != -1;
+    childOperators.set(childIndex, newChild);
+    // TODO: set parent for newChild
+  }
+
+  public void  removeChild(Operator<? extends Serializable> child) {
+    int childIndex = childOperators.indexOf(child);
+    assert childIndex != -1;
+    if (childOperators.size() == 1) 
+      childOperators = null;
+    else
+      childOperators.remove(childIndex);
+  }
+
+  /**
+   * Replace one parent with another at the same position.
+   * @param parent     the old parent
+   * @param newParent  the new parent
+   */
+  public void  replaceParent(Operator<? extends Serializable> parent, Operator<? extends Serializable> newParent) {
+    int parentIndex = parentOperators.indexOf(parent);
+    assert parentIndex != -1;
+    parentOperators.set(parentIndex, newParent);
+    // TODO: set the child in newParent correctly
+  }
+
   protected void forward(Object row, ObjectInspector rowInspector) throws HiveException {
     
     // For debugging purposes:

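With this change an operator no longer overrides initialize() directly; it implements initializeOp() and triggers initializeChildren(), and a child with several parents defers its own setup until every parent has handed over an object inspector (the ParentInit bookkeeping above). A simplified standalone sketch of that wait-for-all-parents pattern, with hypothetical class names rather than the actual Hive Operator API:

import java.util.ArrayList;
import java.util.List;

abstract class Op {
  private final List<Op> parents = new ArrayList<Op>();
  private Object[] parentInspectors;   // one slot per parent
  private int reported = 0;
  private boolean initialized = false;

  void addParent(Op p) { parents.add(p); }

  // Called by parent 'parentId' when it is ready; the child only runs its
  // own initialization once every parent has reported an inspector.
  void initializeFromParent(int parentId, Object inspector) {
    if (parentInspectors == null) {
      parentInspectors = new Object[parents.size()];
    }
    if (parentInspectors[parentId] == null) {
      reported++;
    }
    parentInspectors[parentId] = inspector;
    if (reported < parents.size() || initialized) {
      return;                          // still waiting on some parent
    }
    initialized = true;
    initializeOp(parentInspectors);    // operator-specific setup
  }

  protected abstract void initializeOp(Object[] parentInspectors);
}

class PrintOp extends Op {
  protected void initializeOp(Object[] parentInspectors) {
    System.out.println("initialized after " + parentInspectors.length + " parent(s) reported");
  }
}

class InitDemo {
  public static void main(String[] args) {
    Op child = new PrintOp();
    child.addParent(new PrintOp());
    child.addParent(new PrintOp());
    child.initializeFromParent(0, "inspectorFromA");  // waits: only one parent so far
    child.initializeFromParent(1, "inspectorFromB");  // now runs initializeOp
  }
}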
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Thu Jun  4 01:21:30 2009
@@ -48,6 +48,7 @@
     opvec.add(new opTuple<extractDesc> (extractDesc.class, ExtractOperator.class));
     opvec.add(new opTuple<groupByDesc> (groupByDesc.class, GroupByOperator.class));
     opvec.add(new opTuple<joinDesc> (joinDesc.class, JoinOperator.class));
+    opvec.add(new opTuple<mapJoinDesc> (mapJoinDesc.class, MapJoinOperator.class));
     opvec.add(new opTuple<limitDesc> (limitDesc.class, LimitOperator.class));
     opvec.add(new opTuple<tableScanDesc> (tableScanDesc.class, TableScanOperator.class));
     opvec.add(new opTuple<unionDesc> (unionDesc.class, UnionOperator.class));

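The factory is a list of (descriptor class, operator class) pairs, so registering mapJoinDesc above is what lets the compiler turn a mapJoinDesc into a MapJoinOperator. A generic sketch of that registry idiom, with a hypothetical descriptor/operator pair standing in for the real classes:

import java.util.HashMap;
import java.util.Map;

class OperatorRegistry {
  private final Map<Class<?>, Class<?>> descToOp = new HashMap<Class<?>, Class<?>>();

  void register(Class<?> descClass, Class<?> opClass) {
    descToOp.put(descClass, opClass);
  }

  // Instantiate the operator class registered for the descriptor's runtime class.
  Object create(Object desc) throws Exception {
    Class<?> opClass = descToOp.get(desc.getClass());
    if (opClass == null) {
      throw new IllegalArgumentException("No operator registered for " + desc.getClass());
    }
    return opClass.newInstance();
  }
}

class FactoryDemo {
  // Hypothetical stand-ins for mapJoinDesc / MapJoinOperator.
  static class FakeDesc {}
  static class FakeOperator {}

  public static void main(String[] args) throws Exception {
    OperatorRegistry registry = new OperatorRegistry();
    registry.register(FakeDesc.class, FakeOperator.class);
    System.out.println(registry.create(new FakeDesc()).getClass().getSimpleName()); // FakeOperator
  }
}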
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Thu Jun  4 01:21:30 2009
@@ -71,8 +71,9 @@
   transient int tag;
   transient byte[] tagByte = new byte[1];
   
-  public void initialize(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
-    super.initialize(hconf, reporter, inputObjInspector);
+  public void initializeOp(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
+    LOG.info("Initializing Self");
+
     try {
       keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
       int i=0;
@@ -106,6 +107,8 @@
       valueSerializer.initialize(null, valueTableDesc.getProperties());
       
       firstRow = true;
+      initializeChildren(hconf, reporter, inputObjInspector);
+      LOG.info("Initialization Done");
     } catch (Exception e) {
       e.printStackTrace();
       throw new RuntimeException(e);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Thu Jun  4 01:21:30 2009
@@ -166,8 +166,8 @@
     }
   }
 
-  public void initialize(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
-    super.initialize(hconf, reporter, inputObjInspector);
+  public void initializeOp(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
+
     statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
     statsMap.put(Counter.SERIALIZE_ERRORS, serialize_error_count);
 
@@ -180,6 +180,8 @@
       scriptInputSerializer = (Serializer)conf.getScriptInputInfo().getDeserializerClass().newInstance();
       scriptInputSerializer.initialize(hconf, conf.getScriptInputInfo().getProperties());
 
+      initializeChildren(hconf, reporter, new ObjectInspector[]{scriptOutputDeserializer.getObjectInspector()});
+
       String [] cmdArgs = splitArgs(conf.getScriptCmd());
 
       String prog = cmdArgs[0];
@@ -234,7 +236,6 @@
 
       rpTimer = new Timer(true);
       rpTimer.scheduleAtFixedRate(new ReporterTask(reporter), 0, exp_interval);
-
     } catch (Exception e) {
       e.printStackTrace();
       throw new HiveException ("Cannot initialize ScriptOperator", e);
@@ -277,6 +278,7 @@
         };
       } catch (IOException e) {
         LOG.error("Got ioexception: " + e.getMessage());
+        e.printStackTrace();
         new_abort = true;
       } catch (InterruptedException e) { }
     }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Thu Jun  4 01:21:30 2009
@@ -25,9 +25,8 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.selectDesc;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
@@ -41,26 +40,34 @@
   transient Object[] output;
   transient ObjectInspector outputObjectInspector;
   
-  boolean firstRow;
-  
-  public void initialize(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
-    super.initialize(hconf, reporter, inputObjInspector);
-
+  public void initializeOp(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {    
+    // Just forward the row as is
+    if (conf.isSelStarNoCompute()) {
+      initializeChildren(hconf, reporter, inputObjInspector);
+      return;
+    }
+    
     ArrayList<exprNodeDesc> colList = conf.getColList();
     eval = new ExprNodeEvaluator[colList.size()];
     for(int i=0; i<colList.size(); i++) {
       assert(colList.get(i) != null);
       eval[i] = ExprNodeEvaluatorFactory.get(colList.get(i));
     }
-    firstRow = true;
+   
+    assert inputObjInspector.length == 1;
+    output = new Object[eval.length];
+    LOG.info("SELECT " + ((StructObjectInspector)inputObjInspector[0]).getTypeName());
+    outputObjectInspector = initEvaluatorsAndReturnStruct(eval, inputObjInspector[0]); 
+    initializeChildren(hconf, reporter, new ObjectInspector[]{outputObjectInspector});
   }
 
   public void process(Object row, ObjectInspector rowInspector, int tag)
       throws HiveException {
-    if (firstRow) {
-      firstRow = false;
-      output = new Object[eval.length];
-      outputObjectInspector = initEvaluatorsAndReturnStruct(eval, rowInspector);
+
+    // Just forward the row as is
+    if (conf.isSelStarNoCompute()) {
+      forward(row, rowInspector);
+      return;
     }
     
     for(int i=0; i<eval.length; i++) {

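Two shifts in SelectOperator: one-time evaluator setup moves from the first process() call (the old firstRow flag) into initializeOp, and a SELECT * plan that needs no computation simply forwards each row. A simplified sketch of that shape, with plain maps standing in for rows and column lookups standing in for ExprNodeEvaluator:

import java.util.HashMap;
import java.util.Map;

class SimpleSelect {
  private final boolean selStarNoCompute;
  private final String[] columns;
  private Object[] output;             // allocated once at init, reused per row

  SimpleSelect(boolean selStarNoCompute, String[] columns) {
    this.selStarNoCompute = selStarNoCompute;
    this.columns = columns;
  }

  // One-time setup, done at initialization instead of on the first row.
  void initializeOp() {
    if (selStarNoCompute) {
      return;                          // SELECT *: nothing to evaluate
    }
    output = new Object[columns.length];
  }

  Object process(Map<String, Object> row) {
    if (selStarNoCompute) {
      return row;                      // forward the row untouched
    }
    for (int i = 0; i < columns.length; i++) {
      output[i] = row.get(columns[i]); // stand-in for evaluating a select expression
    }
    return output;
  }

  public static void main(String[] args) {
    Map<String, Object> row = new HashMap<String, Object>();
    row.put("key", 1);
    row.put("value", "v1");
    SimpleSelect sel = new SimpleSelect(false, new String[] { "key" });
    sel.initializeOp();
    System.out.println(((Object[]) sel.process(row))[0]); // 1
  }
}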
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Thu Jun  4 01:21:30 2009
@@ -33,8 +33,8 @@
  **/
 public class TableScanOperator extends Operator<tableScanDesc> implements Serializable {
   private static final long serialVersionUID = 1L;
-  public void initialize(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
-    super.initialize(hconf, reporter, inputObjInspector);
+  public void initializeOp(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
+    initializeChildren(hconf, reporter, inputObjInspector);
     // nothing to do really ..
   }
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java Thu Jun  4 01:21:30 2009
@@ -32,7 +32,12 @@
  **/
 public class UnionOperator extends  Operator<unionDesc>  implements Serializable {
   private static final long serialVersionUID = 1L;
-
+  
+  @Override
+  public void initializeOp(Configuration hconf, Reporter reporter, ObjectInspector[] inputObjInspector) throws HiveException {
+    initializeChildren(hconf, reporter, inputObjInspector);
+  }
+  
   @Override
   public void process(Object row, ObjectInspector rowInspector, int tag)
       throws HiveException {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java Thu Jun  4 01:21:30 2009
@@ -137,6 +137,8 @@
     pctx.getAliasToSamplePruner().clear();
     pctx.getLoadTableWork().clear();
     pctx.getLoadFileWork().clear();
+    pctx.getJoinContext().clear();
+
     Iterator<Operator<? extends Serializable>> iter = pctx.getOpParseCtx().keySet().iterator();
     while (iter.hasNext()) {
       Operator<? extends Serializable> op = iter.next();
@@ -200,8 +202,9 @@
       SemanticAnalyzer sem = (SemanticAnalyzer)SemanticAnalyzerFactory.get(pGraphContext.getConf(), pGraphContext.getParseTree());
 
       resetParseContext(pGraphContext);
-      sem.init(pGraphContext);
       QB qb = new QB(null, null, false);
+      pGraphContext.setQB(qb);
+      sem.init(pGraphContext);
 
       sem.doPhase1(pGraphContext.getParseTree(), qb, sem.initPhase1Ctx());
       sem.getMetaData(qb);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Thu Jun  4 01:21:30 2009
@@ -19,11 +19,15 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.List;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Stack;
 import java.io.Serializable;
 
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRMapJoinCtx;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.plan.mapredWork;
@@ -31,6 +35,8 @@
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.hive.ql.plan.partitionDesc;
 
 /**
  * Processor for the rule - table scan followed by reduce sink
@@ -46,13 +52,17 @@
    * @param opProcCtx context
    */
   public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx, Object... nodeOutputs) throws SemanticException {
+    // Is it the dummy file sink after the mapjoin
+    FileSinkOperator fsOp = (FileSinkOperator)nd;
+    if ((fsOp.getParentOperators().size() == 1) && (fsOp.getParentOperators().get(0) instanceof MapJoinOperator))
+      return null;
+    
     GenMRProcContext ctx = (GenMRProcContext)opProcCtx;
     boolean ret = false;
 
     Task<? extends Serializable> mvTask = ctx.getMvTask();
     Task<? extends Serializable> currTask = ctx.getCurrTask();
     Operator<? extends Serializable> currTopOp = ctx.getCurrTopOp();
-    UnionOperator currUnionOp = ctx.getCurrUnionOp();
     String currAliasId = ctx.getCurrAliasId();
     HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap = ctx.getOpTaskMap();
     List<Operator<? extends Serializable>> seenOps = ctx.getSeenOps();
@@ -82,12 +92,36 @@
         if (ret)
           currTask.removeDependentTask(mvTask);
       }
+
+      return null;
+
     }
-    else if (currUnionOp != null) {
+
+    UnionOperator currUnionOp = ctx.getCurrUnionOp();
+    
+    if  (currUnionOp != null) {
       opTaskMap.put(null, currTask);
-      GenMapRedUtils.initUnionPlan(ctx, currTask);
+      GenMapRedUtils.initUnionPlan(ctx, currTask, false);
+      return null;
     }
+    
+    MapJoinOperator currMapJoinOp = ctx.getCurrMapJoinOp();
+    
+    if  (currMapJoinOp != null) {
+      opTaskMap.put(null, currTask);
+      GenMRMapJoinCtx mjCtx = ctx.getMapJoinCtx(currMapJoinOp);
+      mapredWork plan = (mapredWork) currTask.getWork();
 
+      String taskTmpDir = mjCtx.getTaskTmpDir();
+      tableDesc tt_desc = mjCtx.getTTDesc(); 
+      assert plan.getPathToAliases().get(taskTmpDir) == null;
+      plan.getPathToAliases().put(taskTmpDir, new ArrayList<String>());
+      plan.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
+      plan.getPathToPartitionInfo().put(taskTmpDir, new partitionDesc(tt_desc, null));
+      plan.getAliasToWork().put(taskTmpDir, mjCtx.getRootMapJoinOp());
+      return null;
+    }
+    
     return null;
   }
 }

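For the map-join case the plan is wired by hand: the task's temp directory becomes both an input path and its own alias, its tableDesc is recorded as the partition info, and the root map-join operator saved in GenMRMapJoinCtx becomes the work for that alias. A plain-map sketch of that wiring, with made-up values rather than the mapredWork API:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PlanWiring {
  public static void main(String[] args) {
    Map<String, List<String>> pathToAliases = new HashMap<String, List<String>>();
    Map<String, String> pathToPartitionInfo = new HashMap<String, String>();
    Map<String, String> aliasToWork = new HashMap<String, String>();

    // Made-up stand-ins for taskTmpDir, tt_desc and the root map-join operator.
    String taskTmpDir = "/tmp/hive-scratch/10002";
    String tableDescForTmp = "tt_desc";
    String rootMapJoinOp = "rootMapJoinOp";

    // The temp dir doubles as its own alias in the plan.
    pathToAliases.put(taskTmpDir, new ArrayList<String>());
    pathToAliases.get(taskTmpDir).add(taskTmpDir);
    pathToPartitionInfo.put(taskTmpDir, tableDescForTmp);
    aliasToWork.put(taskTmpDir, rootMapJoinOp);

    System.out.println(pathToAliases + " / " + aliasToWork);
  }
}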
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRProcContext.java Thu Jun  4 01:21:30 2009
@@ -27,6 +27,7 @@
 
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -121,8 +122,81 @@
     }
   }
 
+  public static class GenMRMapJoinCtx {
+    String                            taskTmpDir;
+    tableDesc                         tt_desc; 
+    Operator<? extends Serializable>  rootMapJoinOp;
+    MapJoinOperator                   oldMapJoin;   
+    
+    public GenMRMapJoinCtx() { 
+      taskTmpDir    = null;
+      tt_desc       = null;
+      rootMapJoinOp = null;
+      oldMapJoin    = null;
+    }
+
+    /**
+     * @param taskTmpDir
+     * @param tt_desc
+     * @param childSelect
+     * @param oldMapJoin
+     */
+    public GenMRMapJoinCtx(String taskTmpDir, tableDesc tt_desc, 
+        Operator<? extends Serializable> rootMapJoinOp, MapJoinOperator oldMapJoin) {
+      this.taskTmpDir    = taskTmpDir;
+      this.tt_desc       = tt_desc;
+      this.rootMapJoinOp = rootMapJoinOp;
+      this.oldMapJoin    = oldMapJoin;
+    }
+    
+    public void setTaskTmpDir(String taskTmpDir) {
+      this.taskTmpDir = taskTmpDir;
+    }
+
+    public String getTaskTmpDir() {
+      return taskTmpDir;
+    }
+
+    public void setTTDesc(tableDesc tt_desc) {
+      this.tt_desc = tt_desc;
+    }
+
+    public tableDesc getTTDesc() {
+      return tt_desc;
+    }
+
+    /**
+     * @return the childSelect
+     */
+    public Operator<? extends Serializable> getRootMapJoinOp() {
+      return rootMapJoinOp;
+    }
+
+    /**
+     * @param rootMapJoinOp the rootMapJoinOp to set
+     */
+    public void setRootMapJoinOp(Operator<? extends Serializable> rootMapJoinOp) {
+      this.rootMapJoinOp = rootMapJoinOp;
+    }
+
+    /**
+     * @return the oldMapJoin
+     */
+    public MapJoinOperator getOldMapJoin() {
+      return oldMapJoin;
+    }
+
+    /**
+     * @param oldMapJoin the oldMapJoin to set
+     */
+    public void setOldMapJoin(MapJoinOperator oldMapJoin) {
+      this.oldMapJoin = oldMapJoin;
+    }
+  }
+
   private HashMap<Operator<? extends Serializable>, Task<? extends Serializable>> opTaskMap;
-  private HashMap<UnionOperator, GenMRUnionCtx>  unionTaskMap;
+  private HashMap<UnionOperator, GenMRUnionCtx>      unionTaskMap;
+  private HashMap<MapJoinOperator, GenMRMapJoinCtx>  mapJoinTaskMap;
   private List<Operator<? extends Serializable>> seenOps;
 
   private ParseContext                          parseCtx;
@@ -133,6 +207,7 @@
   private Task<? extends Serializable>         currTask;
   private Operator<? extends Serializable>     currTopOp;
   private UnionOperator                        currUnionOp;
+  private MapJoinOperator                      currMapJoinOp;
   private String                               currAliasId;
   private List<Operator<? extends Serializable>> rootOps;
   
@@ -179,10 +254,12 @@
     currTask        = null;
     currTopOp       = null;
     currUnionOp     = null;
+    currMapJoinOp   = null;
     currAliasId     = null;
     rootOps         = new ArrayList<Operator<? extends Serializable>>();
     rootOps.addAll(parseCtx.getTopOps().values());
     unionTaskMap = new HashMap<UnionOperator, GenMRUnionCtx>();
+    mapJoinTaskMap = new HashMap<MapJoinOperator, GenMRMapJoinCtx>();
   }
 
   /**
@@ -322,6 +399,17 @@
     this.currUnionOp = currUnionOp;
   }      
 
+  public MapJoinOperator getCurrMapJoinOp() {
+    return currMapJoinOp;
+  }   
+   
+  /**
+   * @param currMapJoinOp current map join operator
+   */
+  public void setCurrMapJoinOp(MapJoinOperator currMapJoinOp) {
+    this.currMapJoinOp = currMapJoinOp;
+  }      
+
   /**
    * @return current top alias
    */
@@ -343,7 +431,15 @@
   public void setUnionTask(UnionOperator op, GenMRUnionCtx uTask) {
     unionTaskMap.put(op, uTask);
   }
-  
+
+  public GenMRMapJoinCtx getMapJoinCtx(MapJoinOperator op) {
+    return mapJoinTaskMap.get(op);
+  }
+
+  public void setMapJoinCtx(MapJoinOperator op, GenMRMapJoinCtx mjCtx) {
+    mapJoinTaskMap.put(op, mjCtx);
+  }
+
   /**
    * Get the input set.
    */

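GenMRMapJoinCtx acts as a per-operator scratchpad: one rule records the temp dir, table descriptor and root operator for a map join, and later rules (such as the file-sink processor above) fetch them back through setMapJoinCtx/getMapJoinCtx keyed on the MapJoinOperator. A small sketch of that keyed-context idiom, with generic stand-in types rather than the Hive classes:

import java.util.HashMap;
import java.util.Map;

class KeyedContext<K, C> {
  private final Map<K, C> ctxMap = new HashMap<K, C>();

  void set(K key, C ctx) { ctxMap.put(key, ctx); }
  C get(K key) { return ctxMap.get(key); }
}

class MapJoinScratch {
  final String taskTmpDir;
  final String ttDesc;
  MapJoinScratch(String taskTmpDir, String ttDesc) {
    this.taskTmpDir = taskTmpDir;
    this.ttDesc = ttDesc;
  }
}

class CtxDemo {
  public static void main(String[] args) {
    KeyedContext<Object, MapJoinScratch> ctx = new KeyedContext<Object, MapJoinScratch>();
    Object mapJoinOp = new Object();                    // stands in for a MapJoinOperator
    ctx.set(mapJoinOp, new MapJoinScratch("/tmp/hive-scratch/10002", "tt_desc"));
    System.out.println(ctx.get(mapJoinOp).taskTmpDir);  // retrieved again by a later rule
  }
}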
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java Thu Jun  4 01:21:30 2009
@@ -74,7 +74,7 @@
     // This will happen in case of joins. The current plan can be thrown away after being merged with the
     // original plan
     else {
-      GenMapRedUtils.joinPlan(op, null, opMapTask, ctx);
+      GenMapRedUtils.joinPlan(op, null, opMapTask, ctx, -1, false, false, false);
       currTask = opMapTask;
       ctx.setCurrTask(currTask);
     }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink2.java Thu Jun  4 01:21:30 2009
@@ -64,7 +64,7 @@
     if (opMapTask == null)
       GenMapRedUtils.splitPlan(op, ctx);
     else {
-      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx);
+      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, true, false, false);
       currTask = opMapTask;
       ctx.setCurrTask(currTask);
     }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java?rev=781633&r1=781632&r2=781633&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java Thu Jun  4 01:21:30 2009
@@ -40,7 +40,7 @@
 import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
 
 /**
- * Processor for the rule - table scan followed by reduce sink
+ * Processor for the rule - union followed by reduce sink
  */
 public class GenMRRedSink3 implements NodeProcessor {
 
@@ -90,8 +90,8 @@
     // There is a join after union. One of the branches of union has already been initialized.
     // Initialize the current branch, and join with the original plan.
     else {
-      GenMapRedUtils.initUnionPlan(ctx, currTask);
-      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx);
+      GenMapRedUtils.initUnionPlan(ctx, currTask, false);
+      GenMapRedUtils.joinPlan(op, currTask, opMapTask, ctx, -1, true, false, false);
     }
 
     mapCurrCtx.put(op, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrTopOp(), ctx.getCurrAliasId()));


