pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r672016 [3/4] - in /incubator/pig/branches/types: ./ src/org/apache/pig/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/parser/ src/org/apache/pig/impl/logicalLayer/validators/ src/org/apache/pig/impl/mapReduceLa...
Date Thu, 26 Jun 2008 20:05:26 GMT
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PlanPrinter.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PlanPrinter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PlanPrinter.java Thu Jun 26 13:05:23 2008
@@ -119,10 +119,10 @@
         return sb.toString();
     }
     
-    private String planString(List<ExprPlan> lep){
+    private String planString(List<PhysicalPlan> lep){
         StringBuilder sb = new StringBuilder();
         if(lep!=null)
-            for (ExprPlan ep : lep) {
+            for (PhysicalPlan ep : lep) {
                 sb.append(planString(ep));
             }
         return sb.toString();
@@ -133,19 +133,15 @@
         if(node instanceof POFilter){
             sb.append(planString(((POFilter)node).getPlan()));
         }
-        else if(node instanceof POForEach){
-            sb.append(planString(((POForEach)node).getPlan()));        
-        }
-        else if(node instanceof POGenerate){
-            sb.append(planString(((POGenerate)node).getInputPlans())); 
-            
-        }
         else if(node instanceof POLocalRearrange){
             sb.append(planString(((POLocalRearrange)node).getPlans()));
         }
         else if(node instanceof POSort){
             sb.append(planString(((POSort)node).getSortPlans())); 
         }
+        else if(node instanceof POForEach){
+            sb.append(planString(((POForEach)node).getInputPlans()));
+        }
         
         List<O> predecessors = mPlan.getPredecessors(node);
         
@@ -216,26 +212,4 @@
         System.out.print(op.name() + "   ");
     }
     
-    /*public static void main(String[] args) throws PlanException, ExecException {
-        ExprPlan ep = GenPhyOp.arithPlan();
-        GreaterThanExpr gt = GenPhyOp.compGreaterThanExpr();
-        ConstantExpression ce = GenPhyOp.exprConst();
-        ce.setValue(50);
-        ep.add(ce);
-        ep.addAsLeaf(gt);
-        
-        POFilter fil = GenPhyOp.topFilterOp();
-        fil.setPlan(ep);
-        
-        PhysicalPlan php = new PhysicalPlan();
-        php.add(fil);
-        int fields[] = {0};
-        Tuple sample = TupleFactory.getInstance().newTuple();
-        sample.append("S");
-//        POForEach fe = GenPhyOp.topForEachOPWithPlan(fields, sample);
-        POForEach fe = GenPhyOp.topForEachOp();
-        php.add(fe);
-        php.connect(fil, fe);
-        php.explain(System.out);
-    }*/
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PODistinct.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PODistinct.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PODistinct.java Thu Jun 26 13:05:23 2008
@@ -41,7 +41,7 @@
  * 
  * 
  */
-public class PODistinct extends PhysicalOperator<PhyPlanVisitor> {
+public class PODistinct extends PhysicalOperator {
 
     private boolean inputsAccumulated = false;
     private DataBag distinctBag = BagFactory.getInstance().newDistinctBag();

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POFilter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POFilter.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POFilter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POFilter.java Thu Jun 26 13:05:23 2008
@@ -19,18 +19,16 @@
 
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
-import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.physicalLayer.expressionOperators.ComparisonOperator;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -44,17 +42,17 @@
  * on any other data type.
  * 
  */
-public class POFilter extends PhysicalOperator<PhyPlanVisitor> {
+public class POFilter extends PhysicalOperator {
 
     /**
      * 
      */
     private static final long serialVersionUID = 1L;
 
-    private Log log = LogFactory.getLog(getClass());
+//    private Log log = LogFactory.getLog(getClass());
 
     // The expression plan
-    ExprPlan plan;
+    PhysicalPlan plan;
 
     // The root comparison operator of the expression plan
     ComparisonOperator comOp;
@@ -164,13 +162,13 @@
         v.visitFilter(this);
     }
 
-    public void setPlan(ExprPlan plan) {
+    public void setPlan(PhysicalPlan plan) {
         this.plan = plan;
         comOp = (ComparisonOperator) (plan.getLeaves()).get(0);
         compOperandType = comOp.getOperandType();
     }
 
-    public ExprPlan getPlan() {
+    public PhysicalPlan getPlan() {
         return plan;
     }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java Thu Jun 26 13:05:23 2008
@@ -1,88 +1,97 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.pig.impl.physicalLayer.relationalOperators;
 
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.DefaultBagFactory;
-import org.apache.pig.data.DefaultTuple;
-import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.Result;
 import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
-/**
- * The foreach operator 
- * It has an embedded physical plan that
- * generates tuples as per the specification.
- */
-public class POForEach extends PhysicalOperator<PhyPlanVisitor> {
+public class POForEach extends PhysicalOperator {
 
     /**
      * 
      */
     private static final long serialVersionUID = 1L;
-
-    private Log log = LogFactory.getLog(getClass());
-
-    PhysicalPlan<PhysicalOperator> plan;
-
-    POGenerate gen;
     
+    private List<Boolean> isToBeFlattened;
+    private List<PhysicalPlan> inputPlans;
+    private List<PhysicalOperator> planLeaves = new LinkedList<PhysicalOperator>();
+    private Log log = LogFactory.getLog(getClass());
     //Since the plan has a generate, this needs to be maintained
     //as the generate can potentially return multiple tuples for
     //same call.
     private boolean processingPlan = false;
-
+    
+    //its holds the iterators of the databags given by the input expressions which need flattening.
+    Iterator<Tuple> [] its = null;
+    
+    //This holds the outputs given out by the input expressions of any datatype
+    Object [] bags = null;
+    
+    //This is the template which contains tuples and is flattened out in CreateTuple() to generate the final output
+    Object[] data = null;
+    
     public POForEach(OperatorKey k) {
-        this(k, -1, null);
+        this(k,-1,null,null);
     }
 
-    public POForEach(OperatorKey k, int rp) {
-        this(k, rp, null);
+    public POForEach(OperatorKey k, int rp, List inp) {
+        this(k,rp,inp,null);
     }
 
-    public POForEach(OperatorKey k, List<PhysicalOperator> inp) {
-        this(k, -1, inp);
+    public POForEach(OperatorKey k, int rp) {
+        this(k,rp,null,null);
     }
 
-    public POForEach(OperatorKey k, int rp, List<PhysicalOperator> inp) {
-        super(k, rp, inp);
+    public POForEach(OperatorKey k, List inp) {
+        this(k,-1,inp,null);
+    }
+    
+    public POForEach(OperatorKey k, int rp, List<PhysicalPlan> inp, List<Boolean>  isToBeFlattened){
+        super(k, rp);
+        this.isToBeFlattened = isToBeFlattened;
+        this.inputPlans = inp;
+        getLeaves();
     }
 
     @Override
     public void visit(PhyPlanVisitor v) throws VisitorException {
-        v.visitForEach(this);
+        v.visitPOForEach(this);
     }
 
     @Override
     public String name() {
-        return "For Each" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
+        String fString = getFlatStr();
+        return "New For Each" + "(" + fString + ")" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
+    }
+    
+    private String getFlatStr() {
+        if(isToBeFlattened==null)
+            return "";
+        StringBuilder sb = new StringBuilder();
+        for (Boolean b : isToBeFlattened) {
+            sb.append(b);
+            sb.append(',');
+        }
+        if(sb.length()>0){
+            sb.deleteCharAt(sb.length()-1);
+        }
+        return sb.toString();
     }
 
     @Override
@@ -96,16 +105,6 @@
     }
     
     /**
-     * Overridden since the attachment of the new input should cause the old
-     * processing to end.
-     */
-    @Override
-    public void attachInput(Tuple t) {
-        super.attachInput(t);
-        processingPlan = false;
-    }
-    
-    /**
      * Calls getNext on the generate operator inside the nested
      * physical plan and returns it maintaining an additional state
      * to denote the begin and end of the nested plan processing.
@@ -119,7 +118,7 @@
         //returns
         if(processingPlan){
             while(true) {
-                res = gen.getNext(t);
+                res = processPlan();
                 if(res.returnStatus==POStatus.STATUS_OK){
                     return res;
                 }
@@ -144,14 +143,9 @@
             if (inp.returnStatus == POStatus.STATUS_NULL)
                 continue;
             
-            plan.attachInput((Tuple) inp.result);
-            
-            res = gen.getNext(t);
+            attachInputToPlans((Tuple) inp.result);
             
-            /*if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
-                return inp;
-            if(inp.returnStatus == POStatus.STATUS_NULL)
-                continue;*/
+            res = processPlan();
             
             processingPlan = true;
             
@@ -159,12 +153,184 @@
         }
     }
 
-    public PhysicalPlan<PhysicalOperator> getPlan() {
-        return plan;
+    private Result processPlan() throws ExecException{
+        int noItems = planLeaves.size();
+        Result res = new Result();
+        
+        //We check if all the databags have exhausted the tuples. If so we enforce the reading of new data by setting data and its to null
+        if(its != null) {
+            boolean restartIts = true;
+            for(int i = 0; i < noItems; ++i) {
+                if(its[i] != null && isToBeFlattened.get(i) == true)
+                    restartIts &= !its[i].hasNext();
+            }
+            //this means that all the databags have reached their last elements. so we need to force reading of fresh databags
+            if(restartIts) {
+                its = null;
+                data = null;
+            }
+        }
+        
+        if(its == null) {
+            //getNext being called for the first time OR starting with a set of new data from inputs 
+            its = new Iterator[noItems];
+            bags = new Object[noItems];
+            
+            for(int i = 0; i < noItems; ++i) {
+                //Getting the iterators
+                //populate the input data
+                Result inputData = null;
+                Byte resultType = ((PhysicalOperator)planLeaves.get(i)).getResultType();
+                switch(resultType) {
+                case DataType.BAG : DataBag b = null;
+                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(b);
+                break;
+                case DataType.TUPLE : Tuple t = null;
+                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(t);
+                break;
+                case DataType.BYTEARRAY : DataByteArray db = null;
+                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(db);
+                break; 
+                case DataType.MAP : Map map = null;
+                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(map);
+                break;
+                case DataType.BOOLEAN : Boolean bool = null;
+                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(bool);
+                break;
+                case DataType.INTEGER : Integer integer = null;
+                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(integer);
+                break;
+                case DataType.DOUBLE : Double d = null;
+                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(d);
+                break;
+                case DataType.LONG : Long l = null;
+                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(l);
+                break;
+                case DataType.FLOAT : Float f = null;
+                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(f);
+                break;
+                case DataType.CHARARRAY : String str = null;
+                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(str);
+                break;
+                }
+                
+                if(inputData.returnStatus == POStatus.STATUS_EOP) {
+                    //we are done with all the elements. Time to return.
+                    its = null;
+                    bags = null;
+                    return inputData;
+                }
+
+//                Object input = null;
+                
+                bags[i] = inputData.result;
+                
+                if(inputData.result instanceof DataBag && isToBeFlattened.get(i)) 
+                    its[i] = ((DataBag)bags[i]).iterator();
+                else 
+                    its[i] = null;
+            }
+        }
+        
+        Boolean done = false;
+        while(!done) {
+            if(data == null) {
+                //getNext being called for the first time or starting on new input data
+                //we instantiate the template array and start populating it with data
+                data = new Object[noItems];
+                for(int i = 0; i < noItems; ++i) {
+                    if(isToBeFlattened.get(i) && bags[i] instanceof DataBag) {
+                        if(its[i].hasNext()) {
+                            data[i] = its[i].next();
+                        } else {
+                            //the input set is null, so we return
+                            res.returnStatus = POStatus.STATUS_NULL;
+                            return res;
+                        }
+                    } else {
+                        data[i] = bags[i];
+                    }
+                    
+                }
+                if(reporter!=null) reporter.progress();
+                //CreateTuple(data);
+                res.result = CreateTuple(data);
+                res.returnStatus = POStatus.STATUS_OK;
+                return res;
+            } else {
+                //we try to find the last expression which needs flattening and start iterating over it
+                //we also try to update the template array
+                for(int index = noItems - 1; index >= 0; --index) {
+                    if(its[index] != null && isToBeFlattened.get(index)) {
+                        if(its[index].hasNext()) {
+                            data[index] =  its[index].next();
+                            res.result = CreateTuple(data);
+                            res.returnStatus = POStatus.STATUS_OK;
+                            return res;
+                        }
+                        else{
+                            its[index] = ((DataBag)bags[index]).iterator();
+                            data[index] = its[index].next();
+                        }
+                    }
+                }
+            }
+        }
+        
+        return null;
+    }
+    
+    /**
+     * 
+     * @param data array that is the template for the final flattened tuple
+     * @return the final flattened tuple
+     */
+    private Tuple CreateTuple(Object[] data) throws ExecException {
+        TupleFactory tf = TupleFactory.getInstance();
+        Tuple out = tf.newTuple();
+        for(int i = 0; i < data.length; ++i) {
+            Object in = data[i];
+            
+            if(in instanceof Tuple) {
+                Tuple t = (Tuple)in;
+                for(int j = 0; j < t.size(); ++j) {
+                    out.append(t.get(j));
+                }
+            } else
+                out.append(in);
+        }
+        return out;
+    }
+
+    
+    private void attachInputToPlans(Tuple t) {
+        //super.attachInput(t);
+        for(PhysicalPlan p : inputPlans) {
+            p.attachInput(t);
+        }
+    }
+    
+    private void getLeaves() {
+        if (inputPlans != null) {
+            for(PhysicalPlan p : inputPlans) {
+                planLeaves.add((PhysicalOperator)p.getLeaves().get(0));
+            }
+        }
+    }
+    
+    public List<PhysicalPlan> getInputPlans() {
+        return inputPlans;
+    }
+
+    public void setInputPlans(List<PhysicalPlan> plans) {
+        inputPlans = plans;
+        planLeaves.clear();
+        getLeaves();
     }
 
-    public void setPlan(PhysicalPlan<PhysicalOperator> plan) {
-        this.plan = plan;
-        gen = (POGenerate) plan.getLeaves().get(0);
+    public void setToBeFlattened(List<Boolean> flattens) {
+        isToBeFlattened = flattens;
     }
+
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POGlobalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POGlobalRearrange.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POGlobalRearrange.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POGlobalRearrange.java Thu Jun 26 13:05:23 2008
@@ -34,7 +34,7 @@
  * getNext methods have to be implemented 
  *
  */
-public class POGlobalRearrange extends PhysicalOperator<PhyPlanVisitor> {
+public class POGlobalRearrange extends PhysicalOperator {
 
     /**
      * 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLoad.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLoad.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLoad.java Thu Jun 26 13:05:23 2008
@@ -45,7 +45,7 @@
  *    stores in the Map and Reduce Plans till the job is created
  *
  */
-public class POLoad extends PhysicalOperator<PhyPlanVisitor> {
+public class POLoad extends PhysicalOperator {
     /**
      * 
      */

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLocalRearrange.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLocalRearrange.java Thu Jun 26 13:05:23 2008
@@ -24,18 +24,16 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.impl.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -44,7 +42,7 @@
  * generates tuples of the form (grpKey,(indxed inp Tuple)).
  *
  */
-public class POLocalRearrange extends PhysicalOperator<PhyPlanVisitor> {
+public class POLocalRearrange extends PhysicalOperator {
 
     /**
      * 
@@ -53,7 +51,7 @@
 
     private Log log = LogFactory.getLog(getClass());
 
-    List<ExprPlan> plans;
+    List<PhysicalPlan> plans;
     
     List<ExpressionOperator> leafOps;
 
@@ -134,7 +132,7 @@
             if (inp.returnStatus == POStatus.STATUS_NULL)
                 continue;
             
-            for (ExprPlan ep : plans) {
+            for (PhysicalPlan ep : plans) {
                 ep.attachInput((Tuple)inp.result);
             }
             List<Result> resLst = new ArrayList<Result>();
@@ -216,15 +214,15 @@
         this.keyType = keyType;
     }
 
-    public List<ExprPlan> getPlans() {
+    public List<PhysicalPlan> getPlans() {
         return plans;
     }
 
-    public void setPlans(List<ExprPlan> plans) {
+    public void setPlans(List<PhysicalPlan> plans) {
         this.plans = plans;
         leafOps.clear();
-        for (ExprPlan plan : plans) {
-            leafOps.add(plan.getLeaves().get(0));
+        for (PhysicalPlan plan : plans) {
+            leafOps.add((ExpressionOperator)plan.getLeaves().get(0));
         }
     }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POPackage.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POPackage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POPackage.java Thu Jun 26 13:05:23 2008
@@ -48,7 +48,7 @@
  * be packaged into their appropriate output
  * bags based on the index.
  */
-public class POPackage extends PhysicalOperator<PhyPlanVisitor> {
+public class POPackage extends PhysicalOperator {
     /**
      * 
      */

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PORead.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PORead.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PORead.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PORead.java Thu Jun 26 13:05:23 2008
@@ -34,7 +34,7 @@
  * for testing. It'd also be useful for the example generator
  * 
  */
-public class PORead extends PhysicalOperator<PhyPlanVisitor> {
+public class PORead extends PhysicalOperator {
 
     DataBag bag;
     transient Iterator<Tuple> it;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java Thu Jun 26 13:05:23 2008
@@ -35,7 +35,7 @@
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.impl.physicalLayer.expressionOperators.POUserComparisonFunc;
@@ -54,14 +54,14 @@
  * 
  * 
  */
-public class POSort extends PhysicalOperator<PhyPlanVisitor> {
+public class POSort extends PhysicalOperator {
 
 	/**
      * 
      */
     private static final long serialVersionUID = 1L;
     //private List<Integer> mSortCols;
-	private List<ExprPlan> sortPlans;
+	private List<PhysicalPlan> sortPlans;
 	private List<Byte> ExprOutputTypes;
 	private List<Boolean> mAscCols;
 	private POUserComparisonFunc mSortFunc;
@@ -72,7 +72,7 @@
 	private DataBag sortedBag;
 	transient Iterator<Tuple> it;
 
-	public POSort(OperatorKey k, int rp, List inp, List<ExprPlan> sortPlans,
+	public POSort(OperatorKey k, int rp, List inp, List<PhysicalPlan> sortPlans,
 			List<Boolean> mAscCols, POUserFunc mSortFunc) {
 		super(k, rp, inp);
 		//this.mSortCols = mSortCols;
@@ -84,7 +84,7 @@
 					new SortComparator());
 			ExprOutputTypes = new ArrayList<Byte>(sortPlans.size());
 
-			for(ExprPlan plan : sortPlans) {
+			for(PhysicalPlan plan : sortPlans) {
 				ExprOutputTypes.add(plan.getLeaves().get(0).getResultType());
 			}
 		} else {
@@ -125,7 +125,7 @@
 			int ret = 0;
 			if(sortPlans == null || sortPlans.size() == 0) 
 				return 0;
-			for(ExprPlan plan : sortPlans) {
+			for(PhysicalPlan plan : sortPlans) {
 				try {
 					plan.attachInput(o1);
 					Result res1 = getResult(plan, ExprOutputTypes.get(count));
@@ -148,8 +148,8 @@
 			return ret;
 		} 
 		
-		private Result getResult(ExprPlan plan, byte resultType) throws ExecException {
-			ExpressionOperator Op = plan.getLeaves().get(0);
+		private Result getResult(PhysicalPlan plan, byte resultType) throws ExecException {
+			ExpressionOperator Op = (ExpressionOperator) plan.getLeaves().get(0);
 			Result res = null;
 			
 			switch (resultType) {
@@ -264,11 +264,11 @@
 		v.visitSort(this);
 	}
 
-    public List<ExprPlan> getSortPlans() {
+    public List<PhysicalPlan> getSortPlans() {
         return sortPlans;
     }
 
-    public void setSortPlans(List<ExprPlan> sortPlans) {
+    public void setSortPlans(List<PhysicalPlan> sortPlans) {
         this.sortPlans = sortPlans;
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSplit.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSplit.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSplit.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSplit.java Thu Jun 26 13:05:23 2008
@@ -64,7 +64,7 @@
  * also allows to reuse this data stored from the split
  * job whenever necessary.
  */
-public class POSplit extends PhysicalOperator<PhyPlanVisitor> {
+public class POSplit extends PhysicalOperator {
     /**
      * 
      */

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POStore.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POStore.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POStore.java Thu Jun 26 13:05:23 2008
@@ -44,7 +44,7 @@
  *    stores in the Map and Reduce Plans till the job is created
  *
  */
-public class POStore extends PhysicalOperator<PhyPlanVisitor> {
+public class POStore extends PhysicalOperator {
     /**
      * 
      */

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POUnion.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POUnion.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POUnion.java Thu Jun 26 13:05:23 2008
@@ -41,7 +41,7 @@
  * all the inputs are drained.
  *
  */
-public class POUnion extends PhysicalOperator<PhyPlanVisitor> {
+public class POUnion extends PhysicalOperator {
     /**
      * 
      */

Added: incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalkerWOSeenChk.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalkerWOSeenChk.java?rev=672016&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalkerWOSeenChk.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalkerWOSeenChk.java Thu Jun 26 13:05:23 2008
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * DependencyOrderWalkerWOSeenChk traverses the graph so that no node is visited
+ * before all the nodes it depends on have been visited.  Beyond this, it does not
+ * guarantee any particular order.  So, if you have a graph with nodes 1 2 3 4, and
+ * edges 1->3, 2->3, and 3->4, this walker guarantees that 1 and 2 will be visited
+ * before 3 and 3 before 4, but not whether 1 or 2 comes first.  The "seen" check
+ * is disabled here, so a node reachable along several paths is visited once per path.
+ */
+public class DependencyOrderWalkerWOSeenChk <O extends Operator, P extends OperatorPlan<O>>
+    extends PlanWalker<O, P> {
+
+    /**
+     * @param plan Plan for this walker to traverse.
+     */
+    public DependencyOrderWalkerWOSeenChk(P plan) {
+        super(plan);
+    }
+
+    /**
+     * Begin traversing the graph.
+     * @param visitor Visitor this walker is being used by.
+     * @throws VisitorException if an error is encountered while walking.
+     */
+    public void walk(PlanVisitor<O, P> visitor) throws VisitorException {
+        // This is highly inefficient, but our graphs are small so it should be okay.
+        // The algorithm works by starting at each leaf of the graph, finding its
+        // predecessors and calling itself for each of those predecessors.  When it
+        // finds a node that has no predecessors it puts that node in the list.  It
+        // then unwinds itself putting each of the other nodes in the list.  The seen
+        // check in doAllPredecessors is commented out, so a node reached along more
+        // than one path is put in the list (and therefore visited) more than once.
+
+        List<O> fifo = new ArrayList<O>();
+        Set<O> seen = new HashSet<O>();
+        List<O> leaves = mPlan.getLeaves();
+        if (leaves == null) return;
+        for (O op : leaves) {
+            doAllPredecessors(op, seen, fifo);
+        }
+
+        for (O op: fifo) {
+            op.visit(visitor);
+        }
+    }
+
+    public PlanWalker<O, P> spawnChildWalker(P plan) { 
+        return new DependencyOrderWalkerWOSeenChk<O, P>(plan);
+    }
+
+    private void doAllPredecessors(O node,
+                                   Set<O> seen,
+                                   Collection<O> fifo) throws VisitorException {
+//        if (!seen.contains(node)) {
+            // Seen check disabled (see commented-out guard): always process the node.
+            Collection<O> preds = mPlan.getPredecessors(node);
+            if (preds != null && preds.size() > 0) {
+                // Do all our predecessors before ourself
+                for (O op : preds) {
+                    doAllPredecessors(op, seen, fifo);
+                }
+            }
+            // Now do ourself ('seen' is still filled in, but nothing reads it)
+            seen.add(node);
+            fifo.add(node);
+//        }
+    }
+}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java Thu Jun 26 13:05:23 2008
@@ -56,8 +56,11 @@
         tmpFile.delete();
         int count = 0;
         while(it.hasNext()){
+            /*
             DataByteArray a = (DataByteArray)it.next().get(2);
             int sum = Double.valueOf(a.toString()).intValue();
+            */
+            int sum = ((Double)it.next().get(2)).intValue();
             assertEquals(LOOP_COUNT/2, sum);
             count++;
         }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEqualTo.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEqualTo.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEqualTo.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEqualTo.java Thu Jun 26 13:05:23 2008
@@ -69,6 +69,7 @@
             res = g.getNext((DataBag) inp1);
             if ((Boolean) res.result == false)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((DataBag) inp1);
             ret = (DataType.compare(inp1, inp2) == 0);
@@ -83,6 +84,7 @@
             res = g.getNext((Boolean) inp1);
             if ((Boolean) res.result == false)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((Boolean) inp1);
             ret = (DataType.compare(inp1, inp2) == 0);
@@ -97,6 +99,7 @@
             res = g.getNext((DataByteArray) inp1);
             if ((Boolean) res.result == false)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((DataByteArray) inp1);
             ret = (DataType.compare(inp1, inp2) == 0);
@@ -111,6 +114,7 @@
             res = g.getNext((String) inp1);
             if ((Boolean) res.result == false)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((String) inp1);
             ret = (DataType.compare(inp1, inp2) == 0);
@@ -125,6 +129,7 @@
             res = g.getNext((Double) inp1);
             if ((Boolean) res.result == false)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((Double) inp1);
             ret = (DataType.compare(inp1, inp2) == 0);
@@ -139,6 +144,7 @@
             res = g.getNext((Float) inp1);
             if ((Boolean) res.result == false)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((Float) inp1);
             ret = (DataType.compare(inp1, inp2) == 0);
@@ -153,6 +159,7 @@
             res = g.getNext((Integer) inp1);
             if ((Boolean) res.result == false)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((Integer) inp1);
             ret = (DataType.compare(inp1, inp2) == 0);
@@ -167,6 +174,7 @@
             res = g.getNext((Long) inp1);
             if ((Boolean) res.result == false)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((Long) inp1);
             ret = (DataType.compare(inp1, inp2) == 0);
@@ -181,6 +189,7 @@
             res = g.getNext((Map) inp1);
             if ((Boolean) res.result == false)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((Map) inp1);
             ret = (DataType.compare(inp1, inp2) == 0);
@@ -195,6 +204,7 @@
             res = g.getNext((Tuple) inp1);
             if ((Boolean) res.result == false)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((Tuple) inp1);
             ret = (DataType.compare(inp1, inp2) == 0);

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Thu Jun 26 13:05:23 2008
@@ -30,6 +30,7 @@
 import java.util.Random;
 import java.util.StringTokenizer;
 
+import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.pig.EvalFunc;
@@ -51,6 +52,10 @@
 
     TupleFactory mTf = TupleFactory.getInstance();
     
+    @Before
+    public void setUp(){
+        FileLocalizer.setR(new Random());
+    }
     
     static public class MyBagFunction extends EvalFunc<DataBag>{
         @Override
@@ -83,7 +88,7 @@
         File f1 = createFile(new String[]{"a:1","b:1","a:1"});
 
         pigServer.registerQuery("a = load 'file:" + f1 + "' using " + PigStorage.class.getName() + "(':');");
-        pigServer.registerQuery("b = foreach a generate '1'-'1'/'1';");
+        pigServer.registerQuery("b = foreach a generate 1-1/1;");
         Iterator<Tuple> iter  = pigServer.openIterator("b");
         
         for (int i=0 ;i<3; i++){
@@ -122,8 +127,8 @@
         pw.println("a");
         pw.println("a");
         pw.close();
-        pigServer.registerQuery("a = foreach (load 'file:" + f + "') generate '1', flatten(" + MyBagFunction.class.getName() + "(*));");
-        pigServer.registerQuery("b = foreach a generate $0, flatten($1);");
+        pigServer.registerQuery("a = foreach (load 'file:" + f + "') generate 1, flatten(" + MyBagFunction.class.getName() + "(*));");
+//        pigServer.registerQuery("b = foreach a generate $0, flatten($1);");
         Iterator<Tuple> iter = pigServer.openIterator("a");
         int count = 0;
         while(iter.hasNext()){
@@ -316,7 +321,7 @@
         while (iter.hasNext()){
             Tuple t = iter.next();
             if (eliminateDuplicates){
-System.out.println("HERE " + DataType.findType(t.get(0)));
+                System.out.println("HERE " + DataType.findType(t.get(0)));
                 assertTrue(last < Integer.valueOf(t.get(0).toString()));
             }else{
                 assertTrue(last <= DataType.toDouble(t.get(0)));

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java Thu Jun 26 13:05:23 2008
@@ -56,7 +56,7 @@
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.relationalOperators.*;
 import org.apache.pig.impl.physicalLayer.expressionOperators.ConstantExpression;
@@ -78,7 +78,7 @@
     static PigContext pc;
     String ldFile;
     String expFile;
-    PhysicalPlan<PhysicalOperator> php = new PhysicalPlan<PhysicalOperator>();
+    PhysicalPlan php = new PhysicalPlan();
     String stFile;
     String hadoopLdFile;
     String grpName;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java Thu Jun 26 13:05:23 2008
@@ -17,55 +17,34 @@
  */
 package org.apache.pig.test;
 
-import static org.junit.Assert.assertEquals;
-
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.executionengine.ExecutionEngine;
-import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.DefaultBagFactory;
 import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.impl.mapReduceLayer.LocalLauncher;
-import org.apache.pig.impl.mapReduceLayer.MRCompiler;
-import org.apache.pig.impl.mapReduceLayer.MapReduceLauncher;
-import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
-import org.apache.pig.impl.physicalLayer.POStatus;
-import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
-import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.impl.physicalLayer.relationalOperators.*;
-import org.apache.pig.impl.physicalLayer.expressionOperators.ConstantExpression;
 import org.apache.pig.impl.physicalLayer.expressionOperators.POProject;
-import org.apache.pig.impl.physicalLayer.expressionOperators.ComparisonOperator;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.test.utils.GenPhyOp;
 import org.apache.pig.test.utils.GenRandomData;
 import org.apache.pig.test.utils.TestHelper;
@@ -77,7 +56,7 @@
     static PigContext pc;
     String ldFile;
     String expFile;
-    PhysicalPlan<PhysicalOperator> php = new PhysicalPlan<PhysicalOperator>();
+    PhysicalPlan php = new PhysicalPlan();
     String stFile;
     String grpName;
     String curDir;
@@ -163,7 +142,7 @@
         str.store();
     }
     
-    private void setUp1(boolean gen) throws Exception {
+    /*private void setUp1(boolean gen) throws Exception {
         
         ldFile = "file:" + inpDir + "jsTst1.txt";
         expFile = ldFile;
@@ -377,7 +356,7 @@
         
         assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
         
-    }
+    }*/
     
     private void setUp5(boolean gen) throws Exception {
         ldFile = "file:" + inpDir + "jsTst5.txt";

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java Thu Jun 26 13:05:23 2008
@@ -17,9 +17,6 @@
  */
 package org.apache.pig.test;
 
-import static org.junit.Assert.*;
-
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -31,13 +28,12 @@
 import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
-import org.apache.pig.impl.physicalLayer.relationalOperators.POLocalRearrange;
-import org.apache.pig.impl.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.impl.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.test.utils.GenPhyOp;
 import org.apache.pig.test.utils.GenRandomData;
@@ -107,9 +103,9 @@
     
     private void setUp2() throws PlanException, ExecException{
         lr = GenPhyOp.topLocalRearrangeOPWithPlanPlain(0,0,db.iterator().next());
-        List<ExprPlan> plans = lr.getPlans();
+        List<PhysicalPlan> plans = lr.getPlans();
         POLocalRearrange lrT = GenPhyOp.topLocalRearrangeOPWithPlanPlain(0, 1, db.iterator().next());
-        List<ExprPlan> plansT = lrT.getPlans();
+        List<PhysicalPlan> plansT = lrT.getPlans();
         plans.add(plansT.get(0));
         lr.setPlans(plans);
         

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java Thu Jun 26 13:05:23 2008
@@ -66,6 +66,8 @@
     
     Random r = new Random();
     PigContext pc = new PigContext(ExecType.LOCAL);
+
+    private boolean generate = false;
     
     
     private void writeData(File input, int noTuples, int arityOfTuples, char separator) throws IOException {
@@ -102,6 +104,13 @@
     	ByteArrayOutputStream baos = new ByteArrayOutputStream();
         pp.explain(baos);
         String compiledPlan = baos.toString();
+        
+        if(generate){
+            FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/ComplexForeach.gld");
+            fos.write(baos.toByteArray());
+            return;
+        }
+        
     	FileInputStream fis = new FileInputStream("test/org/apache/pig/test/data/GoldenFiles/ComplexForeach.gld");
         byte[] b = new byte[MAX_SIZE];
         int len = fis.read(b);
@@ -123,6 +132,13 @@
     	ByteArrayOutputStream baos = new ByteArrayOutputStream();
         pp.explain(baos);
         String compiledPlan = baos.toString();
+        
+        if(generate){
+            FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/Sort.gld");
+            fos.write(baos.toByteArray());
+            return;
+        }
+        
     	FileInputStream fis = new FileInputStream("test/org/apache/pig/test/data/GoldenFiles/Sort.gld");
         byte[] b = new byte[MAX_SIZE];
         int len = fis.read(b);
@@ -146,6 +162,13 @@
     	ByteArrayOutputStream baos = new ByteArrayOutputStream();
         pp.explain(baos);
         String compiledPlan = baos.toString();
+        
+        if(generate){
+            FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/Distinct.gld");
+            fos.write(baos.toByteArray());
+            return;
+        }
+        
     	FileInputStream fis = new FileInputStream("test/org/apache/pig/test/data/GoldenFiles/Distinct.gld");
         byte[] b = new byte[MAX_SIZE];
         int len = fis.read(b);
@@ -169,6 +192,13 @@
     	ByteArrayOutputStream baos = new ByteArrayOutputStream();
         pp.explain(baos);
         String compiledPlan = baos.toString();
+        
+        if(generate){
+            FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/Cogroup.gld");
+            fos.write(baos.toByteArray());
+            return;
+        }
+        
     	FileInputStream fis = new FileInputStream("test/org/apache/pig/test/data/GoldenFiles/Cogroup.gld");
         byte[] b = new byte[MAX_SIZE];
         int len = fis.read(b);
@@ -197,6 +227,13 @@
     	ByteArrayOutputStream baos = new ByteArrayOutputStream();
         pp.explain(baos);
         String compiledPlan = baos.toString();
+        
+        if(generate){
+            FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/Arithmetic.gld");
+            fos.write(baos.toByteArray());
+            return;
+        }
+        
     	FileInputStream fis = new FileInputStream("test/org/apache/pig/test/data/GoldenFiles/Arithmetic.gld");
         byte[] b = new byte[MAX_SIZE];
         int len = fis.read(b);
@@ -222,6 +259,13 @@
     	ByteArrayOutputStream baos = new ByteArrayOutputStream();
         pp.explain(baos);
         String compiledPlan = baos.toString();
+        
+        if(generate){
+            FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/Comparison.gld");
+            fos.write(baos.toByteArray());
+            return;
+        }
+        
     	FileInputStream fis = new FileInputStream("test/org/apache/pig/test/data/GoldenFiles/Comparison.gld");
         byte[] b = new byte[MAX_SIZE];
         int len = fis.read(b);
@@ -247,6 +291,13 @@
     	ByteArrayOutputStream baos = new ByteArrayOutputStream();
         pp.explain(baos);
         String compiledPlan = baos.toString();
+        
+        if(generate){
+            FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/BinCond.gld");
+            fos.write(baos.toByteArray());
+            return;
+        }
+        
     	FileInputStream fis = new FileInputStream("test/org/apache/pig/test/data/GoldenFiles/BinCond.gld");
         byte[] b = new byte[MAX_SIZE];
         int len = fis.read(b);
@@ -273,6 +324,13 @@
     	ByteArrayOutputStream baos = new ByteArrayOutputStream();
         pp.explain(baos);
         String compiledPlan = baos.toString();
+        
+        if(generate){
+            FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/Generate.gld");
+            fos.write(baos.toByteArray());
+            return;
+        }
+        
     	FileInputStream fis = new FileInputStream("test/org/apache/pig/test/data/GoldenFiles/Generate.gld");
         byte[] b = new byte[MAX_SIZE];
         int len = fis.read(b);
@@ -296,6 +354,13 @@
     	ByteArrayOutputStream baos = new ByteArrayOutputStream();
         pp.explain(baos);
         String compiledPlan = baos.toString();
+        
+        if(generate){
+            FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/Union.gld");
+            fos.write(baos.toByteArray());
+            return;
+        }
+        
     	FileInputStream fis = new FileInputStream("test/org/apache/pig/test/data/GoldenFiles/Union.gld");
         byte[] b = new byte[MAX_SIZE];
         int len = fis.read(b);
@@ -320,6 +385,13 @@
     	ByteArrayOutputStream baos = new ByteArrayOutputStream();
         pp.explain(baos);
         String compiledPlan = baos.toString();
+        
+        if(generate){
+            FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/Split1.gld");
+            fos.write(baos.toByteArray());
+            return;
+        }
+        
     	FileInputStream fis1 = new FileInputStream("test/org/apache/pig/test/data/GoldenFiles/Split1.gld");
     	FileInputStream fis2 = new FileInputStream("test/org/apache/pig/test/data/GoldenFiles/Split2.gld");
         byte[] b1 = new byte[MAX_SIZE];
@@ -358,7 +430,14 @@
     	ByteArrayOutputStream baos = new ByteArrayOutputStream();
         pp.explain(baos);
         String compiledPlan = baos.toString();
-        compiledPlan += "\n"; //for the string compare, the files contain an additional \n
+        
+        if(generate){
+            FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/IsNull1.gld");
+            fos.write(baos.toByteArray());
+            return;
+        }
+        
+//        compiledPlan += "\n"; //for the string compare, the files contain an additional \n
     	FileInputStream fis1 = new FileInputStream("test/org/apache/pig/test/data/GoldenFiles/IsNull1.gld");
     	FileInputStream fis2 = new FileInputStream("test/org/apache/pig/test/data/GoldenFiles/IsNull2.gld");
         byte[] b1 = new byte[MAX_SIZE];
@@ -399,7 +478,7 @@
         LogicalPlan lp = buildPlan(query);
         PhysicalPlan pp = buildPhysicalPlan(lp);
         
-        DependencyOrderWalker<PhysicalOperator, PhysicalPlan<PhysicalOperator>> walker = new DependencyOrderWalker<PhysicalOperator, PhysicalPlan<PhysicalOperator>>(pp);
+        DependencyOrderWalker<PhysicalOperator, PhysicalPlan> walker = new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(pp);
     	PhyPlanPrinterVisitor visitor = new PhyPlanPrinterVisitor(pp, walker);
     	visitor.visit();
     	System.out.println(visitor.output);

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Thu Jun 26 13:05:23 2008
@@ -290,7 +290,7 @@
         //a split statement. Currently it breaks as adding an
         //additional output to the filter fails as filter supports
         //single output
-        //"C = FILTER A BY $2 == $3;" +
+        "C = FILTER A BY $2 == $3;" +
         "B = ARRANGE B BY $1;" +
         "GENERATE A, FLATTEN(B.$0);" +
         "};";

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java Thu Jun 26 13:05:23 2008
@@ -40,7 +40,7 @@
 import org.apache.pig.impl.mapReduceLayer.MRCompiler;
 import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.plans.PlanPrinter;
 import org.apache.pig.impl.physicalLayer.relationalOperators.*;
@@ -56,7 +56,7 @@
 import org.junit.Test;
 
 public class TestMRCompiler extends junit.framework.TestCase {
-    static PhysicalPlan<PhysicalOperator> php = new PhysicalPlan<PhysicalOperator>();
+    static PhysicalPlan php = new PhysicalPlan();
 
 //    MiniCluster cluster = MiniCluster.buildCluster();
     
@@ -116,9 +116,9 @@
 
     public static void intTestRun1() throws ExecException, PlanException, VisitorException,
             ExecException {
-        php = new PhysicalPlan<PhysicalOperator>();
+        php = new PhysicalPlan();
 
-        PhysicalPlan<PhysicalOperator> part1 = new PhysicalPlan<PhysicalOperator>();
+        PhysicalPlan part1 = new PhysicalPlan();
         POLoad lC = GenPhyOp.topLoadOp();
         POFilter fC = GenPhyOp.topFilterOp();
         fC.setRequestedParallelism(20);
@@ -225,9 +225,9 @@
 
     public static void intTestRun2() throws ExecException, PlanException, VisitorException,
             ExecException {
-        php = new PhysicalPlan<PhysicalOperator>();
+        php = new PhysicalPlan();
 
-        PhysicalPlan<PhysicalOperator> part1 = new PhysicalPlan<PhysicalOperator>();
+        PhysicalPlan part1 = new PhysicalPlan();
         POLoad lC = GenPhyOp.topLoadOp();
         POFilter fC = GenPhyOp.topFilterOp();
         POLocalRearrange lrC = GenPhyOp.topLocalRearrangeOp();
@@ -316,7 +316,7 @@
 
     public static void intTestSpl1() throws ExecException, VisitorException, PlanException,
             ExecException, IOException {
-        php = new PhysicalPlan<PhysicalOperator>();
+        php = new PhysicalPlan();
 
         POLoad lA = GenPhyOp.topLoadOp();
         POSplit spl = GenPhyOp.topSplitOp();
@@ -355,7 +355,7 @@
 
     public static void intTestSpl2() throws ExecException, VisitorException, PlanException,
             ExecException, IOException {
-        php = new PhysicalPlan<PhysicalOperator>();
+        php = new PhysicalPlan();
 
         POLoad lA = GenPhyOp.topLoadOp();
         POSplit spl = GenPhyOp.topSplitOp();
@@ -403,7 +403,7 @@
 
     public static void intTestSpl3() throws ExecException, VisitorException, PlanException,
             ExecException, IOException {
-        php = new PhysicalPlan<PhysicalOperator>();
+        php = new PhysicalPlan();
 
         POLoad lA = GenPhyOp.topLoadOp();
         POSplit spl = GenPhyOp.topSplitOp();
@@ -487,17 +487,17 @@
      * @throws ExecException
      */
     public static void intTestSim1() throws PlanException, ExecException {
-        php = new PhysicalPlan<PhysicalOperator>();
+        php = new PhysicalPlan();
         POLoad ld = GenPhyOp.topLoadOp();
         php.add(ld);
-        PhysicalPlan<PhysicalOperator> grpChain1 = GenPhyOp.grpChain();
+        PhysicalPlan grpChain1 = GenPhyOp.grpChain();
         php.merge(grpChain1);
 
         php.connect(ld, grpChain1.getRoots().get(0));
 
         PhysicalOperator leaf = php.getLeaves().get(0);
 
-        PhysicalPlan<PhysicalOperator> grpChain2 = GenPhyOp.grpChain();
+        PhysicalPlan grpChain2 = GenPhyOp.grpChain();
         php.merge(grpChain2);
 
         php.connect(leaf, grpChain2.getRoots().get(0));
@@ -516,10 +516,10 @@
     }
 
     public static void intTestSim2() throws ExecException, PlanException {
-        php = new PhysicalPlan<PhysicalOperator>();
+        php = new PhysicalPlan();
 
-        PhysicalPlan<PhysicalOperator> ldGrpChain1 = GenPhyOp.loadedGrpChain();
-        PhysicalPlan<PhysicalOperator> ldGrpChain2 = GenPhyOp.loadedGrpChain();
+        PhysicalPlan ldGrpChain1 = GenPhyOp.loadedGrpChain();
+        PhysicalPlan ldGrpChain2 = GenPhyOp.loadedGrpChain();
 
         php.merge(ldGrpChain1);
         php.merge(ldGrpChain2);
@@ -535,10 +535,10 @@
     }
 
     public static void intTestSim3() throws ExecException, PlanException {
-        php = new PhysicalPlan<PhysicalOperator>();
+        php = new PhysicalPlan();
 
-        PhysicalPlan<PhysicalOperator> ldGrpChain1 = GenPhyOp.loadedGrpChain();
-        PhysicalPlan<PhysicalOperator> ldGrpChain2 = GenPhyOp.loadedGrpChain();
+        PhysicalPlan ldGrpChain1 = GenPhyOp.loadedGrpChain();
+        PhysicalPlan ldGrpChain2 = GenPhyOp.loadedGrpChain();
 
         php.merge(ldGrpChain1);
         php.merge(ldGrpChain2);
@@ -546,7 +546,7 @@
         POUnion un = GenPhyOp.topUnionOp();
         php.addAsLeaf(un);
 
-        PhysicalPlan<PhysicalOperator> ldFil1 = GenPhyOp.loadedFilter();
+        PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
 
         php.merge(ldFil1);
         php.connect(ldFil1.getLeaves().get(0), un);
@@ -558,10 +558,10 @@
     }
 
     public static void intTestSim4() throws ExecException, PlanException {
-        php = new PhysicalPlan<PhysicalOperator>();
+        php = new PhysicalPlan();
 
-        PhysicalPlan<PhysicalOperator> ldGrpChain1 = GenPhyOp.loadedGrpChain();
-        PhysicalPlan<PhysicalOperator> ldGrpChain2 = GenPhyOp.loadedGrpChain();
+        PhysicalPlan ldGrpChain1 = GenPhyOp.loadedGrpChain();
+        PhysicalPlan ldGrpChain2 = GenPhyOp.loadedGrpChain();
 
         php.merge(ldGrpChain1);
         php.merge(ldGrpChain2);
@@ -569,8 +569,8 @@
         POUnion un = GenPhyOp.topUnionOp();
         php.addAsLeaf(un);
 
-        PhysicalPlan<PhysicalOperator> ldFil1 = GenPhyOp.loadedFilter();
-        PhysicalPlan<PhysicalOperator> ldFil2 = GenPhyOp.loadedFilter();
+        PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
+        PhysicalPlan ldFil2 = GenPhyOp.loadedFilter();
 
         php.merge(ldFil1);
         php.connect(ldFil1.getLeaves().get(0), un);
@@ -585,9 +585,9 @@
     }
 
     public static void intTestSim5() throws ExecException, PlanException {
-        php = new PhysicalPlan<PhysicalOperator>();
-        PhysicalPlan<PhysicalOperator> ldFil1 = GenPhyOp.loadedFilter();
-        PhysicalPlan<PhysicalOperator> ldFil2 = GenPhyOp.loadedFilter();
+        php = new PhysicalPlan();
+        PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
+        PhysicalPlan ldFil2 = GenPhyOp.loadedFilter();
         php.merge(ldFil1);
         php.merge(ldFil2);
 
@@ -601,10 +601,10 @@
     }
 
     public static void intTestSim6() throws ExecException, PlanException {
-        php = new PhysicalPlan<PhysicalOperator>();
+        php = new PhysicalPlan();
 
-        PhysicalPlan<PhysicalOperator> ldGrpChain1 = GenPhyOp.loadedGrpChain();
-        PhysicalPlan<PhysicalOperator> ldGrpChain2 = GenPhyOp.loadedGrpChain();
+        PhysicalPlan ldGrpChain1 = GenPhyOp.loadedGrpChain();
+        PhysicalPlan ldGrpChain2 = GenPhyOp.loadedGrpChain();
 
         POLocalRearrange lr1 = GenPhyOp.topLocalRearrangeOp();
         POLocalRearrange lr2 = GenPhyOp.topLocalRearrangeOp();
@@ -627,10 +627,10 @@
     }
 
     public static void intTestSim7() throws ExecException, PlanException {
-        php = new PhysicalPlan<PhysicalOperator>();
+        php = new PhysicalPlan();
 
-        PhysicalPlan<PhysicalOperator> ldGrpChain1 = GenPhyOp.loadedGrpChain();
-        PhysicalPlan<PhysicalOperator> ldGrpChain2 = GenPhyOp.loadedGrpChain();
+        PhysicalPlan ldGrpChain1 = GenPhyOp.loadedGrpChain();
+        PhysicalPlan ldGrpChain2 = GenPhyOp.loadedGrpChain();
 
         POLocalRearrange lr1 = GenPhyOp.topLocalRearrangeOp();
         POLocalRearrange lr2 = GenPhyOp.topLocalRearrangeOp();
@@ -644,7 +644,7 @@
         POGlobalRearrange gr = GenPhyOp.topGlobalRearrangeOp();
         php.addAsLeaf(gr);
 
-        PhysicalPlan<PhysicalOperator> ldFil1 = GenPhyOp.loadedFilter();
+        PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
 
         php.merge(ldFil1);
         php.connect(ldFil1.getLeaves().get(0), gr);
@@ -657,10 +657,10 @@
     }
 
     public static void intTestSim8() throws ExecException, PlanException {
-        php = new PhysicalPlan<PhysicalOperator>();
+        php = new PhysicalPlan();
 
-        PhysicalPlan<PhysicalOperator> ldGrpChain1 = GenPhyOp.loadedGrpChain();
-        PhysicalPlan<PhysicalOperator> ldGrpChain2 = GenPhyOp.loadedGrpChain();
+        PhysicalPlan ldGrpChain1 = GenPhyOp.loadedGrpChain();
+        PhysicalPlan ldGrpChain2 = GenPhyOp.loadedGrpChain();
 
         POLocalRearrange lr1 = GenPhyOp.topLocalRearrangeOp();
         POLocalRearrange lr2 = GenPhyOp.topLocalRearrangeOp();
@@ -674,8 +674,8 @@
         POGlobalRearrange gr = GenPhyOp.topGlobalRearrangeOp();
         php.addAsLeaf(gr);
 
-        PhysicalPlan<PhysicalOperator> ldFil1 = GenPhyOp.loadedFilter();
-        PhysicalPlan<PhysicalOperator> ldFil2 = GenPhyOp.loadedFilter();
+        PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
+        PhysicalPlan ldFil2 = GenPhyOp.loadedFilter();
 
         php.merge(ldFil1);
         php.connect(ldFil1.getLeaves().get(0), gr);
@@ -691,13 +691,13 @@
     }
 
     public static void intTestSim9() throws ExecException, PlanException {
-        php = new PhysicalPlan<PhysicalOperator>();
+        php = new PhysicalPlan();
 
         POGlobalRearrange gr = GenPhyOp.topGlobalRearrangeOp();
         php.addAsLeaf(gr);
 
-        PhysicalPlan<PhysicalOperator> ldFil1 = GenPhyOp.loadedFilter();
-        PhysicalPlan<PhysicalOperator> ldFil2 = GenPhyOp.loadedFilter();
+        PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
+        PhysicalPlan ldFil2 = GenPhyOp.loadedFilter();
 
         php.merge(ldFil1);
         php.connect(ldFil1.getLeaves().get(0), gr);
@@ -713,8 +713,8 @@
     }
     
     public static void intTestSortUDF1() throws PlanException, ExecException{
-        php = new PhysicalPlan<PhysicalOperator>();
-        PhysicalPlan<PhysicalOperator> ldFil1 = GenPhyOp.loadedFilter();
+        php = new PhysicalPlan();
+        PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
         php.merge(ldFil1);
         
         String funcName = WeirdComparator.class.getName();
@@ -723,7 +723,7 @@
         POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, ldFil1.getLeaves(),
                 null, null, comparator);
         sort.setRequestedParallelism(20);
-        ExprPlan nesSortPlan = new ExprPlan();
+        PhysicalPlan nesSortPlan = new PhysicalPlan();
         POProject topPrj = new POProject(new OperatorKey("", r.nextLong()));
         topPrj.setColumn(1);
         topPrj.setOverloaded(true);
@@ -736,7 +736,7 @@
         nesSortPlan.add(prjStar2);
         
         nesSortPlan.connect(topPrj, prjStar2);
-        List<ExprPlan> nesSortPlanLst = new ArrayList<ExprPlan>();
+        List<PhysicalPlan> nesSortPlanLst = new ArrayList<PhysicalPlan>();
         nesSortPlanLst.add(nesSortPlan);
         
         sort.setSortPlans(nesSortPlanLst);
@@ -751,7 +751,7 @@
         php.add(fe3);
         php.connect(sort, fe3);
         
-        PhysicalPlan<PhysicalOperator> grpChain1 = GenPhyOp.grpChain();
+        PhysicalPlan grpChain1 = GenPhyOp.grpChain();
         php.merge(grpChain1);
         php.connect(fe3,grpChain1.getRoots().get(0));
         
@@ -760,7 +760,7 @@
         POForEach fe4 = GenPhyOp.topForEachOPWithUDF(udfs);
         php.addAsLeaf(fe4);
         
-        PhysicalPlan<PhysicalOperator> grpChain2 = GenPhyOp.grpChain();
+        PhysicalPlan grpChain2 = GenPhyOp.grpChain();
         php.merge(grpChain2);
         php.connect(fe4,grpChain2.getRoots().get(0));
 
@@ -774,8 +774,8 @@
     }
     
     public static void intTestDistinct1() throws PlanException, ExecException{
-        php = new PhysicalPlan<PhysicalOperator>();
-        PhysicalPlan<PhysicalOperator> ldFil1 = GenPhyOp.loadedFilter();
+        php = new PhysicalPlan();
+        PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
         php.merge(ldFil1);
         
         PODistinct op = new PODistinct(new OperatorKey("", r.nextLong()),
@@ -783,7 +783,7 @@
         
         php.addAsLeaf(op);
         
-        PhysicalPlan<PhysicalOperator> grpChain1 = GenPhyOp.grpChain();
+        PhysicalPlan grpChain1 = GenPhyOp.grpChain();
         php.merge(grpChain1);
         php.connect(op,grpChain1.getRoots().get(0));
         

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestNotEqualTo.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestNotEqualTo.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestNotEqualTo.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestNotEqualTo.java Thu Jun 26 13:05:23 2008
@@ -69,6 +69,7 @@
             res = g.getNext((DataBag) inp1);
             if ((Boolean) res.result == true)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((DataBag) inp1);
             ret = (DataType.compare(inp1, inp2) != 0);
@@ -83,6 +84,7 @@
             res = g.getNext((Boolean) inp1);
             if ((Boolean) res.result == true)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((Boolean) inp1);
             ret = (DataType.compare(inp1, inp2) != 0);
@@ -97,6 +99,7 @@
             res = g.getNext((DataByteArray) inp1);
             if ((Boolean) res.result == true)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((DataByteArray) inp1);
             ret = (DataType.compare(inp1, inp2) != 0);
@@ -111,6 +114,7 @@
             res = g.getNext((String) inp1);
             if ((Boolean) res.result == true)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((String) inp1);
             ret = (DataType.compare(inp1, inp2) != 0);
@@ -125,6 +129,7 @@
             res = g.getNext((Double) inp1);
             if ((Boolean) res.result == true)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((Double) inp1);
             ret = (DataType.compare(inp1, inp2) != 0);
@@ -139,6 +144,7 @@
             res = g.getNext((Float) inp1);
             if ((Boolean) res.result == true)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((Float) inp1);
             ret = (DataType.compare(inp1, inp2) != 0);
@@ -153,6 +159,7 @@
             res = g.getNext((Integer) inp1);
             if ((Boolean) res.result == true)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((Integer) inp1);
             ret = (DataType.compare(inp1, inp2) != 0);
@@ -167,6 +174,7 @@
             res = g.getNext((Long) inp1);
             if ((Boolean) res.result == true)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((Long) inp1);
             ret = (DataType.compare(inp1, inp2) != 0);
@@ -181,6 +189,7 @@
             res = g.getNext((Map) inp1);
             if ((Boolean) res.result == true)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((Map) inp1);
             ret = (DataType.compare(inp1, inp2) != 0);
@@ -195,6 +204,7 @@
             res = g.getNext((Tuple) inp1);
             if ((Boolean) res.result == true)
                 return false;
+            lt.setValue(inp1);
             rt.setValue(inp2);
             res = g.getNext((Tuple) inp1);
             ret = (DataType.compare(inp1, inp2) != 0);

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestNull.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestNull.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestNull.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestNull.java Thu Jun 26 13:05:23 2008
@@ -27,6 +27,7 @@
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.physicalLayer.Result;
 import org.apache.pig.impl.physicalLayer.expressionOperators.ConstantExpression;
 import org.apache.pig.impl.physicalLayer.expressionOperators.POIsNull;
@@ -50,6 +51,8 @@
         Random r = new Random();
         ConstantExpression lt = (ConstantExpression) GenPhyOp.exprConst();
         lt.setResultType(type);
+        Tuple dummyTuple = TupleFactory.getInstance().newTuple(1);
+        lt.attachInput(dummyTuple);
         POIsNull isNullExpr = (POIsNull) GenPhyOp.compIsNullExpr();
         isNullExpr.setExpr(lt);
 

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java Thu Jun 26 13:05:23 2008
@@ -27,7 +27,7 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.expressionOperators.ConstantExpression;
 import org.apache.pig.impl.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.impl.physicalLayer.expressionOperators.POBinCond;
@@ -80,7 +80,7 @@
         POBinCond op = new POBinCond(new OperatorKey("", r.nextLong()), -1, equal, prjLhs, prjRhs);
         op.setResultType(DataType.INTEGER);
         
-        ExprPlan plan = new ExprPlan();
+        PhysicalPlan plan = new PhysicalPlan();
         plan.add(op);
         plan.add(prjLhs);
         plan.add(prjRhs);

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java Thu Jun 26 13:05:23 2008
@@ -36,7 +36,7 @@
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.expressionOperators.POCast;
 import org.apache.pig.impl.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.impl.plan.PlanException;
@@ -62,7 +62,7 @@
 		
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
 		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		ExprPlan plan = new ExprPlan();
+		PhysicalPlan plan = new PhysicalPlan();
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);
@@ -171,7 +171,7 @@
 		
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
 		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		ExprPlan plan = new ExprPlan();
+		PhysicalPlan plan = new PhysicalPlan();
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);
@@ -280,7 +280,7 @@
 		
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
 		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		ExprPlan plan = new ExprPlan();
+		PhysicalPlan plan = new PhysicalPlan();
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);
@@ -389,7 +389,7 @@
 		
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
 		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		ExprPlan plan = new ExprPlan();
+		PhysicalPlan plan = new PhysicalPlan();
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);
@@ -490,7 +490,7 @@
 	public void testStringToOther() throws PlanException, ExecException {
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
 		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		ExprPlan plan = new ExprPlan();
+		PhysicalPlan plan = new PhysicalPlan();
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);
@@ -664,7 +664,7 @@
 		LoadFunc load = new TestLoader();
 		op.setLoadFSpec(load.getClass().getName());
 		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		ExprPlan plan = new ExprPlan();
+		PhysicalPlan plan = new PhysicalPlan();
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);
@@ -783,7 +783,7 @@
 	public void testTupleToOther() throws PlanException, ExecException {
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
 		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		ExprPlan plan = new ExprPlan();
+		PhysicalPlan plan = new PhysicalPlan();
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);
@@ -896,7 +896,7 @@
 	public void testBagToOther() throws PlanException, ExecException {
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
 		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		ExprPlan plan = new ExprPlan();
+		PhysicalPlan plan = new PhysicalPlan();
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);
@@ -991,7 +991,7 @@
 	public void testMapToOther() throws PlanException, ExecException {
 		POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);
 		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
-		ExprPlan plan = new ExprPlan();
+		PhysicalPlan plan = new PhysicalPlan();
 		plan.add(prj);
 		plan.add(op);
 		plan.connect(prj, op);

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java Thu Jun 26 13:05:23 2008
@@ -49,8 +49,8 @@
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
-import org.apache.pig.impl.physicalLayer.relationalOperators.POGenerate;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.impl.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.impl.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.test.utils.GenRandomData;
@@ -158,14 +158,14 @@
         List<Boolean> toBeFlattened = new LinkedList<Boolean>();
         toBeFlattened.add(true);
         toBeFlattened.add(true);
-        ExprPlan plan1 = new ExprPlan();
+        PhysicalPlan plan1 = new PhysicalPlan();
         plan1.add(prj1);
-        ExprPlan plan2 = new ExprPlan();
+        PhysicalPlan plan2 = new PhysicalPlan();
         plan2.add(prj2);
-        List<ExprPlan> inputs = new LinkedList<ExprPlan>();
+        List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>();
         inputs.add(plan1); 
         inputs.add(plan2);
-        PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
+        PhysicalOperator poGen = new POForEach(new OperatorKey("", r.nextLong()), 1, inputs, toBeFlattened);
         //DataBag obtained = bf.newDefaultBag();
         for(Iterator<Tuple> it = cogroup.iterator(); it.hasNext(); ) {
             Tuple t = it.next();
@@ -192,14 +192,14 @@
         List<Boolean> toBeFlattened = new LinkedList<Boolean>();
         toBeFlattened.add(true);
         toBeFlattened.add(false);
-        ExprPlan plan1 = new ExprPlan();
+        PhysicalPlan plan1 = new PhysicalPlan();
         plan1.add(prj1);
-        ExprPlan plan2 = new ExprPlan();
+        PhysicalPlan plan2 = new PhysicalPlan();
         plan2.add(prj2);
-        List<ExprPlan> inputs = new LinkedList<ExprPlan>();
+        List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>();
         inputs.add(plan1); 
         inputs.add(plan2);
-        PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
+        PhysicalOperator poGen = new POForEach(new OperatorKey("", r.nextLong()), 1, inputs, toBeFlattened);
         
         //DataBag obtained = bf.newDefaultBag();
         List<String> obtained = new LinkedList<String>();
@@ -228,19 +228,19 @@
     public void testSimpleGenerate() throws Exception {
         ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
         ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
-        prj1.setResultType(DataType.BAG);
-        prj2.setResultType(DataType.BAG);
+        prj1.setResultType(DataType.INTEGER);
+        prj2.setResultType(DataType.INTEGER);
         List<Boolean> toBeFlattened = new LinkedList<Boolean>();
         toBeFlattened.add(true);
         toBeFlattened.add(false);
-        ExprPlan plan1 = new ExprPlan();
+        PhysicalPlan plan1 = new PhysicalPlan();
         plan1.add(prj1);
-        ExprPlan plan2 = new ExprPlan();
+        PhysicalPlan plan2 = new PhysicalPlan();
         plan2.add(prj2);
-        List<ExprPlan> inputs = new LinkedList<ExprPlan>();
+        List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>();
         inputs.add(plan1); 
         inputs.add(plan2);
-        PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
+        PhysicalOperator poGen = new POForEach(new OperatorKey("", r.nextLong()), 1, inputs, toBeFlattened);
         
         //DataBag obtained = bf.newDefaultBag();
         List<String> obtained = new LinkedList<String>();

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOMapLookUp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOMapLookUp.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOMapLookUp.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOMapLookUp.java Thu Jun 26 13:05:23 2008
@@ -25,7 +25,7 @@
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.expressionOperators.POMapLookUp;
 import org.apache.pig.impl.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.impl.plan.PlanException;
@@ -44,7 +44,7 @@
 		
 		POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
 		POMapLookUp op = new POMapLookUp(new OperatorKey("", r.nextLong()), -1);
-		ExprPlan plan = new ExprPlan();
+		PhysicalPlan plan = new PhysicalPlan();
 		plan.add(op);
 		plan.add(prj);
 		plan.connect(prj, op);



Mime
View raw message