pig-commits mailing list archives

From pradeep...@apache.org
Subject svn commit: r804309 [2/2] - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ src/org/apache/pig/backend/hadoop/executionengine/physicalLay...
Date Fri, 14 Aug 2009 17:53:24 GMT
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=804309&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Fri Aug 14 17:53:23 2009
@@ -0,0 +1,715 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+public class POMergeJoin extends PhysicalOperator {
+
+    /** This operator implements the merge join algorithm to do map-side joins.
+     *  Currently, only two-way joins are supported. One input of the join is identified as left
+     *  and the other as right. Left input tuples are the input records in the map.
+     *  Right tuples are read from HDFS by opening the right stream.
+     *  
+     *    This join doesn't support outer join.
+     *    Data is assumed to be sorted in ascending order. The join will fail if data is sorted in descending order.
+     */
+    private static final long serialVersionUID = 1L;
+
+    private final transient Log log = LogFactory.getLog(getClass());
+
+    // Flag to indicate whether getNext() is being called for the first time.
+    private boolean firstTime = true;
+
+    //The Local Rearrange operators modeling the join key
+    private POLocalRearrange[] LRs;
+
+    // FileSpec of index file which will be read from HDFS.
+    private FileSpec indexFile;
+
+    private POLoad rightLoader;
+
+    private OperatorKey opKey;
+
+    private Object prevLeftKey;
+
+    private Result prevLeftInp;
+
+    private Object prevRightKey = null;
+
+    private Result prevRightInp;
+
+    private transient TupleFactory mTupleFactory;
+
+    // Boolean denoting whether we are generating joined tuples in this getNext() call or need to read in more data.
+    private boolean doingJoin;
+
+    // The index is modeled as a FIFO queue; LinkedList implements the java.util.Queue interface.
+    private LinkedList<Tuple> index;
+
+    private FuncSpec rightLoaderFuncSpec;
+
+    // Buffer to hold accumulated left tuples.
+    private List<Tuple> leftTuples;
+
+    private MultiMap<PhysicalOperator, PhysicalPlan> inpPlans;
+
+    private PhysicalOperator rightPipelineLeaf;
+
+    private PhysicalOperator rightPipelineRoot;
+
+    private boolean noInnerPlanOnRightSide;
+
+    private PigContext pc;
+
+    private Object curJoinKey;
+
+    private Tuple curJoiningRightTup;
+
+    private int counter; // # of tuples on left side with same key.
+
+    private int leftTupSize = -1;
+
+    private int rightTupSize = -1;
+
+    private int arrayListSize = 1024;
+
+    private List<POCast> casters;
+
+    private List<POProject> projectors;
+
+    /**
+     * @param k
+     * @param rp
+     * @param inp
+     * @param inpPlans there can only be 2 inputs each being a List<PhysicalPlan>
+     * Ex. join A by ($0,$1), B by ($1,$2);
+     */
+    public POMergeJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, MultiMap<PhysicalOperator, PhysicalPlan> inpPlans,List<List<Byte>> keyTypes) throws ExecException{
+
+        super(k, rp, inp);
+        this.opKey = k;
+        this.doingJoin = false;
+        this.inpPlans = inpPlans;
+        LRs = new POLocalRearrange[2];
+        mTupleFactory = TupleFactory.getInstance();
+        leftTuples = new ArrayList<Tuple>(arrayListSize);
+        this.createJoinPlans(inpPlans,keyTypes);
+        setUpTypeCastingForIdxTup(keyTypes.get(0));
+    }
+
+    /** This function sets up casting for the key tuples which we read out of the index file.
+     * We set the type of the key as DataByteArray (DBA) and then cast it into the type specified in the schema.
+     * If the type is not specified in the schema, we cast from DBA to DBA.
+     */
+    
+    private void setUpTypeCastingForIdxTup(List<Byte> keyTypes){
+         /*   
+         * Can't reuse one POCast operator for all keys since POCast maintains some state,
+         * and hence it's not safe to share a single POCast. Thus we use one POCast for each key.
+         */
+        casters = new ArrayList<POCast>(keyTypes.size());
+        projectors = new ArrayList<POProject>(keyTypes.size());
+
+        for(Byte keytype : keyTypes){
+            POCast caster = new POCast(genKey());
+            List<PhysicalOperator> pp = new ArrayList<PhysicalOperator>(1);
+            POProject projector = new POProject(genKey());
+            projector.setResultType(DataType.BYTEARRAY);
+            projector.setColumn(0);
+            pp.add(projector);
+            caster.setInputs(pp);
+            caster.setResultType(keytype);
+            projectors.add(projector);
+            casters.add(caster);
+        }
+    }
+    /**
+     * Configures the Local Rearrange operators to get keys out of tuple.
+     * @throws ExecException 
+     */
+    private void createJoinPlans(MultiMap<PhysicalOperator, PhysicalPlan> inpPlans, List<List<Byte>> keyTypes) throws ExecException{
+
+        int i=-1;
+        for (PhysicalOperator inpPhyOp : inpPlans.keySet()) {
+            ++i;
+            POLocalRearrange lr = new POLocalRearrange(genKey());
+            lr.setIndex(i);
+            lr.setResultType(DataType.TUPLE);
+            lr.setKeyType(keyTypes.get(i).size() > 1 ? DataType.TUPLE : keyTypes.get(i).get(0));
+            try {
+                lr.setPlans((List<PhysicalPlan>)inpPlans.get(inpPhyOp));
+            } catch (PlanException pe) {
+                int errCode = 2071;
+                String msg = "Problem with setting up local rearrange's plans.";
+                throw new ExecException(msg, errCode, PigException.BUG, pe);
+            }
+            LRs[i]= lr;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+
+        Object curLeftKey;
+        Result curLeftInp;
+
+        if(firstTime){
+            // Do initial setup.
+            curLeftInp = processInput();
+            if(curLeftInp.returnStatus != POStatus.STATUS_OK)
+                return curLeftInp;       // Return because we want to fetch next left tuple.
+
+            curLeftKey = extractKeysFromTuple(curLeftInp, 0);
+            if(null == curLeftKey) // We drop the tuples which have null keys.
+                return new Result(POStatus.STATUS_EOP, null);
+            
+            seekInRightStream(curLeftKey);
+            leftTuples.add((Tuple)curLeftInp.result);
+            firstTime = false;
+            prevLeftKey = curLeftKey;
+            return new Result(POStatus.STATUS_EOP, null);
+        }
+
+        if(doingJoin){
+            // We matched on keys. Time to do the join.
+
+            if(counter > 0){    // We have left tuples to join with current right tuple.
+                Tuple joiningLeftTup = leftTuples.get(--counter);
+                leftTupSize = joiningLeftTup.size();
+                Tuple joinedTup = mTupleFactory.newTuple(leftTupSize+rightTupSize);
+
+                for(int i=0; i<leftTupSize; i++)
+                    joinedTup.set(i, joiningLeftTup.get(i));
+
+                for(int i=0; i < rightTupSize; i++)
+                    joinedTup.set(i+leftTupSize, curJoiningRightTup.get(i));
+
+                return new Result(POStatus.STATUS_OK, joinedTup);
+            }
+            // Join with current right input has ended. But bag of left tuples
+            // may still join with next right tuple.
+
+            doingJoin = false;
+
+            while(true){
+                Result rightInp = getNextRightInp();
+                if(rightInp.returnStatus != POStatus.STATUS_OK){
+                    prevRightInp = null;
+                    return rightInp;
+                }
+                else{
+                    Object rightKey = extractKeysFromTuple(rightInp, 1);
+                    if(null == rightKey) // If we see a tuple with a null key in the stream, we drop it
+                        continue;        // and fetch the next tuple.
+
+                    int cmpval = ((Comparable)rightKey).compareTo(curJoinKey);
+                    if (cmpval == 0){
+                        // Matched the very next right tuple.
+                        curJoiningRightTup = (Tuple)rightInp.result;
+                        rightTupSize = curJoiningRightTup.size();
+                        counter = leftTuples.size();
+                        doingJoin = true;
+                        return this.getNext(dummyTuple);
+
+                    }
+                    else if(cmpval > 0){    // We got ahead on right side. Store currently read right tuple.
+                        if(!this.parentPlan.endOfAllInput){
+                            prevRightKey = rightKey;
+                            prevRightInp = rightInp;
+                            // There can't be any more join on this key.
+                            leftTuples = new ArrayList<Tuple>(arrayListSize);
+                            leftTuples.add((Tuple)prevLeftInp.result);
+                        }
+
+                        else{           // This is end of all input and this is last join output.
+                            // Right loader in this case wouldn't get a chance to close the input stream. So, we close it ourselves.
+                            try {
+                                rightLoader.tearDown();
+                            } catch (IOException e) {
+                                // Non-fatal error. We can continue.
+                                log.error("Received exception while trying to close right side file: " + e.getMessage());
+                            }
+                        }
+                        return new Result(POStatus.STATUS_EOP, null);
+                    }
+                    else{   // At this point right side can't be behind.
+                        int errCode = 1102;
+                        String errMsg = "Data is not sorted on right side. Last two tuples encountered were: \n"+
+                        curJoiningRightTup+ "\n" + (Tuple)rightInp.result ;
+                        throw new ExecException(errMsg,errCode);
+                    }    
+                }
+            }
+        }
+
+        curLeftInp = processInput();
+        switch(curLeftInp.returnStatus){
+
+        case POStatus.STATUS_OK:
+            curLeftKey = extractKeysFromTuple(curLeftInp, 0);
+            if(null == curLeftKey) // We drop the tuples which have null keys.
+                return new Result(POStatus.STATUS_EOP, null);
+            
+            int cmpVal = ((Comparable)curLeftKey).compareTo(prevLeftKey);
+            if(cmpVal == 0){
+                // Keep on accumulating.
+                leftTuples.add((Tuple)curLeftInp.result);
+                return new Result(POStatus.STATUS_EOP, null);
+            }
+            else if(cmpVal > 0){ // Filled with left bag. Move on.
+                curJoinKey = prevLeftKey;
+                break;   
+            }
+            else{   // Current key < Prev Key
+                int errCode = 1102;
+                String errMsg = "Data is not sorted on left side. Last two keys encountered were: \n"+
+                prevLeftKey+ "\n" + curLeftKey ;
+                throw new ExecException(errMsg,errCode);
+            }
+ 
+        case POStatus.STATUS_EOP:
+            if(this.parentPlan.endOfAllInput){
+                // We hit the end on left input. 
+                // Tuples in bag may still possibly join with right side.
+                curJoinKey = prevLeftKey;
+                curLeftKey = null;
+                break;                
+            }
+            else    // Fetch next left input.
+                return curLeftInp;
+
+        default:    // If encountered with ERR / NULL on left side, we send it down.
+            return curLeftInp;
+        }
+
+        if((null != prevRightKey) && !this.parentPlan.endOfAllInput && ((Comparable)prevRightKey).compareTo(curLeftKey) >= 0){
+
+            // This will happen when we accumulated inputs on the left side and moved on, but are still behind the right side.
+            // In that case, throw away the tuples accumulated till now and add the one we read in this function call.
+            leftTuples = new ArrayList<Tuple>(arrayListSize);
+            leftTuples.add((Tuple)curLeftInp.result);
+            prevLeftInp = curLeftInp;
+            prevLeftKey = curLeftKey;
+            return new Result(POStatus.STATUS_EOP, null);
+        }
+
+        // We have accumulated tuples with the same key on the left side.
+        // But since we are reading ahead we still haven't checked the read-ahead right tuple.
+        // The accumulated left tuples may potentially join with that. So, let's check that first.
+        
+        if((null != prevRightKey) && prevRightKey.equals(prevLeftKey)){
+
+            curJoiningRightTup = (Tuple)prevRightInp.result;
+            counter = leftTuples.size();
+            rightTupSize = curJoiningRightTup.size();
+            doingJoin = true;
+            prevLeftInp = curLeftInp;
+            prevLeftKey = curLeftKey;
+            return this.getNext(dummyTuple);
+        }
+
+        // We will get here only when curLeftKey > prevRightKey
+        while(true){
+            // Start moving on right stream to find the tuple whose key is same as with current left bag key.
+            Result rightInp = getNextRightInp();
+            if(rightInp.returnStatus != POStatus.STATUS_OK)
+                return rightInp;
+
+            Object extractedRightKey = extractKeysFromTuple(rightInp, 1);
+            
+            if(null == extractedRightKey) // If we see a tuple with a null key in the stream, we drop it
+                continue;        // and fetch the next tuple.
+            
+            Comparable rightKey = (Comparable)extractedRightKey;
+            
+            if( prevRightKey != null && rightKey.compareTo(prevRightKey) < 0){
+                // Sanity check.
+                int errCode = 1102;
+                String errMsg = "Data is not sorted on right side. Last two keys encountered were: \n"+
+                prevRightKey+ "\n" + rightKey ;
+                throw new ExecException(errMsg,errCode);
+            }
+
+            int cmpval = rightKey.compareTo(prevLeftKey);
+            if(cmpval < 0)     // still behind the left side, do nothing, fetch next right tuple.
+                continue;
+
+            else if (cmpval == 0){  // Found matching tuple. Time to do join.
+
+                curJoiningRightTup = (Tuple)rightInp.result;
+                counter = leftTuples.size();
+                rightTupSize = curJoiningRightTup.size();
+                doingJoin = true;
+                prevLeftInp = curLeftInp;
+                prevLeftKey = curLeftKey;
+                return this.getNext(dummyTuple);
+            }
+
+            else{    // We got ahead on right side. Store currently read right tuple.
+                prevRightKey = rightKey;
+                prevRightInp = rightInp;
+                // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call. 
+                leftTuples = new ArrayList<Tuple>(arrayListSize);
+                leftTuples.add((Tuple)curLeftInp.result);
+                prevLeftInp = curLeftInp;
+                prevLeftKey = curLeftKey;
+                if(this.parentPlan.endOfAllInput){  // This is end of all input and this is last time we will read right input.
+                    // Right loader in this case wouldn't get a chance to close the input stream. So, we close it ourselves.
+                    try {
+                        rightLoader.tearDown();
+                    } catch (IOException e) {
+                        // Non-fatal error. We can continue.
+                        log.error("Received exception while trying to close right side file: " + e.getMessage());
+                    }
+                }
+                return new Result(POStatus.STATUS_EOP, null);
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void seekInRightStream(Object firstLeftKey) throws ExecException{
+
+        /* Currently the whole of the index is read into memory. Typically the index is small,
+           usually a few KBs in size, so this should not be an issue.
+           However, reading the whole index at startup is not required, so this can be improved upon.
+           Assumption: the index being read is sorted on keys, followed by filename, followed by offset.
+         */
+
+        // The index is modeled as a FIFO queue, which frees us from keeping track of which index entry should be read next.
+        POLoad ld = new POLoad(genKey(), this.indexFile, false);
+        try {
+            pc = (PigContext)ObjectSerializer.deserialize(PigMapReduce.sJobConf.get("pig.pigContext"));
+        } catch (IOException e) {
+            int errCode = 2094;
+            String msg = "Unable to deserialize pig context.";
+            throw new ExecException(msg,errCode,e);
+        }
+        pc.connect();
+        ld.setPc(pc);
+        index = new LinkedList<Tuple>();
+        for(Result res=ld.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNext(dummyTuple))
+            index.offer((Tuple) res.result);   
+
+        Tuple prevIdxEntry = null;
+        Tuple matchedEntry;
+     
+        // When the first call is made, we need to seek into the right input at the correct offset.
+        while(true){
+            // Keep looping till we find first entry in index >= left key
+            // then return the prev idx entry.
+
+            Tuple curIdxEntry = index.poll();
+            if(null == curIdxEntry){
+                // It's possible that we hit the end of the index without encountering an
+                // idx entry >= left key; in that case return the last index entry.
+                matchedEntry = prevIdxEntry;
+                break;
+            }
+            Object extractedKey = extractKeysFromIdxTuple(curIdxEntry);
+            if(extractedKey == null){
+                prevIdxEntry = curIdxEntry;
+                continue;
+            }
+            
+            if(((Comparable)extractedKey).compareTo(firstLeftKey) >= 0){
+                if(null == prevIdxEntry)   // very first entry in index.
+                    matchedEntry = curIdxEntry;
+                else{
+                    matchedEntry = prevIdxEntry;
+                    index.addFirst(curIdxEntry);  // We need to add back the current index Entry because we are reading ahead.
+                }
+                break;
+            }
+            else
+                prevIdxEntry = curIdxEntry;
+        }
+
+        if(matchedEntry == null){
+            
+            int errCode = 2165;
+            String errMsg = "Problem in index construction.";
+            throw new ExecException(errMsg,errCode,PigException.BUG);
+        }
+        
+        Object extractedKey = extractKeysFromIdxTuple(matchedEntry);
+        
+        if(extractedKey != null){
+            Class idxKeyClass = extractedKey.getClass();
+            if( ! firstLeftKey.getClass().equals(idxKeyClass)){
+
+                // This check should really be done at compile time. But to be on the safe side, we do it at runtime also.
+                int errCode = 2166;
+                String errMsg = "Key type mismatch. Found key of type "+firstLeftKey.getClass().getCanonicalName()+" on left side. But, found key of type "+ idxKeyClass.getCanonicalName()+" in index built for right side.";
+                throw new ExecException(errMsg,errCode,PigException.BUG);
+            }
+        }
+        initRightLoader(matchedEntry);
+    }
+
+    /**
+     * @param indexFile the indexFile to set
+     */
+    public void setIndexFile(FileSpec indexFile) {
+        this.indexFile = indexFile;
+    }
+
+    private Object extractKeysFromIdxTuple(Tuple idxTuple) throws ExecException{
+
+        int idxTupSize = idxTuple.size();
+        List<Object> list = new ArrayList<Object>(idxTupSize-2);
+
+        for(int i=0; i<idxTupSize-2; i++){
+
+            projectors.get(i).attachInput(mTupleFactory.newTuple(idxTuple.get(i)));
+
+            switch (casters.get(i).getResultType()) {
+
+            case DataType.BYTEARRAY:    // POCast doesn't handle DBA, but that's fine because in this case we don't need a cast anyway.
+                list.add(idxTuple.get(i));
+                break;
+
+            case DataType.CHARARRAY:
+                list.add(casters.get(i).getNext(dummyString).result);
+                break;
+
+            case DataType.INTEGER:
+                list.add(casters.get(i).getNext(dummyInt).result);
+                break;
+
+            case DataType.FLOAT:
+                list.add(casters.get(i).getNext(dummyFloat).result);
+                break;
+
+            case DataType.DOUBLE:
+                list.add(casters.get(i).getNext(dummyDouble).result);
+                break;
+
+            case DataType.LONG:
+                list.add(casters.get(i).getNext(dummyLong).result);
+                break;
+
+            case DataType.TUPLE:
+                list.add(casters.get(i).getNext(dummyTuple).result);
+                break;
+
+            default:
+                int errCode = 2036;
+                String errMsg = "Unhandled key type : "+casters.get(i).getResultType();
+                throw new ExecException(errMsg,errCode,PigException.BUG);
+            }
+        }
+        // If there is only one key, we don't want to wrap it in a Tuple.
+        return list.size() == 1 ? list.get(0) : mTupleFactory.newTuple(list);
+    }
+
+    private Result getNextRightInp() throws ExecException{
+
+        if(noInnerPlanOnRightSide){
+            Result res = rightLoader.getNext(dummyTuple);
+            switch(res.returnStatus) {
+            case POStatus.STATUS_OK:
+                return res;
+
+            case POStatus.STATUS_EOP:           // Current file has ended. Need to open next file by reading next index entry.
+                String prevFile = rightLoader.getLFile().getFileName();
+                while(true){                        // But next file may be same as previous one, because index may contain multiple entries for same file.
+                    Tuple idxEntry = index.poll();
+                    if(null == idxEntry)            // Index is finished too. Right stream is finished. No more tuples.
+                        return res;
+                    else{                           
+                        if(prevFile.equals((String)idxEntry.get(idxEntry.size()-2)))
+                            continue;
+                        else{
+                            initRightLoader(idxEntry);      // bind loader to file and get tuple from it.
+                            return this.getNextRightInp();    
+                        }
+                    }
+                }
+            default:    // We pass down ERR/NULL.
+                return res;
+            }
+        }
+
+        else {
+            Result res = rightPipelineLeaf.getNext(dummyTuple);
+            switch(res.returnStatus){
+            case POStatus.STATUS_OK:
+                return res;
+
+            case POStatus.STATUS_EOP:
+                res = rightLoader.getNext(dummyTuple);
+
+                switch(res.returnStatus) {
+                case POStatus.STATUS_OK:
+                    rightPipelineRoot.attachInput((Tuple)res.result);
+                    return this.getNextRightInp();
+
+                case POStatus.STATUS_EOP:          // Current file has ended. Need to open next file by reading next index entry.
+                    String prevFile = rightLoader.getLFile().getFileName();
+                    while(true){                        // But next file may be same as previous one, because index may contain multiple entries for same file.
+                        Tuple idxEntry = index.poll();
+                        if(null == idxEntry)          // Index is finished too. Right stream is finished. No more tuples.
+                            return res;
+                        else{
+                            if(prevFile.equals((String)idxEntry.get(idxEntry.size()-2)))
+                                continue;
+                            else{
+                                initRightLoader(idxEntry);
+                                res = rightLoader.getNext(dummyTuple);
+                                rightPipelineRoot.attachInput((Tuple)res.result);
+                                return this.getNextRightInp();
+                            }
+                        }
+                    }
+                default:    // We don't deal with ERR/NULL. just pass them down
+                    return res;
+                }
+
+            default:    // We don't deal with ERR/NULL. just pass them down
+                return res;
+            }            
+        }
+    }
+
+    private void initRightLoader(Tuple idxEntry) throws ExecException{
+
+        // bind loader to file pointed by this index Entry.
+        int keysCnt = idxEntry.size();
+        rightLoader = new POLoad(genKey(), new FileSpec((String)idxEntry.get(keysCnt-2),this.rightLoaderFuncSpec),(Long)idxEntry.get(keysCnt-1), false);
+        rightLoader.setPc(pc);
+    }
+
+    private Object extractKeysFromTuple(Result inp, int lrIdx) throws ExecException{
+
+        //Separate Key & Value of input using corresponding LR operator
+        POLocalRearrange lr = LRs[lrIdx];
+        lr.attachInput((Tuple)inp.result);
+        Result lrOut = lr.getNext(dummyTuple);
+        if(lrOut.returnStatus!=POStatus.STATUS_OK){
+            int errCode = 2167;
+            String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly";
+            throw new ExecException(errMsg,errCode,PigException.BUG);
+        } 
+          
+        return ((Tuple) lrOut.result).get(1);
+    }
+
+    public void setupRightPipeline(PhysicalPlan rightPipeline) throws FrontendException{
+
+        if(rightPipeline != null){
+            if(rightPipeline.getLeaves().size() != 1 || rightPipeline.getRoots().size() != 1){
+                int errCode = 2168;
+                String errMsg = "Expected physical plan with exactly one root and one leaf.";
+                throw new FrontendException(errMsg,errCode,PigException.BUG);
+            }
+
+            noInnerPlanOnRightSide = false;
+            this.rightPipelineLeaf = rightPipeline.getLeaves().get(0);
+            this.rightPipelineRoot = rightPipeline.getRoots().get(0);
+            this.rightPipelineRoot.setInputs(null);            
+        }
+        else
+            noInnerPlanOnRightSide = true;
+    }
+
+    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException, ExecException{
+
+        is.defaultReadObject();
+        mTupleFactory = TupleFactory.getInstance();
+    }
+
+
+    private OperatorKey genKey(){
+        return new OperatorKey(opKey.scope,NodeIdGenerator.getGenerator().getNextNodeId(opKey.scope));
+    }
+
+    public void setRightLoaderFuncSpec(FuncSpec rightLoaderFuncSpec) {
+        this.rightLoaderFuncSpec = rightLoaderFuncSpec;
+        for(POCast caster : casters)
+            caster.setLoadFSpec(rightLoaderFuncSpec);            
+    }
+
+    public List<PhysicalPlan> getInnerPlansOf(int index) {
+        return (List<PhysicalPlan>)inpPlans.get(inputs.get(index));
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#visit(org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor)
+     */
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitMergeJoin(this);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.plan.Operator#name()
+     */
+    @Override
+    public String name() {
+        return "MergeJoin[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.plan.Operator#supportsMultipleInputs()
+     */
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.plan.Operator#supportsMultipleOutputs()
+     */
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+}
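
For readers following the getNext() state machine above, here is a minimal, self-contained sketch of the same sort-merge strategy applied to two in-memory lists already sorted on the join key. The class and method names are illustrative only and are not part of this commit; unlike POMergeJoin, the sketch does not stream the left input through the map, seek into the right side via an index, or handle null keys.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Simplified two-way sort-merge inner join over two in-memory lists,
    // both sorted ascending on the join key (the first field of each row).
    public class MergeJoinSketch {

        public static List<String> join(List<int[]> left, List<int[]> right) {
            List<String> out = new ArrayList<String>();
            int l = 0, r = 0;
            while (l < left.size() && r < right.size()) {
                int lKey = left.get(l)[0];
                int rKey = right.get(r)[0];
                if (lKey < rKey) {
                    l++;                              // left is behind; advance it
                } else if (lKey > rKey) {
                    r++;                              // right is behind; advance it
                } else {
                    // Keys match: find the run of equal keys on each side ...
                    int lEnd = l, rEnd = r;
                    while (lEnd < left.size() && left.get(lEnd)[0] == lKey) lEnd++;
                    while (rEnd < right.size() && right.get(rEnd)[0] == rKey) rEnd++;
                    // ... and emit their cross product, much as the operator does
                    // by buffering same-key left tuples in leftTuples.
                    for (int i = l; i < lEnd; i++)
                        for (int j = r; j < rEnd; j++)
                            out.add(Arrays.toString(left.get(i)) + " x " + Arrays.toString(right.get(j)));
                    l = lEnd;
                    r = rEnd;
                }
            }
            return out;
        }
    }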

Added: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java?rev=804309&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/MergeJoinIndexer.java Fri Aug 14 17:53:23 2009
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.List;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+public class MergeJoinIndexer extends RandomSampleLoader {
+
+    /** The Merge Join indexer is used to generate an on-the-fly index for doing Merge Join efficiently.
+     *  It samples the first record from every block of the right-side input (which is later opened as a side file in the merge join)
+     *  and returns a tuple in the following format: 
+     *  (key0, key1,...,fileName, offset)
+     *  These tuples are then sorted before being written out to the index file on HDFS.
+     */
+    
+    private boolean firstRec = true;
+    private transient TupleFactory mTupleFactory;
+    private String fileName;
+    private POLocalRearrange lr;
+    private int keysCnt;
+    
+    /** @param funcSpec : Loader specification.
+     *  @param serializedPlans : This is the serialized version of the LR plans. We
+     *  want to keep only the keys in our index file and not the whole tuple, so we need the LR and thus its plans
+     *  to get the keys out of the sampled tuple.
+     */
+    @SuppressWarnings("unchecked")
+    public MergeJoinIndexer(String funcSpec, String serializedPlans) throws ExecException{
+        super(funcSpec,"1");
+
+        try {
+            List<PhysicalPlan> innerPlans = (List<PhysicalPlan>)ObjectSerializer.deserialize(serializedPlans);
+            lr = new POLocalRearrange(new OperatorKey("MergeJoin Indexer",NodeIdGenerator.getGenerator().getNextNodeId("MergeJoin Indexer")));
+            lr.setPlans(innerPlans);
+            keysCnt = innerPlans.size();
+            mTupleFactory = TupleFactory.getInstance();
+        }
+        catch (PlanException pe) {
+            int errCode = 2071;
+            String msg = "Problem with setting up local rearrange's plans.";
+            throw new ExecException(msg, errCode, PigException.BUG, pe);
+        }
+        catch (IOException e) {
+            int errCode = 2094;
+            String msg = "Unable to deserialize inner plans in Indexer.";
+            throw new ExecException(msg,errCode,e);
+        }
+    }
+
+    @Override
+    public void bindTo(String fileName, BufferedPositionedInputStream is,long offset, long end) throws IOException {
+        this.fileName = fileName;
+        super.bindTo(fileName, is, offset, end);
+    }
+
+    @Override
+    public Tuple getNext() throws IOException {
+
+        if(!firstRec)   // We sample only one record per block.
+            return null;
+
+        while(true){
+            long initialPos = loader.getPosition();
+            Tuple t = loader.getSampledTuple();
+            
+            if(null == t){          // We hit the end of block because all keys are null. 
+            
+                Tuple wrapperTuple = mTupleFactory.newTuple(keysCnt+2);
+                for(int i =0; i < keysCnt; i++)
+                    wrapperTuple.set(i, null);
+                wrapperTuple.set(keysCnt, fileName);
+                wrapperTuple.set(keysCnt+1, initialPos);
+                firstRec = false;
+                return wrapperTuple;
+            }
+                
+            Tuple dummyTuple = null;
+            lr.attachInput(t);
+            Object key = ((Tuple)lr.getNext(dummyTuple).result).get(1);
+            if(null == key)     // Tuple with null key. Drop it. Get next.
+                continue;
+            
+            Tuple wrapperTuple = mTupleFactory.newTuple(keysCnt+2);        
+            if(key instanceof Tuple){
+                Tuple tupKey = (Tuple)key;
+                for(int i =0; i < tupKey.size(); i++)
+                    wrapperTuple.set(i, tupKey.get(i));
+            }
+
+            else
+                wrapperTuple.set(0, key);
+
+            lr.detachInput();
+            wrapperTuple.set(keysCnt, fileName);
+            wrapperTuple.set(keysCnt+1, initialPos);
+
+            firstRec = false;
+            return wrapperTuple;
+        }
+    }
+
+    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException, ExecException{
+        is.defaultReadObject();
+        mTupleFactory = TupleFactory.getInstance();
+    }
+}
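
MergeJoinIndexer above writes one (key0, ..., keyN, fileName, offset) tuple per sampled block, and POMergeJoin.seekInRightStream scans those sorted entries to decide where to open the right-side input. The following is a small sketch of that lookup, with a hypothetical IndexEntry holder standing in for the real index tuples; it is an illustration under those assumptions, not the committed code, and it skips the null-key handling that the operator performs.

    import java.util.List;

    // Hypothetical stand-in for one (key, fileName, offset) index tuple.
    class IndexEntry {
        final Comparable key;
        final String fileName;
        final long offset;
        IndexEntry(Comparable key, String fileName, long offset) {
            this.key = key; this.fileName = fileName; this.offset = offset;
        }
    }

    class IndexSeekSketch {
        // Scan the entries (sorted on key) until one is >= the first left key and
        // return the previous entry: matching right tuples cannot start earlier
        // than the block it points to. If every key is smaller, the last entry is
        // returned. This mirrors the loop in POMergeJoin.seekInRightStream.
        @SuppressWarnings("unchecked")
        static IndexEntry seek(List<IndexEntry> sortedIndex, Object firstLeftKey) {
            IndexEntry prev = null;
            for (IndexEntry cur : sortedIndex) {
                if (cur.key.compareTo(firstLeftKey) >= 0) {
                    return prev == null ? cur : prev;   // back up one entry (read-ahead)
                }
                prev = cur;
            }
            return prev;   // hit the end of the index without a larger key
        }
    }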

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/RandomSampleLoader.java Fri Aug 14 17:53:23 2009
@@ -42,7 +42,7 @@
     private int numSamples;
     private long skipInterval;    
 	private TupleFactory factory;
-    private SamplableLoader loader;
+    protected SamplableLoader loader;
     
     /**
      * Construct with a class of loader to use.

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOJoin.java Fri Aug 14 17:53:23 2009
@@ -51,7 +51,8 @@
 	public static enum JOINTYPE {
         REGULAR, // Regular join
         REPLICATED, // Fragment Replicated join
-        SKEWED // Skewed Join
+        SKEWED, // Skewed Join
+        MERGE   // Sort Merge Join
     };
 
     /**
@@ -74,7 +75,7 @@
      * @param joinPlans
      *            the join columns
      * @param jt
-     *            indicates the type of join - regular, skewed or fragment replicated
+     *            indicates the type of join - regular, skewed, fragment replicated or merge
      */
     public LOJoin(
             LogicalPlan plan,

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Aug 14 17:53:23 2009
@@ -68,6 +68,7 @@
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.apache.hadoop.fs.Path;
+import org.apache.pig.impl.util.LinkedMultiMap;
 
 public class QueryParser {
 	private PigContext pigContext;
@@ -299,24 +300,32 @@
 	}
 	
 	/**
-	 * Join parser. Currently can only handle skewed joins. 
+	 * Join parser. 
 	 */
 	LogicalOperator parseJoin(ArrayList<CogroupInput> gis, LogicalPlan lp, LOJoin.JOINTYPE jt) throws ParseException, PlanException{
 		log.trace("Entering parseJoin");
 		// Skewed Join behaves as regular join in local mode
 		if (pigContext.getExecType() == ExecType.LOCAL && jt == LOJoin.JOINTYPE.SKEWED) {
 			return rewriteJoin(gis,lp);
-		} 
-		
+		}
+
+      // Merge Join behaves as regular join in local mode
+		if (pigContext.getExecType() == ExecType.LOCAL && jt == LOJoin.JOINTYPE.MERGE) {
+            return rewriteJoin(gis,lp);
+        }
+        
 		int n = gis.size();
 
 		if (jt == LOJoin.JOINTYPE.SKEWED && n != 2) {
 			throw new ParseException("Skewed join can only be applied for 2-way joins");
 		}
-
+		if (jt == LOJoin.JOINTYPE.MERGE && n != 2) {
+            throw new ParseException("Merge join can only be applied for 2-way joins");
+        }
+        
 		ArrayList<LogicalOperator> los = new ArrayList<LogicalOperator>();
 		ArrayList<ArrayList<LogicalPlan>> plans = new ArrayList<ArrayList<LogicalPlan>>();
-		MultiMap<LogicalOperator, LogicalPlan> joinPlans = new MultiMap<LogicalOperator, LogicalPlan>();
+		MultiMap<LogicalOperator, LogicalPlan> joinPlans = new LinkedMultiMap<LogicalOperator, LogicalPlan>();
 		boolean[] isInner = new boolean[n];
 		
 		int arity = gis.get(0).plans.size();
@@ -356,61 +365,6 @@
 	}
 
 	/**
-	 * Mimicing parseCogroup as the parsing logic for FRJoin remains exactly the same.
-	 */
-	 /*
-	LogicalOperator parseFRJoin(ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException{
-		
-		log.trace("Entering parseCogroup");
-		log.debug("LogicalPlan: " + lp);
-		
-		int n = gis.size();
-		log.debug("Number of cogroup inputs = " + n);
-		
-		ArrayList<LogicalOperator> los = new ArrayList<LogicalOperator>();
-		ArrayList<ArrayList<LogicalPlan>> plans = new ArrayList<ArrayList<LogicalPlan>>();
-		MultiMap<LogicalOperator, LogicalPlan> groupByPlans = new MultiMap<LogicalOperator, LogicalPlan>();
-		//Map<LogicalOperator, LogicalPlan> groupByPlans = new HashMap<LogicalOperator, LogicalPlan>();
-		boolean[] isInner = new boolean[n];
-		
-		int arity = gis.get(0).plans.size();
-		
-		for (int i = 0; i < n ; i++){
-			
-			CogroupInput gi = gis.get(i);
-			los.add(gi.op);
-			ArrayList<LogicalPlan> planList = gi.plans;
-			plans.add(gi.plans);
-			int numGrpByOps = planList.size();
-			log.debug("Number of group by operators = " + numGrpByOps);
-
-			if(arity != numGrpByOps) {
-				throw new ParseException("The arity of the group by columns do not match.");
-			}
-			for(int j = 0; j < numGrpByOps; ++j) {
-			    groupByPlans.put(gi.op, planList.get(j));
-				for(LogicalOperator root: planList.get(j).getRoots()) {
-					log.debug("Cogroup input plan root: " + root);
-				}
-			}
-			isInner[i] = gi.isInner;
-		}
-		
-		LogicalOperator frj = new LOFRJoin(lp, new OperatorKey(scope, getNextId()), groupByPlans, isInner, gis.get(0).op);
-		lp.add(frj);
-		log.debug("Added operator " + frj.getClass().getName() + " object " + frj + " to the logical plan " + lp);
-		
-		for(LogicalOperator op: los) {
-			lp.connect(op, frj);
-			log.debug("Connected operator " + op.getClass().getName() + " to " + frj.getClass().getName() + " in the logical plan");
-		}
-
-		log.trace("Exiting parseFRJoin");
-		return frj;
-	}
-	*/
-			
-	/**
 	 * The join operator is translated to foreach 
 	 */
 	LogicalOperator rewriteJoin(ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException{
@@ -1915,13 +1869,16 @@
 	log.debug("LogicalPlan: " + lp);
 	LogicalOperator frj = null;
 	LogicalOperator skj = null;
+	LogicalOperator smj = null;
 }
 {
 	(gi = GroupItem(lp) { gis.add(gi); }
 	("," gi = GroupItem(lp) { gis.add(gi); })+
-	// The addition of using replicated to indicate FRJoin
-	([<USING> ("\"replicated\"" { frj = parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED); } | "\"repl\"" { frj=parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED);}
-    |"\"skewed\"" { skj = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED); })] ))
+	// For all types of join we create LOJoin and mark what type of join it is.
+	([<USING> ("\"replicated\"" { frj = parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED); }
+	| "\"repl\"" { frj=parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED);}
+    |"\"skewed\"" { skj = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED); }
+    |"\"merge\"" { smj = parseJoin(gis, lp, LOJoin.JOINTYPE.MERGE); })] ))
 
 	{log.trace("Exiting JoinClause");
 	if (frj!=null) {
@@ -1930,8 +1887,10 @@
 	else if (skj!=null) {
 		return skj;
 	}
+	else if (smj!=null) {
+        return smj;
+    }
 	else {
-		//return rewriteJoin(gis,lp);
 		return parseJoin(gis, lp, LOJoin.JOINTYPE.REGULAR);
 	}}
 	

Added: hadoop/pig/trunk/src/org/apache/pig/impl/util/LinkedMultiMap.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/LinkedMultiMap.java?rev=804309&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/LinkedMultiMap.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/LinkedMultiMap.java Fri Aug 14 17:53:23 2009
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.util;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+
+public class LinkedMultiMap<K,V> extends MultiMap<K,V> {
+
+    /** This class simply extends MultiMap to use LinkedHashMap instead of HashMap.
+     *  This ensures that while iterating over the keys of the MultiMap we get the keys
+     *  in the same order as they were inserted.
+     */
+    private static final long serialVersionUID = 1L;
+
+    public LinkedMultiMap() {
+        
+        mMap = new LinkedHashMap<K, ArrayList<V>>();
+    }
+    
+    /**
+     * @param size Initial size of the map
+     */
+    public LinkedMultiMap(int size) {
+    
+        mMap = new LinkedHashMap<K, ArrayList<V>>(size);
+    }
+}
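
The point of the new class is purely iteration order: QueryParser now stores the joinPlans in a LinkedMultiMap so that the first relation named in the JOIN stays the left (map-side) input when POMergeJoin iterates over keySet(). A small sketch of the difference, assuming only the put/get/keySet methods used elsewhere in this commit (the example alias names and values are illustrative):

    import org.apache.pig.impl.util.LinkedMultiMap;
    import org.apache.pig.impl.util.MultiMap;

    public class LinkedMultiMapExample {
        public static void main(String[] args) {
            // A HashMap-backed MultiMap makes no promise about keySet() order;
            // the LinkedHashMap-backed LinkedMultiMap returns keys in insertion order.
            MultiMap<String, Integer> joinPlans = new LinkedMultiMap<String, Integer>();
            joinPlans.put("left", 0);
            joinPlans.put("right", 1);
            for (String alias : joinPlans.keySet()) {
                System.out.println(alias + " -> " + joinPlans.get(alias));
            }
            // Always prints the "left" entry before the "right" entry.
        }
    }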

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/util/MultiMap.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/MultiMap.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/MultiMap.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/MultiMap.java Fri Aug 14 17:53:23 2009
@@ -39,9 +39,9 @@
 public class MultiMap<K, V> implements Serializable {
 
 	// Change this if you modify the class.
-	static final long serialVersionUID = 1L;
+	static final long serialVersionUID = 2L;
 
-    private HashMap<K, ArrayList<V>> mMap = null;
+    protected Map<K, ArrayList<V>> mMap = null;
 
     public MultiMap() {
         mMap = new HashMap<K, ArrayList<V>>();

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java Fri Aug 14 17:53:23 2009
@@ -900,7 +900,6 @@
     	assertTrue(count == 4);
     }
 
-
     @Test
     public void testUDFInJoin() throws Exception {
         planTester.buildPlan("a = load 'input1' using BinStorage();");
@@ -916,6 +915,19 @@
         assertTrue(mrOper.UDFs.get(0).equals("BinStorage"));
         assertTrue(mrOper.UDFs.get(1).equals("org.apache.pig.builtin.PigStorage"));
     }
+
+    @Test
+    public void testMergeJoin() throws Exception{
+
+        //generate = true;
+        planTester.buildPlan("a = load '/tmp/input1';");
+        planTester.buildPlan("b = load '/tmp/input2';");
+        planTester.buildPlan("c = join a by $0, b by $0 using \"merge\";");
+        LogicalPlan lp = planTester.buildPlan("store c into '/tmp';");
+        
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        run(pp, "test/org/apache/pig/test/data/GoldenFiles/MRC18.gld");
+    }
     
     public static class WeirdComparator extends ComparisonFunc {
 

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java?rev=804309&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java Fri Aug 14 17:53:23 2009
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigException;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMergeJoin {
+
+    private static final String INPUT_FILE = "testMergeJoinInput.txt";
+    private PigServer pigServer;
+    private MiniCluster cluster = MiniCluster.buildCluster();
+
+    public TestMergeJoin() throws ExecException, IOException{
+
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+    /**
+     * @throws java.lang.Exception
+     */
+    @Before
+    public void setUp() throws Exception {
+        int LOOP_SIZE = 3;
+        String[] input = new String[LOOP_SIZE*LOOP_SIZE];
+        int k = 0;
+        for(int i = 1; i <= LOOP_SIZE; i++) {
+            String si = i + "";
+            for(int j=1;j<=LOOP_SIZE;j++)
+                input[k++] = si + "\t" + j;
+        }
+        Util.createInputFile(cluster, INPUT_FILE, input);
+    }
+
+    /**
+     * @throws java.lang.Exception
+     */
+    @After
+    public void tearDown() throws Exception {
+        Util.deleteFile(cluster, INPUT_FILE);
+    }
+
+    /**
+     * Test method for {@link org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin#getNext(org.apache.pig.data.Tuple)}.
+     */
+    @Test
+    public void testMergeJoinSimplest() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "';");
+        DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("C = join A by $0, B by $0 using \"merge\";");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbMergeJoin.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("C = join A by $0, B by $0;");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbMergeJoin.size(), dbshj.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
+    }
+
+    @Test
+    public void testMergeJoinOnMultiFields() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "';");
+        DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("C = join A by ($0,$1), B by ($0,$1) using \"merge\";");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbMergeJoin.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("C = join A by ($0,$1), B by ($0,$1);");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbMergeJoin.size(), dbshj.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
+    }
+
+    @Test
+    public void testMergeJoinWithExpr() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "';");
+        DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("C = join A by ($0+10), B by ($0+10) using \"merge\";");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbMergeJoin.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("C = join A by ($0+10), B by ($0+10);");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbMergeJoin.size(), dbshj.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
+    }
+
+    @Test
+    public void testMergeJoinOutWithSchema() throws IOException{
+
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+        DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("C = join A by $0, B by $0 using \"merge\";");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbMergeJoin.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("C = join A by $0, B by $0;");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbMergeJoin.size(), dbshj.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
+    }
+
+    @Test
+    public void testMergeJoinOutWithFilters() throws IOException{
+
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "';");
+        pigServer.registerQuery("C = FILTER A by $1 > 1;"); 
+        pigServer.registerQuery("D = FILTER B by $1 > 1;"); 
+        DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("E = join C by $0, D by $0 using \"merge\";");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+
+            while(iter.hasNext()) {
+                dbMergeJoin.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("E = join C by $0, D by $0;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbMergeJoin.size(), dbshj.size());
+        Assert.assertTrue(TestHelper.compareBags(dbMergeJoin, dbshj));
+    }
+
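+    // Verifies that a merge join whose inputs are projected down to the join column matches the default join.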
+    @Test
+    public void testMergeJoinOutWithProjects() throws IOException{
+
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (f1,f2);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "'as (f1,f2);");
+        pigServer.registerQuery("C = foreach A generate f1;"); 
+        pigServer.registerQuery("D = foreach B generate f1;"); 
+        DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("E = join C by f1, D by f1 using \"merge\";");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+
+            while(iter.hasNext()) {
+                dbMergeJoin.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("E = join C by f1, D by f1;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbMergeJoin.size(), dbshj.size());
+        Assert.assertTrue(TestHelper.compareBags(dbMergeJoin, dbshj));
+    }
+
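+    // Verifies a pipeline of two merge joins followed by a union and a filter against the equivalent hash-join pipeline.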
+    @Test
+    public void testMergeJoinOutPipeline() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);");
+        DataBag dbmrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("C = join A by $0, B by $0 using \"merge\";");
+            pigServer.registerQuery("G = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);");
+            pigServer.registerQuery("H = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);");
+            pigServer.registerQuery("D = join G by $0, H by $0 using \"merge\";");
+            pigServer.registerQuery("E = union C,D;");
+            pigServer.registerQuery("F = filter E by 1 == 1;");
+            Iterator<Tuple> iter = pigServer.openIterator("F");
+
+            while(iter.hasNext()) {
+                dbmrj.add(iter.next());
+            }
+        }
+        // Note that these two queries are set up a little differently, because Merge Join currently fails if there is a Split in the plan.
+        // Once the MRCompiler work is finished, this query, together with a merge join preceded by an order-by, should become two test cases for it.
+        {
+            pigServer.registerQuery("C = join A by $0, B by $0;");
+            pigServer.registerQuery("D = join A by $0, B by $0;");
+            pigServer.registerQuery("E = union C,D;");
+            pigServer.registerQuery("F = filter E by 1 == 1;");
+            Iterator<Tuple> iter = pigServer.openIterator("F");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbmrj.size(), dbshj.size());
+        Assert.assertTrue(TestHelper.compareBags(dbmrj, dbshj));
+    }
+
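+    // Verifies that rows with null join keys are handled the same way by merge join and the default join.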
+    @Test
+    public void testMergeJoinWithNulls() throws IOException{
+
+        String[] input = new String[3*3];
+        input[0] = "\t2";
+        input[1] = "1\t2";
+        input[2] = "1\t2";
+        input[3] = "\t2";
+        input[4] = "3\t2";
+        input[5] = "\t";
+        input[6] = "5\t";
+        input[7] = "7\t";
+        input[8] = "7\t1";
+        Util.createInputFile(cluster, "temp_file", input);
+        pigServer.registerQuery("A = LOAD 'temp_file';");
+        pigServer.registerQuery("B = LOAD 'temp_file';");
+        DataBag dbmrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("C = join A by $0, B by $0 using \"merge\";");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbmrj.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("C = join A by $0, B by $0;");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Util.deleteFile(cluster, "temp_file");
+        Assert.assertEquals(dbmrj.size(),dbshj.size());
+        Assert.assertTrue(TestHelper.compareBags(dbmrj, dbshj));
+    }
+
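+    // Verifies that a group-by and filter placed after the merge join (forcing a later map-reduce boundary) do not change the result.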
+    @Test
+    public void testMergeJoinWithMRBoundaryLater() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+        DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("C = join A by $0, B by $0 using \"merge\";");
+            pigServer.registerQuery("D = group C by $0;");
+            pigServer.registerQuery("E = filter D by $0 > 0;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+
+            while(iter.hasNext()) {
+                dbMergeJoin.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("C = join A by $0, B by $0;");
+            pigServer.registerQuery("D = group C by $0 ;");
+            pigServer.registerQuery("E = filter D by $0 > 0;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbMergeJoin.size(), dbshj.size());
+        Assert.assertTrue(TestHelper.compareBags(dbMergeJoin, dbshj));
+    }
+
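+    // Merge join supports exactly two inputs; a three-way join should be rejected with error code 1000.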
+    @Test
+    public void testMergeJoin3Way() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, n);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (id, name);");
+        pigServer.registerQuery("C = LOAD '" + INPUT_FILE + "' as (id, name);");
+        try {
+            pigServer.registerQuery("D = join A by id, B by id, C by id using \"merge\";");
+        }catch(Exception e) {
+            PigException pe = LogUtils.getPigException(e);
+            Assert.assertEquals(1000,pe.getErrorCode());
+            return;
+        }
+        Assert.fail("Should throw exception, do not support 3 way join");
+    }       
+
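+    // Merge join whose right input is the output of an ORDER BY should fail at plan compilation with error code 1103.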
+    @Test
+    public void testMergeJoinFailure1() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, n);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (id, name);");
+        pigServer.registerQuery("C = ORDER A by $0 parallel 5;");
+        pigServer.registerQuery("D = join A by id, C by id using \"merge\";");
+        try {
+            pigServer.openIterator("D");
+        }catch(Exception e) {
+            PigException pe = LogUtils.getPigException(e);
+            Assert.assertEquals(1103,pe.getErrorCode());
+            return;
+        }
+        Assert.fail("Should fail to compile");
+    }       
+
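+    // Merge join whose right input is the output of a GROUP should fail at plan compilation with error code 1103.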
+    @Test
+    public void testMergeJoinFailure2() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, n);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (id, name);");
+        pigServer.registerQuery("C = GROUP B by id;");
+        pigServer.registerQuery("D = join A by id, C by $0 using \"merge\";");
+        try {
+            pigServer.openIterator("D");
+        }catch(Exception e) {
+            PigException pe = LogUtils.getPigException(e);
+            Assert.assertEquals(1103,pe.getErrorCode());
+            return;
+        }
+        Assert.fail("Should fail to compile");
+    }       
+
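+    // Verifies that a merge join with an empty right-side input produces the same (empty) result as the default join.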
+    @Test
+    public void testEmptyRightFile() throws IOException{
+        DataBag dbmrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        Util.createInputFile(cluster, "temp_file", new String[]{});
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");
+        pigServer.registerQuery("B = LOAD 'temp_file';");
+        pigServer.registerQuery("C = join A by $0, B by $0 using \"merge\";");
+        {
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbmrj.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("C = join A by $0, B by $0;");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbmrj.size(), dbshj.size());
+        Assert.assertTrue(TestHelper.compareBags(dbmrj, dbshj));
+        Util.deleteFile(cluster, "temp_file");
+    }       
+
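+    // Verifies that the output schema of a merge join is identical to that of the default join.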
+    @Test
+    public void testMergeJoinSch1() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+        Schema mjSch = null, shjSch = null;
+        pigServer.registerQuery("C = join A by $0, B by $0 using \"merge\";");
+        mjSch = pigServer.dumpSchema("C");
+        pigServer.registerQuery("C = join A by $0, B by $0;");
+        shjSch = pigServer.dumpSchema("C");
+        Assert.assertTrue(shjSch.equals(mjSch));
+    }
+
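+    // Same schema check as above, for untyped inputs joined on a composite key.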
+    @Test
+    public void testMergeJoinSch2() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "';");
+        Schema mjSch = null, shjSch = null;
+        pigServer.registerQuery("C = join A by ($0,$1), B by ($0,$1) using \"merge\";");
+        mjSch = pigServer.dumpSchema("C");
+        pigServer.registerQuery("C = join A by ($0,$1), B by ($0,$1);");
+        shjSch = pigServer.dumpSchema("C");
+        Assert.assertTrue(shjSch.equals(mjSch));
+    }
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld?rev=804309&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld Fri Aug 14 17:53:23 2009
@@ -0,0 +1,23 @@
+MapReduce(-1,PigStorage) - scope-127:
+Reduce Plan Empty
+|   Store(file:/tmp:org.apache.pig.builtin.PigStorage) - scope-126
+|   |
+|   |---MergeJoin[tuple] - scope-121
+|       |
+|       |---Load(file:/tmp/input1:org.apache.pig.builtin.PigStorage) - scope-117
+|
+|---MapReduce(-1,PigStorage) - scope-128:
+    |   Store(file:/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - scope-135
+    |   |
+    |   |---POSort[tuple]() - scope-134
+    |       |   |
+    |       |   Project[tuple][*] - scope-133
+    |       |
+    |       |---Project[tuple][1] - scope-132
+    |           |
+    |           |---Package[tuple]{chararray} - scope-131
+    |   Local Rearrange[tuple]{chararray}(false) - scope-130
+    |   |   |
+    |   |   Constant(all) - scope-129
+    |   |
+    |   |---Load(file:/tmp/input2:org.apache.pig.impl.builtin.MergeJoinIndexer('org.apache.pig.builtin.PigStorage','kmonaaafhdhcaabdgkgbhggbcohfhegjgmcoebhchcgbhjemgjhdhehiibncbnjjmhgbjnadaaabejaaaehdgjhkgfhihaaaaaaaabhhaeaaaaaaabhdhcaaeogphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccohagmgbgohdcofagihjhdgjgdgbgmfagmgbgoaaaaaaaaaaaaaaabacaaabfkaaangfgogeepggebgmgmejgohahfhehihcaacfgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcfagmgbgoepcpjdmpihcaifknacaaagemaaakgneghcgpgnefgeghgfhdheaacdemgphcghcpgbhagbgdgigfcphagjghcpgjgnhagmcphfhegjgmcpenhfgmhegjengbhadlemaaafgnelgfhjhdheaaapemgkgbhggbcphfhegjgmcpengbhadlemaaahgnemgfgbhggfhdheaabaemgkgbhggbcphfhegjgmcpemgjhdhedlemaaaegnephahdhbaahoaaafemaaaggnfcgpgphehdhbaahoaaagemaaaignfegpefgeghgfhdhbaahoaaaehihahdhcaacbgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohfhegjgmcoenhfgmhegjengbhaaaaaaaaaaaaaaaacacaaabemaaaegnengbhahbaahoaaafhihahdhcaabbgkgbh
 ggbcohfhegjgmcoeigbhdgiengbhaafahnkmbmdbgganbadaaacegaaakgmgpgbgeeggbgdhegphcejaaajhegihcgfhdgigpgmgehihadpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaabhdhcaacegphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcelgfhjaaaaaaaaaaaaaaabacaaacekaaacgjgeemaaafhdgdgphagfheaabcemgkgbhggbcpgmgbgoghcpfdhehcgjgoghdlhihaaaaaaaaaaaaaaahiheaaafhdgdgphagfhdhcaafjgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcofaepfahcgpgkgfgdheaaaaaaaaaaaaaaabacaaagfkaaakgphggfhcgmgpgbgegfgefkaabfhahcgpgdgfhdhdgjgoghecgbghepggfehfhagmgfhdfkaabehcgfhdhfgmhefdgjgoghgmgffehfhagmgfecgbghfkaaaehdhegbhcemaaalgcgbghejhegfhcgbhegphcheaabeemgkgbhggbcphfhegjgmcpejhegfhcgbhegphcdlemaaahgdgpgmhfgngohdheaabfemgkgbhggbcphfhegjgmcpebhchcgbhjemgjhdhedlhihcaagcgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhd
 gjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcoefhihahcgfhdhdgjgpgoephagfhcgbhegphcaaaaaaaaaaaaaaabacaaabemaaadgmgpghheaacaemgphcghcpgbhagbgdgigfcpgdgpgngngpgohdcpgmgpghghgjgoghcpemgpghdlhihcaaemgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccofagihjhdgjgdgbgmephagfhcgbhegphcaaaaaaaaaaaaaaabacaaakfkaaangjgohahfheebhehegbgdgigfgeejaabehcgfhbhfgfhdhegfgefagbhcgbgmgmgfgmgjhdgnecaaakhcgfhdhfgmhefehjhagfemaaafgjgohahfheheaablemgphcghcpgbhagbgdgigfcphagjghcpgegbhegbcpfehfhagmgfdlemaaaggjgohahfhehdhbaahoaaagemaaangmgjgogfgbghgffehcgbgdgfhcheaachemgphcghcpgbhagbgdgigfcphagjghcphagfgocphfhegjgmcpemgjgogfgbghgffehcgbgdgfhcdlemaaadgmgpghhbaahoaabfemaaahgphfhehahfhehdhbaahoaaagemaaakhagbhcgfgohefagmgbgoheaafaemgphcghcpgbhagbgdgigfcphagjghcpgcgbgdglgfgogecpgigbgegpgphacpgfhigfgdhfhegjgpgogfgoghgjgogfcphagihjhdgjgdgbgmemgbhjgfhccphagmgbgohdcpfagihjhdgjgdgbgmfagmgbgodlemaaadhcgfhdheaaeeemgphcghcpgbhagbgdgig
 fcphagjghcpgcgbgdglgfgogecpgigbgegpgphacpgfhigfgdhfhegjgpgogfgoghgjgogfcphagihjhdgjgdgbgmemgbhjgfhccpfcgfhdhfgmhedlhihcaacbgphcghcogbhagbgdgigfcohagjghcogjgnhagmcohagmgbgocoephagfhcgbhegphcaaaaaaaaaaaaaaabacaaabemaaaegnelgfhjheaacgemgphcghcpgbhagbgdgigfcphagjghcpgjgnhagmcphagmgbgocpephagfhcgbhegphcelgfhjdlhihahbaahoaaapaappppppppdchahahahdhcaaclgphcghcogbhagbgdgigfcogdgpgngngpgohdcogmgpghghgjgoghcogjgnhagmcoemgpghdeekemgpghghgfhccikmpnoicknfncdiacaaabemaaaegogbgngfhbaahoaaaohihaheaafjgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccogfhihahcgfhdhdgjgpgoephagfhcgbhegphchdcofaepfahcgpgkgfgdhehahahdhcaaecgphcghcogbhagbgdgigfcohagjghcogcgbgdglgfgogecogigbgegpgphacogfhigfgdhfhegjgpgogfgoghgjgogfcohagihjhdgjgdgbgmemgbhjgfhccofcgfhdhfgmheaaaaaaaaaaaaaaabacaaacecaaamhcgfhehfhcgofdhegbhehfhdemaaaghcgfhdhfgmheheaabcemgkgbhggbcpgmgbgoghcpepgcgkgfgdhedlhihaachahbaahoaabpaaaaaaaahahdhbaahoaaaaaaaaaaabhhaeaaaaaaabhdhcaa
 bbgkgbhggbcogmgbgoghcoejgohegfghgfhcbcockakephibihdiacaaabejaaafhggbgmhfgfhihcaabagkgbhggbcogmgbgoghcoeohfgngcgfhcigkmjfbnaljeoailacaaaahihaaaaaaaaahihihdhbaahoaaaaaaaaaaabhhaeaaaaaaakhbaahoaabnhihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaabhbaahoaabnhbaahoaaaphihdhbaahoaaaaaaaaaaaahhaeaaaaaaakhihdhbaahoaaaihdhbaahoaaakdpeaaaaaaaaaaaamhhaiaaaaaabaaaaaaaaahiaahi')) - scope-118
\ No newline at end of file

Modified: hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java?rev=804309&r1=804308&r2=804309&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java Fri Aug 14 17:53:23 2009
@@ -198,7 +198,7 @@
     private LogicalPlan buildPlan(String query, ClassLoader cldr) {
 
         LogicalPlanBuilder.classloader = LogicalPlanTester.class.getClassLoader() ;
-        PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+        PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties());
         try {
             pigContext.connect();
         } catch (ExecException e1) {


