pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r672016 [2/4] - in /incubator/pig/branches/types: ./ src/org/apache/pig/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/parser/ src/org/apache/pig/impl/logicalLayer/validators/ src/org/apache/pig/impl/mapReduceLa...
Date Thu, 26 Jun 2008 20:05:26 GMT
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java Thu Jun 26 13:05:23 2008
@@ -23,7 +23,6 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -31,7 +30,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
@@ -39,22 +37,29 @@
 import org.apache.pig.impl.builtin.RandomSampleLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.impl.mapReduceLayer.plans.UDFFinder;
-import org.apache.pig.impl.mapReduceLayer.plans.UDFFinderForExpr;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
-import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.impl.physicalLayer.plans.PlanPrinter;
-import org.apache.pig.impl.physicalLayer.relationalOperators.*;
 import org.apache.pig.impl.physicalLayer.expressionOperators.ConstantExpression;
 import org.apache.pig.impl.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.impl.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.impl.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
@@ -92,14 +97,14 @@
  * 
  *
  */
-public class MRCompiler extends PhyPlanVisitor<PhysicalOperator, PhysicalPlan<PhysicalOperator>> {
+public class MRCompiler extends PhyPlanVisitor {
     
     private Log log = LogFactory.getLog(getClass());
     
     PigContext pigContext;
     
     //The plan that is being compiled
-    PhysicalPlan<PhysicalOperator> plan;
+    PhysicalPlan plan;
 
     //The plan of MapReduce Operators
     MROperPlan MRPlan;
@@ -127,17 +132,15 @@
     
     private Random r;
     
-    private UDFFinderForExpr udfFinderForExpr;
-    
     private UDFFinder udfFinder;
     
-    public MRCompiler(PhysicalPlan<PhysicalOperator> plan) {
+    public MRCompiler(PhysicalPlan plan) {
         this(plan,null);
     }
     
-    public MRCompiler(PhysicalPlan<PhysicalOperator> plan,
+    public MRCompiler(PhysicalPlan plan,
             PigContext pigContext) {
-        super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan<PhysicalOperator>>(plan));
+        super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
         this.plan = plan;
         this.pigContext = pigContext;
         splitsSeen = new HashMap<OperatorKey, MapReduceOper>();
@@ -146,7 +149,6 @@
         scope = plan.getRoots().get(0).getOperatorKey().getScope();
         r = new Random(1331);
         FileLocalizer.setR(r);
-        udfFinderForExpr = new UDFFinderForExpr();
         udfFinder = new UDFFinder();
     }
     
@@ -162,7 +164,7 @@
      * Used to get the plan that was compiled
      * @return physical plan
      */
-    public PhysicalPlan<PhysicalOperator> getPlan() {
+    public PhysicalPlan getPlan() {
         return plan;
     }
     
@@ -471,7 +473,7 @@
         Set<MapReduceOper> toBeConnected = new HashSet<MapReduceOper>();
         List<MapReduceOper> remLst = new ArrayList<MapReduceOper>();
 
-        List<PhysicalPlan<PhysicalOperator>> mpLst = new ArrayList<PhysicalPlan<PhysicalOperator>>();
+        List<PhysicalPlan> mpLst = new ArrayList<PhysicalPlan>();
 
         for (MapReduceOper mro : compiledInputs) {
             if (!mro.isMapDone()) {
@@ -519,15 +521,15 @@
         }
     }
 
-    private void addUDFs(ExprPlan plan) throws VisitorException{
+    /*private void addUDFs(PhysicalPlan plan) throws VisitorException{
         if(plan!=null){
             udfFinderForExpr.setPlan(plan);
             udfFinderForExpr.visit();
             curMROp.UDFs.addAll(udfFinderForExpr.getUDFs());
         }
-    }
+    }*/
     
-    private void addUDFs(PhysicalPlan<PhysicalOperator> plan) throws VisitorException{
+    private void addUDFs(PhysicalPlan plan) throws VisitorException{
         if(plan!=null){
             udfFinder.setPlan(plan);
             udfFinder.visit();
@@ -596,9 +598,9 @@
     public void visitLocalRearrange(POLocalRearrange op) throws VisitorException {
         try{
             nonBlocking(op);
-            List<ExprPlan> plans = op.getPlans();
+            List<PhysicalPlan> plans = op.getPlans();
             if(plans!=null)
-                for(ExprPlan ep : plans)
+                for(PhysicalPlan ep : plans)
                     addUDFs(ep);
         }catch(Exception e){
             VisitorException pe = new VisitorException(e.getMessage());
@@ -607,10 +609,14 @@
         }
     }
     
-    public void visitForEach(POForEach op) throws VisitorException{
+    public void visitPOForEach(POForEach op) throws VisitorException{
         try{
             nonBlocking(op);
-            addUDFs(op.getPlan());
+            List<PhysicalPlan> plans = op.getInputPlans();
+            if(plans!=null)
+                for (PhysicalPlan plan : plans) {
+                    addUDFs(plan);
+                }
         }catch(Exception e){
             VisitorException pe = new VisitorException(e.getMessage());
             pe.initCause(e);
@@ -654,13 +660,13 @@
     public void visitDistinct(PODistinct op) throws VisitorException {
         try{
             MapReduceOper mro = compiledInputs[0];
-            ExprPlan ep = new ExprPlan();
+            PhysicalPlan ep = new PhysicalPlan();
             POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
             prjStar.setResultType(DataType.TUPLE);
             prjStar.setStar(true);
             ep.add(prjStar);
             
-            List<ExprPlan> eps = new ArrayList<ExprPlan>();
+            List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
             eps.add(ep);
             
             POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
@@ -684,9 +690,9 @@
             pkg.setInner(inner);
             curMROp.reducePlan.add(pkg);
             
-            List<ExprPlan> eps1 = new ArrayList<ExprPlan>();
+            List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
             List<Boolean> flat1 = new ArrayList<Boolean>();
-            ExprPlan ep1 = new ExprPlan();
+            PhysicalPlan ep1 = new PhysicalPlan();
             POProject prj1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
             prj1.setResultType(DataType.TUPLE);
             prj1.setStar(false);
@@ -695,14 +701,11 @@
             ep1.add(prj1);
             eps1.add(ep1);
             flat1.add(false);
-            POGenerate fe1Gen = new POGenerate(new OperatorKey(scope,nig.getNextNodeId(scope)),eps1,flat1);
-            fe1Gen.setResultType(DataType.TUPLE);
-            PhysicalPlan<PhysicalOperator> fe1Plan = new PhysicalPlan<PhysicalOperator>();
-            fe1Plan.add(fe1Gen);
-            POForEach fe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
-            fe1.setPlan(fe1Plan);
-            fe1.setResultType(DataType.TUPLE);
-            curMROp.reducePlan.addAsLeaf(fe1);
+            POForEach nfe1 = new POForEach(new OperatorKey(scope, nig
+                    .getNextNodeId(scope)), op.getRequestedParallelism(), eps1,
+                    flat1);
+            nfe1.setResultType(DataType.BAG);
+            curMROp.reducePlan.addAsLeaf(nfe1);
         }catch(Exception e){
             VisitorException pe = new VisitorException(e.getMessage());
             pe.initCause(e);
@@ -728,11 +731,11 @@
     }
     
     private int[] getSortCols(POSort sort){
-        List<ExprPlan> plans = sort.getSortPlans();
+        List<PhysicalPlan> plans = sort.getSortPlans();
         if(plans!=null){
             int[] ret = new int[plans.size()]; 
             int i=-1;
-            for (ExprPlan plan : plans) {
+            for (PhysicalPlan plan : plans) {
                 ret[++i] = ((POProject)plan.getLeaves().get(0)).getColumn();
             }
             return ret;
@@ -746,12 +749,12 @@
         mro.setGlobalSort(true);
         mro.requestedParallelism = rp;
         
-        List<ExprPlan> eps1 = new ArrayList<ExprPlan>();
+        List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
         
         if(fields==null)
             throw new PlanException("No Expression Plan found in POSort");
         for (int i : fields) {
-            ExprPlan ep = new ExprPlan();
+            PhysicalPlan ep = new PhysicalPlan();
             POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
             prj.setColumn(i);
             prj.setOverloaded(false);
@@ -776,25 +779,19 @@
         pkg.setInner(inner);
         mro.reducePlan.add(pkg);
         
-        ExprPlan ep = new ExprPlan();
+        PhysicalPlan ep = new PhysicalPlan();
         POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
         prj.setColumn(1);
         prj.setOverloaded(false);
         prj.setResultType(DataType.BAG);
         ep.add(prj);
-        List<ExprPlan> eps2 = new ArrayList<ExprPlan>();
+        List<PhysicalPlan> eps2 = new ArrayList<PhysicalPlan>();
         eps2.add(ep);
         List<Boolean> flattened = new ArrayList<Boolean>();
         flattened.add(true);
-        POGenerate fe1Gen = new POGenerate(new OperatorKey(scope,nig.getNextNodeId(scope)),eps2,flattened);
-        fe1Gen.setResultType(DataType.TUPLE);
-        PhysicalPlan<PhysicalOperator> fe1Plan = new PhysicalPlan<PhysicalOperator>();
-        fe1Plan.add(fe1Gen);
-        POForEach fe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        fe1.setPlan(fe1Plan);
-        fe1.setResultType(DataType.TUPLE);
-        mro.reducePlan.add(fe1);
-        mro.reducePlan.connect(pkg, fe1);
+        POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps2,flattened);
+        mro.reducePlan.add(nfe1);
+        mro.reducePlan.connect(pkg, nfe1);
 //        ep1.add(innGen);
         return mro;
     }
@@ -809,13 +806,13 @@
         if(sort.isUDFComparatorUsed)
             mro.UDFs.add(sort.getMSortFunc().getFuncSpec());
         
-        List<ExprPlan> eps1 = new ArrayList<ExprPlan>();
+        List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
         List<Boolean> flat1 = new ArrayList<Boolean>();
         
         if(fields==null)
             throw new PlanException("No Expression Plan found in POSort");
         for (int i : fields) {
-            ExprPlan ep = new ExprPlan();
+            PhysicalPlan ep = new PhysicalPlan();
             POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
             prj.setColumn(i);
             prj.setOverloaded(false);
@@ -824,22 +821,16 @@
             eps1.add(ep);
             flat1.add(true);
         }
-        POGenerate fe1Gen = new POGenerate(new OperatorKey(scope,nig.getNextNodeId(scope)),eps1,flat1);
-        fe1Gen.setResultType(DataType.TUPLE);
-        PhysicalPlan<PhysicalOperator> fe1Plan = new PhysicalPlan<PhysicalOperator>();
-        fe1Plan.add(fe1Gen);
-        POForEach fe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        fe1.setPlan(fe1Plan);
-        fe1.setResultType(DataType.TUPLE);
-        mro.mapPlan.addAsLeaf(fe1);
+        POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps1,flat1);
+        mro.mapPlan.addAsLeaf(nfe1);
         
-        ExprPlan ep1 = new ExprPlan();
+        PhysicalPlan ep1 = new PhysicalPlan();
         ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
         ce.setValue("all");
         ce.setResultType(DataType.CHARARRAY);
         ep1.add(ce);
         
-        List<ExprPlan> eps = new ArrayList<ExprPlan>();
+        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
         eps.add(ep1);
         
         POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
@@ -848,7 +839,7 @@
         lr.setPlans(eps);
         lr.setResultType(DataType.TUPLE);
         mro.mapPlan.add(lr);
-        mro.mapPlan.connect(fe1, lr);
+        mro.mapPlan.connect(nfe1, lr);
         
         mro.setMapDone(true);
         
@@ -859,7 +850,7 @@
         pkg.setInner(inner);
         mro.reducePlan.add(pkg);
         
-        PhysicalPlan<PhysicalOperator> fe2Plan = new PhysicalPlan<PhysicalOperator>();
+        PhysicalPlan fe2Plan = new PhysicalPlan();
         
         POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
         topPrj.setColumn(1);
@@ -867,13 +858,13 @@
         topPrj.setOverloaded(true);
         fe2Plan.add(topPrj);
         
-        ExprPlan nesSortPlan = new ExprPlan();
+        PhysicalPlan nesSortPlan = new PhysicalPlan();
         POProject prjStar2 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
         prjStar2.setResultType(DataType.TUPLE);
         prjStar2.setStar(true);
         nesSortPlan.add(prjStar2);
         
-        List<ExprPlan> nesSortPlanLst = new ArrayList<ExprPlan>();
+        List<PhysicalPlan> nesSortPlanLst = new ArrayList<PhysicalPlan>();
         nesSortPlanLst.add(nesSortPlan);
         
         sort.setSortPlans(nesSortPlanLst);
@@ -881,39 +872,33 @@
         fe2Plan.add(sort);
         fe2Plan.connect(topPrj, sort);
         
-        ExprPlan ep3 = new ExprPlan();
+        PhysicalPlan ep3 = new PhysicalPlan();
         POProject prjStar3 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
         prjStar3.setResultType(DataType.BAG);
-        prjStar3.setColumn(0);
+        prjStar3.setColumn(1);
         prjStar3.setStar(false);
         ep3.add(prjStar3);
         
-        ExprPlan rpep = new ExprPlan();
+        PhysicalPlan rpep = new PhysicalPlan();
         ConstantExpression rpce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
         rpce.setRequestedParallelism(rp);
         rpce.setValue(rp<=0?1:rp);
         rpce.setResultType(DataType.INTEGER);
         rpep.add(rpce);
         
-        List<ExprPlan> genEps = new ArrayList<ExprPlan>();
+        List<PhysicalPlan> genEps = new ArrayList<PhysicalPlan>();
         genEps.add(rpep);
         genEps.add(ep3);
         
         List<Boolean> flattened2 = new ArrayList<Boolean>();
         flattened2.add(false);
         flattened2.add(false);
-        POGenerate nesGen = new POGenerate(new OperatorKey(scope,nig.getNextNodeId(scope)), genEps, flattened2);
-        fe2Plan.add(nesGen);
-        fe2Plan.connect(sort, nesGen);
-        
-        POForEach fe2 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        fe2.setPlan(fe2Plan);
-        fe2.setResultType(DataType.TUPLE);
         
-        mro.reducePlan.add(fe2);
-        mro.reducePlan.connect(pkg, fe2);
+        POForEach nfe2 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1, genEps, flattened2);
+        mro.reducePlan.add(nfe2);
+        mro.reducePlan.connect(pkg, nfe2);
         
-        ExprPlan ep4 = new ExprPlan();
+        PhysicalPlan ep4 = new PhysicalPlan();
         POProject prjStar4 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
         prjStar4.setResultType(DataType.TUPLE);
         prjStar4.setStar(true);
@@ -925,26 +910,19 @@
         ep4.add(uf);
         ep4.connect(prjStar4, uf);
         
-        List<ExprPlan> ep4s = new ArrayList<ExprPlan>();
+        List<PhysicalPlan> ep4s = new ArrayList<PhysicalPlan>();
         ep4s.add(ep4);
         List<Boolean> flattened3 = new ArrayList<Boolean>();
         flattened3.add(false);
-        POGenerate finGen = new POGenerate(new OperatorKey(scope,nig.getNextNodeId(scope)), ep4s, flattened3);
-        
-        PhysicalPlan<PhysicalOperator> fe3Plan = new PhysicalPlan<PhysicalOperator>();
-        fe3Plan.add(finGen);
-        
-        POForEach fe3 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        fe3.setPlan(fe3Plan);
-        fe3.setResultType(DataType.TUPLE);
+        POForEach nfe3 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ep4s, flattened3);
         
-        mro.reducePlan.add(fe3);
-        mro.reducePlan.connect(fe2, fe3);
+        mro.reducePlan.add(nfe3);
+        mro.reducePlan.connect(nfe2, nfe3);
         
         POStore str = getStore();
         str.setSFile(quantFile);
         mro.reducePlan.add(str);
-        mro.reducePlan.connect(fe3, str);
+        mro.reducePlan.connect(nfe3, str);
         
         mro.setReduceDone(true);
 //        mro.requestedParallelism = rp;
@@ -957,10 +935,10 @@
         pc.connect();
         MRCompiler comp = new MRCompiler(null, pc);
         Random r = new Random();
-        List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+        List<PhysicalPlan> sortPlans = new LinkedList<PhysicalPlan>();
         POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
         pr1.setResultType(DataType.INTEGER);
-        ExprPlan expPlan = new ExprPlan();
+        PhysicalPlan expPlan = new PhysicalPlan();
         expPlan.add(pr1);
         sortPlans.add(expPlan);
         List<Boolean> mAscCols = new LinkedList<Boolean>();

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java Thu Jun 26 13:05:23 2008
@@ -27,7 +27,7 @@
 public class MapReduceLauncher extends Launcher{
     private static final Log log = LogFactory.getLog(Launcher.class);
     @Override
-    public boolean launchPig(PhysicalPlan<PhysicalOperator> php,
+    public boolean launchPig(PhysicalPlan php,
                              String grpName,
                              PigContext pc) throws PlanException,
                                                    VisitorException,

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java Thu Jun 26 13:05:23 2008
@@ -42,16 +42,16 @@
 public class MapReduceOper extends Operator<MROpPlanVisitor> {
     //The physical plan that should be executed
     //in the map phase
-    public PhysicalPlan<PhysicalOperator> mapPlan;
+    public PhysicalPlan mapPlan;
     
     //The physical plan that should be executed
     //in the reduce phase
-    public PhysicalPlan<PhysicalOperator> reducePlan;
+    public PhysicalPlan reducePlan;
     
     //The physical plan that should be executed
     //in the combine phase if one exists. Will be used
     //by the optimizer.
-    public PhysicalPlan<PhysicalOperator> combinePlan;
+    public PhysicalPlan combinePlan;
     
     //Indicates that the map plan creation
     //is complete
@@ -77,9 +77,9 @@
 
     public MapReduceOper(OperatorKey k) {
         super(k);
-        mapPlan = new PhysicalPlan<PhysicalOperator>();
-        combinePlan = new PhysicalPlan<PhysicalOperator>();
-        reducePlan = new PhysicalPlan<PhysicalOperator>();
+        mapPlan = new PhysicalPlan();
+        combinePlan = new PhysicalPlan();
+        reducePlan = new PhysicalPlan();
         UDFs = new ArrayList<String>();
         nig = NodeIdGenerator.getGenerator();
         scope = k.getScope();

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigCombiner.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigCombiner.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigCombiner.java Thu Jun 26 13:05:23 2008
@@ -74,7 +74,7 @@
         private final Log log = LogFactory.getLog(getClass());
         
         //The reduce plan
-        private PhysicalPlan<PhysicalOperator> cp;
+        private PhysicalPlan cp;
         
         //The POPackage operator which is the
         //root of every Map Reduce plan is
@@ -94,7 +94,7 @@
             super.configure(jConf);
             sJobConf = jConf;
             try {
-                cp = (PhysicalPlan<PhysicalOperator>) ObjectSerializer.deserialize(jConf
+                cp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
                         .get("pig.combinePlan"));
                 pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.combine.package"));
                 // To be removed
@@ -205,78 +205,4 @@
         }
     }
     
-    /*interface MapOutputCollector<K extends WritableComparable, V extends Writable>
-    extends OutputCollector<K, V> {
-
-        public void close() throws IOException;
-
-        public void flush() throws IOException;
-
-    }
-
-    static class DirectMapOutputCollector<K extends WritableComparable, V extends Writable>
-            implements MapOutputCollector<K, V> {
-
-        private RecordWriter<K, V> out = null;
-
-        private Reporter reporter = null;
-
-        @SuppressWarnings("unchecked")
-        public DirectMapOutputCollector(JobConf job, Reporter reporter)
-                throws IOException {
-            this.reporter = reporter;
-            String finalName = job.getOutputPath().toString();
-            FileSystem fs = FileSystem.get(job);
-
-            out = job.getOutputFormat().getRecordWriter(fs, job, finalName,
-                    reporter);
-        }
-
-        public void close() throws IOException {
-            if (this.out != null) {
-                out.close(this.reporter);
-            }
-
-        }
-
-        public void flush() throws IOException {
-            // TODO Auto-generated method stub
-
-        }
-
-        public void collect(K key, V value) throws IOException {
-            System.out.println(value.toString());
-        }
-    }
-    
-    public static void main(String[] args) throws IOException {
-        Random r = new Random();
-        PhysicalPlan<PhysicalOperator> rp = new PhysicalPlan<PhysicalOperator>();
-        POForEach fe = GenPhyOp.topForEachOPWithPlan(1);
-        rp.add(fe);
-        PigMapReduce.Reduce red = new PigMapReduce.Reduce();
-        POPackage pk = GenPhyOp.topPackageOp();
-        pk.setKeyType(DataType.INTEGER);
-        pk.setNumInps(1);
-        boolean[] inner = {false}; 
-        pk.setInner(inner);
-        
-        JobConf jConf = new JobConf();
-        jConf.set("pig.reducePlan", ObjectSerializer.serialize(rp));
-        jConf.set("pig.reduce.package",ObjectSerializer.serialize(pk));
-        jConf.setOutputFormat(PigOutputFormat.class);
-        jConf.setOutputPath(new Path("pigmrtst1"));
-        red.configure(jConf);
-        
-        WritableComparable key = new IntWritable(1);
-        List<IndexedTuple> itLst = new ArrayList<IndexedTuple>();
-        for(int i=0;i<2;i++){
-            Tuple t = TupleFactory.getInstance().newTuple();
-            t.append(GenRandomData.genRandString(r));
-            t.append(1);
-            IndexedTuple it = new IndexedTuple(t,0);
-            itLst.add(it);
-        }
-        red.reduce(key,itLst.iterator(),new DirectMapOutputCollector(jConf,reporter), reporter);
-    }*/
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapBase.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapBase.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapBase.java Thu Jun 26 13:05:23 2008
@@ -28,7 +28,7 @@
     private final Log log = LogFactory.getLog(getClass());
     
     //Map Plan
-    protected PhysicalPlan<PhysicalOperator> mp;
+    protected PhysicalPlan mp;
     
     // Reporter that will be used by operators
     // to transmit heartbeat
@@ -54,7 +54,7 @@
         super.configure(job);
         PigMapReduce.sJobConf = job;
         try {
-            mp = (PhysicalPlan<PhysicalOperator>) ObjectSerializer.deserialize(job
+            mp = (PhysicalPlan) ObjectSerializer.deserialize(job
                     .get("pig.mapPlan"));
             
             // To be removed
@@ -104,7 +104,7 @@
         }
         
         for (OperatorKey targetKey : inpTuple.targetOps) {
-            PhysicalOperator<PhyPlanVisitor> target = mp.getOperator(targetKey);
+            PhysicalOperator target = mp.getOperator(targetKey);
             Tuple t = inpTuple.toTuple();
             target.attachInput(t);
         }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java Thu Jun 26 13:05:23 2008
@@ -86,7 +86,7 @@
         private final Log log = LogFactory.getLog(getClass());
         
         //The reduce plan
-        private PhysicalPlan<PhysicalOperator> rp;
+        private PhysicalPlan rp;
         
         //The POPackage operator which is the
         //root of every Map Reduce plan is
@@ -106,7 +106,7 @@
             super.configure(jConf);
             sJobConf = jConf;
             try {
-                rp = (PhysicalPlan<PhysicalOperator>) ObjectSerializer.deserialize(jConf
+                rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
                         .get("pig.reducePlan"));
                 pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.reduce.package"));
                 // To be removed
@@ -212,78 +212,4 @@
         }
     }
     
-    /*interface MapOutputCollector<K extends WritableComparable, V extends Writable>
-    extends OutputCollector<K, V> {
-
-        public void close() throws IOException;
-
-        public void flush() throws IOException;
-
-    }
-
-    static class DirectMapOutputCollector<K extends WritableComparable, V extends Writable>
-            implements MapOutputCollector<K, V> {
-
-        private RecordWriter<K, V> out = null;
-
-        private Reporter reporter = null;
-
-        @SuppressWarnings("unchecked")
-        public DirectMapOutputCollector(JobConf job, Reporter reporter)
-                throws IOException {
-            this.reporter = reporter;
-            String finalName = job.getOutputPath().toString();
-            FileSystem fs = FileSystem.get(job);
-
-            out = job.getOutputFormat().getRecordWriter(fs, job, finalName,
-                    reporter);
-        }
-
-        public void close() throws IOException {
-            if (this.out != null) {
-                out.close(this.reporter);
-            }
-
-        }
-
-        public void flush() throws IOException {
-            // TODO Auto-generated method stub
-
-        }
-
-        public void collect(K key, V value) throws IOException {
-            System.out.println(value.toString());
-        }
-    }
-    
-    public static void main(String[] args) throws IOException {
-        Random r = new Random();
-        PhysicalPlan<PhysicalOperator> rp = new PhysicalPlan<PhysicalOperator>();
-        POForEach fe = GenPhyOp.topForEachOPWithPlan(1);
-        rp.add(fe);
-        PigMapReduce.Reduce red = new PigMapReduce.Reduce();
-        POPackage pk = GenPhyOp.topPackageOp();
-        pk.setKeyType(DataType.INTEGER);
-        pk.setNumInps(1);
-        boolean[] inner = {false}; 
-        pk.setInner(inner);
-        
-        JobConf jConf = new JobConf();
-        jConf.set("pig.reducePlan", ObjectSerializer.serialize(rp));
-        jConf.set("pig.reduce.package",ObjectSerializer.serialize(pk));
-        jConf.setOutputFormat(PigOutputFormat.class);
-        jConf.setOutputPath(new Path("pigmrtst1"));
-        red.configure(jConf);
-        
-        WritableComparable key = new IntWritable(1);
-        List<IndexedTuple> itLst = new ArrayList<IndexedTuple>();
-        for(int i=0;i<2;i++){
-            Tuple t = TupleFactory.getInstance().newTuple();
-            t.append(GenRandomData.genRandString(r));
-            t.append(1);
-            IndexedTuple it = new IndexedTuple(t,0);
-            itLst.add(it);
-        }
-        red.reduce(key,itLst.iterator(),new DirectMapOutputCollector(jConf,reporter), reporter);
-    }*/
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinder.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinder.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinder.java Thu Jun 26 13:05:23 2008
@@ -4,46 +4,41 @@
 import java.util.List;
 
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.relationalOperators.POFilter;
-import org.apache.pig.impl.physicalLayer.relationalOperators.POGenerate;
 import org.apache.pig.impl.physicalLayer.relationalOperators.POSort;
-import org.apache.pig.impl.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class UDFFinder extends PhyPlanVisitor {
     List<String> UDFs;
-    DepthFirstWalker<PhysicalOperator, PhysicalPlan<PhysicalOperator>> dfw;
-    UDFFinderForExpr udfFinderForExpr;
+    DepthFirstWalker<PhysicalOperator, PhysicalPlan> dfw;
     
     public UDFFinder(){
         this(null, null);
     }
     
-    public UDFFinder(ExprPlan plan,
-            PlanWalker<ExpressionOperator, ExprPlan> walker) {
+    public UDFFinder(PhysicalPlan plan, PlanWalker<PhysicalOperator, PhysicalPlan> walker) {
         super(plan, walker);
         UDFs = new ArrayList<String>();
-        dfw = new DepthFirstWalker<PhysicalOperator, PhysicalPlan<PhysicalOperator>>(null);
-        udfFinderForExpr = new UDFFinderForExpr();
+        dfw = new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(null);
     }
 
     public List<String> getUDFs() {
         return UDFs;
     }
     
-    public void setPlan(PhysicalPlan<PhysicalOperator> plan){
+    public void setPlan(PhysicalPlan plan){
         mPlan = plan;
         dfw.setPlan(plan);
         mCurrentWalker = dfw;
         UDFs.clear();
     }
     
-    private void addUDFsIn(ExprPlan ep) throws VisitorException{
+    /*private void addUDFsIn(PhysicalPlan ep) throws VisitorException{
         udfFinderForExpr.setPlan(ep);
         udfFinderForExpr.visit();
         UDFs.addAll(udfFinderForExpr.getUDFs());
@@ -56,11 +51,11 @@
 
     @Override
     public void visitGenerate(POGenerate op) throws VisitorException {
-        List<ExprPlan> eps = op.getInputPlans();
-        for (ExprPlan ep : eps) {
+        List<PhysicalPlan> eps = op.getInputPlans();
+        for (PhysicalPlan ep : eps) {
             addUDFsIn(ep);
         }
-    }
+    }*/
 
     @Override
     public void visitSort(POSort op) throws VisitorException {
@@ -68,5 +63,8 @@
             UDFs.add(op.getMSortFunc().getFuncSpec());
     }
     
-    
+    @Override
+    public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
+        UDFs.add(userFunc.getFuncSpec());
+    }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java Thu Jun 26 13:05:23 2008
@@ -20,17 +20,16 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.backend.executionengine.ExecException;
 
 /**
  * 
@@ -55,10 +54,10 @@
  * 
  * @param <V>
  */
-public abstract class PhysicalOperator<V extends PhyPlanVisitor> extends
-        Operator<V> {
+public abstract class PhysicalOperator extends
+        Operator<PhyPlanVisitor> {
 
-    private Log log = LogFactory.getLog(getClass());
+//    private Log log = LogFactory.getLog(getClass());
 
     protected static final long serialVersionUID = 1L;
 
@@ -222,7 +221,7 @@
         }
     }
 
-    public abstract void visit(V v) throws VisitorException;
+    public abstract void visit(PhyPlanVisitor v) throws VisitorException;
 
     public Result getNext(Integer i) throws ExecException {
         return res;
@@ -261,7 +260,14 @@
     }
 
     public Result getNext(DataBag db) throws ExecException {
-        return res;
+        Result ret = new Result();
+        DataBag tmpBag = BagFactory.getInstance().newDefaultBag();
+        for(ret = getNext(dummyTuple);ret.returnStatus!=POStatus.STATUS_EOP;ret=getNext(dummyTuple)){
+            tmpBag.add((Tuple)ret.result);
+        }
+        ret.result = tmpBag;
+        ret.returnStatus = POStatus.STATUS_OK;
+        return ret;
     }
 
     public static void setReporter(PigProgressable reporter) {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Add.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Add.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Add.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Add.java Thu Jun 26 13:05:23 2008
@@ -22,7 +22,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class Add extends BinaryExpressionOperator {
@@ -41,7 +41,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitAdd(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/ConstantExpression.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/ConstantExpression.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/ConstantExpression.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/ConstantExpression.java Thu Jun 26 13:05:23 2008
@@ -23,10 +23,11 @@
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 
@@ -77,7 +78,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitConstant(this);
     }
 
@@ -87,68 +88,79 @@
 
     public void setValue(Object value) {
         this.value = value;
+        Tuple dummyTuple = TupleFactory.getInstance().newTuple(1);
+        attachInput(dummyTuple);
     }
 
     @Override
     public Result getNext(DataBag db) throws ExecException {
+        res = processInput();
+        if(res.returnStatus!=POStatus.STATUS_OK)
+            return res;
         
-        res.returnStatus = POStatus.STATUS_OK;
         res.result = (DataBag)value;
         return res;
     }
 
     @Override
     public Result getNext(DataByteArray ba) throws ExecException {
-        
-        res.returnStatus = POStatus.STATUS_OK;
+        res = processInput();
+        if(res.returnStatus!=POStatus.STATUS_OK)
+            return res;
         res.result = (DataByteArray)value;
         return res;
     }
 
     @Override
     public Result getNext(Double d) throws ExecException {
-        
-        res.returnStatus = POStatus.STATUS_OK;
+        res = processInput();
+        if(res.returnStatus!=POStatus.STATUS_OK)
+            return res;
         res.result = (Double)value;
         return res;
     }
 
     @Override
     public Result getNext(Float f) throws ExecException {
-        
-        res.returnStatus = POStatus.STATUS_OK;
+        res = processInput();
+        if(res.returnStatus!=POStatus.STATUS_OK)
+            return res;
         res.result = (Float)value;
         return res;
     }
 
     @Override
     public Result getNext(Integer i) throws ExecException {
-        
-        res.returnStatus = POStatus.STATUS_OK;
+        res = processInput();
+        if(res.returnStatus!=POStatus.STATUS_OK)
+            return res;
         res.result = (Integer)value;
         return res;
     }
 
     @Override
     public Result getNext(Long l) throws ExecException {
-        
-        res.returnStatus = POStatus.STATUS_OK;
+        res = processInput();
+        if(res.returnStatus!=POStatus.STATUS_OK)
+            return res;
         res.result = (Long)value;
         return res;
     }
 
     @Override
     public Result getNext(String s) throws ExecException {
-        
-        res.returnStatus = POStatus.STATUS_OK;
+        res = processInput();
+        if(res.returnStatus!=POStatus.STATUS_OK)
+            return res;
         res.result = (String)value;
         return res;
     }
 
     @Override
     public Result getNext(Tuple t) throws ExecException {
-        
-        res.returnStatus = POStatus.STATUS_OK;
+        res = processInput();
+        if(res.returnStatus!=POStatus.STATUS_OK)
+            return res;
         res.result = (Tuple)value;
         return res;
     }
@@ -157,16 +169,18 @@
     
     @Override
     public Result getNext(Boolean b) throws ExecException {
-        
-        res.returnStatus = POStatus.STATUS_OK;
+        res = processInput();
+        if(res.returnStatus!=POStatus.STATUS_OK)
+            return res;
         res.result = (Boolean)value;
         return res;
     }
 
     @Override
     public Result getNext(Map m) throws ExecException {
-        
-        res.returnStatus = POStatus.STATUS_OK;
+        res = processInput();
+        if(res.returnStatus!=POStatus.STATUS_OK)
+            return res;
         res.result = (Map)value;
         return res;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Divide.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Divide.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Divide.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Divide.java Thu Jun 26 13:05:23 2008
@@ -22,7 +22,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class Divide extends BinaryExpressionOperator {
@@ -41,7 +41,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitDivide(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java Thu Jun 26 13:05:23 2008
@@ -29,7 +29,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class EqualToExpr extends BinaryComparisonOperator {
@@ -49,7 +49,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitEqualTo(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/ExpressionOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/ExpressionOperator.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/ExpressionOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/ExpressionOperator.java Thu Jun 26 13:05:23 2008
@@ -18,9 +18,14 @@
 package org.apache.pig.impl.physicalLayer.expressionOperators;
 
 
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.Result;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -29,7 +34,7 @@
  *
  */
 
-public abstract class ExpressionOperator extends PhysicalOperator<ExprPlanVisitor> {
+public abstract class ExpressionOperator extends PhysicalOperator {
     private static final long serialVersionUID = 1L;
     
     public ExpressionOperator(OperatorKey k) {
@@ -45,5 +50,10 @@
         return false;
     }
     
-    public abstract void visit(ExprPlanVisitor v) throws VisitorException;
+    @Override
+    public Result getNext(DataBag db) throws ExecException {
+        return new Result();
+    }
+    
+    public abstract void visit(PhyPlanVisitor v) throws VisitorException;
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java Thu Jun 26 13:05:23 2008
@@ -24,7 +24,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.backend.executionengine.ExecException;
 
@@ -46,7 +46,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitGTOrEqual(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java Thu Jun 26 13:05:23 2008
@@ -22,7 +22,7 @@
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
 import org.apache.pig.impl.plan.VisitorException;
@@ -50,7 +50,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitGreaterThan(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java Thu Jun 26 13:05:23 2008
@@ -24,7 +24,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.backend.executionengine.ExecException;
 
@@ -44,7 +44,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visiLTOrEqual(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java Thu Jun 26 13:05:23 2008
@@ -24,7 +24,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.backend.executionengine.ExecException;
 
@@ -45,7 +45,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitLessThan(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Mod.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Mod.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Mod.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Mod.java Thu Jun 26 13:05:23 2008
@@ -22,7 +22,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class Mod extends BinaryExpressionOperator {
@@ -41,7 +41,7 @@
     }
     
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitMod(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Multiply.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Multiply.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Multiply.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Multiply.java Thu Jun 26 13:05:23 2008
@@ -22,7 +22,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class Multiply extends BinaryExpressionOperator {
@@ -41,7 +41,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitMultiply(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java Thu Jun 26 13:05:23 2008
@@ -29,7 +29,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class NotEqualToExpr extends BinaryComparisonOperator {
@@ -49,7 +49,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitNotEqualTo(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java Thu Jun 26 13:05:23 2008
@@ -22,7 +22,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -45,7 +45,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitAnd(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POBinCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POBinCond.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POBinCond.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POBinCond.java Thu Jun 26 13:05:23 2008
@@ -24,10 +24,9 @@
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class POBinCond extends ExpressionOperator {
@@ -119,7 +118,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitBinCond(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java Thu Jun 26 13:05:23 2008
@@ -34,7 +34,7 @@
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -70,7 +70,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitCast(this);
 
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POIsNull.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POIsNull.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POIsNull.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POIsNull.java Thu Jun 26 13:05:23 2008
@@ -23,7 +23,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -48,7 +48,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         //v.visitIsNull(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POMapLookUp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POMapLookUp.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POMapLookUp.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POMapLookUp.java Thu Jun 26 13:05:23 2008
@@ -27,7 +27,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class POMapLookUp extends ExpressionOperator {
@@ -56,7 +56,7 @@
 	}
 
 	@Override
-	public void visit(ExprPlanVisitor v) throws VisitorException {
+	public void visit(PhyPlanVisitor v) throws VisitorException {
 		v.visitMapLookUp(this);
 
 	}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONegative.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONegative.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONegative.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONegative.java Thu Jun 26 13:05:23 2008
@@ -22,7 +22,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class PONegative extends UnaryExpressionOperator {
@@ -43,7 +43,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitNegative(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONot.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONot.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONot.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONot.java Thu Jun 26 13:05:23 2008
@@ -22,7 +22,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -45,7 +45,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitNot(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POOr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POOr.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POOr.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POOr.java Thu Jun 26 13:05:23 2008
@@ -22,7 +22,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -45,7 +45,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitOr(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java Thu Jun 26 13:05:23 2008
@@ -21,14 +21,16 @@
 import java.util.Map;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -93,7 +95,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitProject(this);
     }
     
@@ -126,7 +128,25 @@
 
     @Override
     public Result getNext(DataBag db) throws ExecException {
-        return getNext();
+        Result res = processInputBag();
+        if(res.returnStatus!=POStatus.STATUS_OK)
+            return res;
+        DataBag inpBag = (DataBag) res.result;
+
+        if(isInputAttached() || star){
+            res.result = inpBag;
+            res.returnStatus = POStatus.STATUS_OK;
+            detachInput();
+            return res;
+        }
+        DataBag outBag = BagFactory.getInstance().newDefaultBag();
+        for (Tuple tuple : inpBag) {
+            Tuple tmpTuple = TupleFactory.getInstance().newTuple(tuple.get(column));
+            outBag.add(tmpTuple);
+        }
+        res.result = outBag;
+        res.returnStatus = POStatus.STATUS_OK;
+        return res;
     }
 
     @Override
@@ -239,5 +259,26 @@
     public void setStar(boolean star) {
         this.star = star;
     }
+    
+    private Result processInputBag() throws ExecException {
+        
+        Result res = new Result();
+        if (input==null && (inputs == null || inputs.size()==0)) {
+//            log.warn("No inputs found. Signaling End of Processing.");
+            res.returnStatus = POStatus.STATUS_EOP;
+            return res;
+        }
+        
+        //Should be removed once the model is clear
+        if(reporter!=null) reporter.progress();
+        
+        if(!isInputAttached())
+            return inputs.get(0).getNext(dummyBag);
+        else{
+            res.result = (DataBag)input.get(column);
+            res.returnStatus = POStatus.STATUS_OK;
+            return res;
+        }
+    }
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PORegexp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PORegexp.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PORegexp.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PORegexp.java Thu Jun 26 13:05:23 2008
@@ -24,7 +24,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class PORegexp extends BinaryComparisonOperator {
@@ -43,7 +43,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitRegexp(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java Thu Jun 26 13:05:23 2008
@@ -40,7 +40,7 @@
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class POUserFunc extends ExpressionOperator {
@@ -141,8 +141,13 @@
                 }
                 if(temp.returnStatus!=POStatus.STATUS_OK)
                     return temp;
+
                 ((Tuple)res.result).append(temp.result);
 			}
+            Tuple rslt = ((Tuple)res.result);
+            if(rslt.size()==1 && rslt.get(0) instanceof Tuple){
+                res.result = rslt.get(0);
+            }
 			res.returnStatus = temp.returnStatus;
 			return res;
 		}
@@ -300,7 +305,7 @@
 
 	@Override
 	public String name() {
-	    return "POUserFunc" + "(" + func.getClass().getName() + ")" + " - " + mKey.toString();
+	    return "POUserFunc" + "(" + func.getClass().getName() + ")" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
 	}
 
 	@Override
@@ -316,7 +321,7 @@
 	}
 
 	@Override
-	public void visit(ExprPlanVisitor v) throws VisitorException {
+	public void visit(PhyPlanVisitor v) throws VisitorException {
 
 		v.visitUserFunc(this);
 	}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Subtract.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Subtract.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Subtract.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Subtract.java Thu Jun 26 13:05:23 2008
@@ -22,7 +22,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.physicalLayer.POStatus;
 import org.apache.pig.impl.physicalLayer.Result;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class Subtract extends BinaryExpressionOperator {
@@ -41,7 +41,7 @@
     }
 
     @Override
-    public void visit(ExprPlanVisitor v) throws VisitorException {
+    public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitSubtract(this);
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/UnaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/UnaryExpressionOperator.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/UnaryExpressionOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/UnaryExpressionOperator.java Thu Jun 26 13:05:23 2008
@@ -18,7 +18,7 @@
 package org.apache.pig.impl.physicalLayer.expressionOperators;
 
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.impl.plan.VisitorException;
@@ -46,8 +46,8 @@
     /**
      * Set the contained expression to the be the input value.
      */
-    public void setInputAsExpr(PhysicalPlan<ExpressionOperator> plan) {
-        expr = plan.getPredecessors(this).get(0);
+    public void setInputAsExpr(PhysicalPlan plan) {
+        expr = (ExpressionOperator)plan.getPredecessors(this).get(0);
     }
 
     /**

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java Thu Jun 26 13:05:23 2008
@@ -20,14 +20,43 @@
 import java.util.List;
 
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
-import org.apache.pig.impl.physicalLayer.relationalOperators.*;
-import org.apache.pig.impl.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.physicalLayer.expressionOperators.Add;
+import org.apache.pig.impl.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.impl.physicalLayer.expressionOperators.Divide;
+import org.apache.pig.impl.physicalLayer.expressionOperators.EqualToExpr;
+import org.apache.pig.impl.physicalLayer.expressionOperators.GTOrEqualToExpr;
+import org.apache.pig.impl.physicalLayer.expressionOperators.GreaterThanExpr;
+import org.apache.pig.impl.physicalLayer.expressionOperators.LTOrEqualToExpr;
+import org.apache.pig.impl.physicalLayer.expressionOperators.LessThanExpr;
+import org.apache.pig.impl.physicalLayer.expressionOperators.Mod;
+import org.apache.pig.impl.physicalLayer.expressionOperators.Multiply;
+import org.apache.pig.impl.physicalLayer.expressionOperators.NotEqualToExpr;
+import org.apache.pig.impl.physicalLayer.expressionOperators.POAnd;
+import org.apache.pig.impl.physicalLayer.expressionOperators.POBinCond;
+import org.apache.pig.impl.physicalLayer.expressionOperators.POCast;
+import org.apache.pig.impl.physicalLayer.expressionOperators.POMapLookUp;
+import org.apache.pig.impl.physicalLayer.expressionOperators.PONegative;
+import org.apache.pig.impl.physicalLayer.expressionOperators.PONot;
+import org.apache.pig.impl.physicalLayer.expressionOperators.POOr;
+import org.apache.pig.impl.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.impl.physicalLayer.expressionOperators.PORegexp;
 import org.apache.pig.impl.physicalLayer.expressionOperators.POUserFunc;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.physicalLayer.expressionOperators.Subtract;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.impl.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.relationalOperators.PORead;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.PlanVisitor;
 
 /**
  * The visitor class for the Physical Plan. To use this,
@@ -40,12 +69,10 @@
  * and to maintain any relevant state information between the visits
  * to two different operators.
  *
- * @param <O>
- * @param <P>
  */
-public class PhyPlanVisitor<O extends PhysicalOperator, P extends PhysicalPlan<O>> extends PlanVisitor<O,P> {
+public class PhyPlanVisitor extends PlanVisitor<PhysicalOperator,PhysicalPlan> {
 
-    public PhyPlanVisitor(P plan, PlanWalker<O, P> walker) {
+    public PhyPlanVisitor(PhysicalPlan plan, PlanWalker<PhysicalOperator, PhysicalPlan> walker) {
         super(plan, walker);
     }
 
@@ -58,27 +85,19 @@
     }
     
     public void visitFilter(POFilter fl) throws VisitorException{
-        ExprPlanVisitor epv = new ExprPlanVisitor(fl.getPlan(),
-            new DepthFirstWalker<ExpressionOperator, ExprPlan>(fl.getPlan()));
-        epv.visit();
+        pushWalker(mCurrentWalker.spawnChildWalker(fl.getPlan()));
+        visit();
+        popWalker();
     }
     
     public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
-        List<ExprPlan> inpPlans = lr.getPlans();
-        for (ExprPlan plan : inpPlans) {
-            ExprPlanVisitor epv = new ExprPlanVisitor(plan,new DependencyOrderWalker<ExpressionOperator, ExprPlan>(plan));
-            epv.visit();
+        List<PhysicalPlan> inpPlans = lr.getPlans();
+        for (PhysicalPlan plan : inpPlans) {
+            pushWalker(mCurrentWalker.spawnChildWalker(plan));
+            visit();
         }
     }
     
-    public void visitForEach(POForEach fe) throws VisitorException{
-        pushWalker(mCurrentWalker.spawnChildWalker((P)fe.getPlan()));
-        // this causes the current walker (the new one we created)
-        // to walk the nested plan
-        visit();
-        popWalker();
-    }
-    
     public void visitGlobalRearrange(POGlobalRearrange gr) throws VisitorException{
         //do nothing
     }
@@ -87,11 +106,12 @@
         //do nothing
     }
     
-    public void visitGenerate(POGenerate pogen) throws VisitorException {
-        List<ExprPlan> inpPlans = pogen.getInputPlans();
-        for (ExprPlan plan : inpPlans) {
-            ExprPlanVisitor epv = new ExprPlanVisitor(plan,new DependencyOrderWalker<ExpressionOperator, ExprPlan>(plan));
-            epv.visit();
+    public void visitPOForEach(POForEach nfe) throws VisitorException {
+        List<PhysicalPlan> inpPlans = nfe.getInputPlans();
+        for (PhysicalPlan plan : inpPlans) {
+            pushWalker(mCurrentWalker.spawnChildWalker(plan));
+            visit();
+            popWalker();
         }
     }
     
@@ -112,15 +132,102 @@
 	}
 
 	public void visitSort(POSort sort) throws VisitorException {
-        List<ExprPlan> inpPlans = sort.getSortPlans();
-        for (ExprPlan plan : inpPlans) {
-            ExprPlanVisitor epv = new ExprPlanVisitor(plan,new DependencyOrderWalker<ExpressionOperator, ExprPlan>(plan));
-            epv.visit();
+        List<PhysicalPlan> inpPlans = sort.getSortPlans();
+        for (PhysicalPlan plan : inpPlans) {
+            pushWalker(mCurrentWalker.spawnChildWalker(plan));
+            visit();
         }
 	}
+    
+    public void visitConstant(ConstantExpression cnst) throws VisitorException{
+        //do nothing
+    }
+    
+    public void visitProject(POProject proj) throws VisitorException{
+        //do nothing
+    }
+    
+    public void visitGreaterThan(GreaterThanExpr grt) throws VisitorException{
+        //do nothing
+    }
+    
+    public void visitLessThan(LessThanExpr lt) throws VisitorException{
+        //do nothing
+    }
+    
+    public void visitGTOrEqual(GTOrEqualToExpr gte) throws VisitorException{
+        //do nothing
+    }
+    
+    public void visiLTOrEqual(LTOrEqualToExpr lte) throws VisitorException{
+        //do nothing
+    }
+    
+    public void visitEqualTo(EqualToExpr eq) throws VisitorException{
+        //do nothing
+    }
+    
+    public void visitNotEqualTo(NotEqualToExpr eq) throws VisitorException{
+        //do nothing
+    }
+    
+    public void visitRegexp(PORegexp re) throws VisitorException{
+        //do nothing
+    }
+    
+    public void visitAdd(Add add) throws VisitorException{
+        //do nothing
+    }
+    
+    public void visitSubtract(Subtract sub) throws VisitorException {
+        //do nothing
+    }
+    
+    public void visitMultiply(Multiply mul) throws VisitorException {
+        //do nothing
+    }
+    
+    public void visitDivide(Divide dv) throws VisitorException {
+        //do nothing
+    }
+    
+    public void visitMod(Mod mod) throws VisitorException {
+        //do nothing
+    }
+    
+    public void visitAnd(POAnd and) throws VisitorException {
+        //do nothing
+    }
 
-	public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
-		// TODO Auto-generated method stub
-		return;
-	}
+    public void visitOr(POOr or) throws VisitorException {
+        //do nothing
+    }
+
+    public void visitNot(PONot not) throws VisitorException {
+        //do nothing
+    }
+
+    public void visitBinCond(POBinCond binCond) {
+        // do nothing
+        
+    }
+
+    public void visitNegative(PONegative negative) {
+        //do nothing
+        
+    }
+    
+    public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
+        //do nothing
+    }
+
+    public void visitMapLookUp(POMapLookUp mapLookUp) {
+        // TODO Auto-generated method stub
+        
+    }
+
+    public void visitCast(POCast cast) {
+        // TODO Auto-generated method stub
+        
+    }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java Thu Jun 26 13:05:23 2008
@@ -33,9 +33,8 @@
  * The base class for all types of physical plans. 
  * This extends the Operator Plan.
  *
- * @param <E>
  */
-public class PhysicalPlan<E extends PhysicalOperator> extends OperatorPlan<E> {
+public class PhysicalPlan extends OperatorPlan<PhysicalOperator> {
 
     /**
      * 
@@ -47,8 +46,8 @@
     }
     
     public void attachInput(Tuple t){
-        List<E> roots = getRoots();
-        for (E operator : roots)
+        List<PhysicalOperator> roots = getRoots();
+        for (PhysicalOperator operator : roots)
             operator.attachInput(t);
     }
     
@@ -58,7 +57,7 @@
      * @param out : OutputStream to which the visual representation is written
      */
     public void explain(OutputStream out){
-        PlanPrinter<E, PhysicalPlan<E>> mpp = new PlanPrinter<E, PhysicalPlan<E>>(
+        PlanPrinter<PhysicalOperator, PhysicalPlan> mpp = new PlanPrinter<PhysicalOperator, PhysicalPlan>(
                 this);
 
         try {
@@ -73,13 +72,13 @@
     }
     
     @Override
-    public void connect(E from, E to)
+    public void connect(PhysicalOperator from, PhysicalOperator to)
             throws PlanException {
         super.connect(from, to);
         to.setInputs(getPredecessors(to));
     }
     
-    /*public void connect(List<E> from, E to) throws IOException{
+    /*public void connect(List<PhysicalOperator> from, PhysicalOperator to) throws IOException{
         if(!to.supportsMultipleInputs()){
             throw new IOException("Invalid Operation on " + to.name() + ". It doesn't support multiple inputs.");
         }
@@ -87,11 +86,11 @@
     }*/
     
     @Override
-    public void remove(E op) {
+    public void remove(PhysicalOperator op) {
         op.setInputs(null);
-        List<E> sucs = getSuccessors(op);
+        List<PhysicalOperator> sucs = getSuccessors(op);
         if(sucs!=null && sucs.size()!=0){
-            for (E suc : sucs) {
+            for (PhysicalOperator suc : sucs) {
                 suc.setInputs(null);
             }
         }



Mime
View raw message