pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dvrya...@apache.org
Subject svn commit: r1369230 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/ src/org/apache/pig/pen/ src/org/apache/pig/tools/pigstats/
Date Fri, 03 Aug 2012 22:09:12 GMT
Author: dvryaboy
Date: Fri Aug  3 22:09:11 2012
New Revision: 1369230

URL: http://svn.apache.org/viewvc?rev=1369230&view=rev
Log:
PIG-2858: Improve PlanHelper to allow finding any PhysicalOperator in a plan

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
    pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java
    pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1369230&r1=1369229&r2=1369230&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Aug  3 22:09:11 2012
@@ -24,7 +24,9 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
-PIG-2854 AvroStorage doesn't work with Avro 1.7.1 (cheolsoo via sms)
+PIG-2858: Improve PlanHelper to allow finding any PhysicalOperator in a plan (dvryaboy)
+
+PIG-2854: AvroStorage doesn't work with Avro 1.7.1 (cheolsoo via sms)
 
 PIG-2779: Refactoring the code for setting number of reducers (jay23jack via billgraham)
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1369230&r1=1369229&r2=1369230&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
Fri Aug  3 22:09:11 2012
@@ -395,7 +395,7 @@ public class JobControlCompiler{
         try{
 
             //Process the POLoads
-            List<POLoad> lds = PlanHelper.getLoads(mro.mapPlan);
+            List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);
 
             if(lds!=null && lds.size()>0){
                 for (POLoad ld : lds) {
@@ -491,8 +491,8 @@ public class JobControlCompiler{
             nwJob.setInputFormatClass(PigInputFormat.class);
 
             //Process POStore and remove it from the plan
-            LinkedList<POStore> mapStores = PlanHelper.getStores(mro.mapPlan);
-            LinkedList<POStore> reduceStores = PlanHelper.getStores(mro.reducePlan);
+            LinkedList<POStore> mapStores = PlanHelper.getPhysicalOperators(mro.mapPlan,
POStore.class);
+            LinkedList<POStore> reduceStores = PlanHelper.getPhysicalOperators(mro.reducePlan,
POStore.class);
 
             for (POStore st: mapStores) {
                 storeLocations.add(st);
@@ -756,7 +756,7 @@ public class JobControlCompiler{
             throw new JobCreationException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     /**
      * Adjust the number of reducers based on the default_parallel, requested parallel and
estimated
      * parallel. For sampler jobs, we also adjust the next job in advance to get its runtime
parallel as
@@ -779,14 +779,14 @@ public class JobControlCompiler{
             // Here we use the same conf and Job to calculate the runtime #reducers of the
next job
             // which is fine as the statistics comes from the nextMro's POLoads
             int nPartitions = calculateRuntimeReducers(nextMro, conf, nwJob);
-            
+
             // set the runtime #reducer of the next job as the #partition
             ParallelConstantVisitor visitor =
               new ParallelConstantVisitor(mro.reducePlan, nPartitions);
             visitor.visit();
         }
         log.info("Setting Parallelism to " + jobParallelism);
-        
+
         // set various parallelism into the job conf for later analysis, PIG-2779
         conf.setInt("pig.info.reducers.default.parallel", pigContext.defaultParallel);
         conf.setInt("pig.info.reducers.requested.parallel", mro.requestedParallelism);
@@ -794,16 +794,16 @@ public class JobControlCompiler{
 
         // this is for backward compatibility, and we encourage to use runtimeParallelism
at runtime
         mro.requestedParallelism = jobParallelism;
-        
+
         // finally set the number of reducers
         conf.setInt("mapred.reduce.tasks", jobParallelism);
     }
-    
+
     /**
      * Calculate the runtime #reducers based on the default_parallel, requested parallel
and estimated
-     * parallel, and save it to MapReduceOper's runtimeParallelism. 
+     * parallel, and save it to MapReduceOper's runtimeParallelism.
      * @return the runtimeParallelism
-     * @throws IOException 
+     * @throws IOException
      */
     private int calculateRuntimeReducers(MapReduceOper mro, Configuration conf,
             org.apache.hadoop.mapreduce.Job nwJob) throws IOException{
@@ -811,14 +811,10 @@ public class JobControlCompiler{
         if (mro.runtimeParallelism != -1) {
             return mro.runtimeParallelism;
         }
-        List<PhysicalOperator> loads = mro.mapPlan.getRoots();
-        List<POLoad> lds = new ArrayList<POLoad>();
-        for (PhysicalOperator ld : loads) {
-            lds.add((POLoad)ld);
-        }
-        
+        List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);
+
         int jobParallelism = -1;
-        
+
         if (mro.requestedParallelism > 0) {
             jobParallelism = mro.requestedParallelism;
         } else if (pigContext.defaultParallel > 0) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1369230&r1=1369229&r2=1369230&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
Fri Aug  3 22:09:11 2012
@@ -323,8 +323,8 @@ public class MRCompiler extends PhyPlanV
 
         // get all stores and nativeMR operators, sort them in order(operator id)
         // and compile their plans
-        List<POStore> stores = PlanHelper.getStores(plan);
-        List<PONative> nativeMRs= PlanHelper.getNativeMRs(plan);
+        List<POStore> stores = PlanHelper.getPhysicalOperators(plan, POStore.class);
+        List<PONative> nativeMRs= PlanHelper.getPhysicalOperators(plan, PONative.class);
         List<PhysicalOperator> ops;
         if (!pigContext.inIllustrator) {
             ops = new ArrayList<PhysicalOperator>(stores.size() + nativeMRs.size());

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1369230&r1=1369229&r2=1369230&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
Fri Aug  3 22:09:11 2012
@@ -178,7 +178,7 @@ public abstract class PigGenericMapBase 
         if (mp == null)
             mp = (PhysicalPlan) ObjectSerializer.deserialize(
                 job.get("pig.mapPlan"));
-        stores = PlanHelper.getStores(mp);
+        stores = PlanHelper.getPhysicalOperators(mp, POStore.class);
         
         // To be removed
         if(mp.isEmpty())

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1369230&r1=1369229&r2=1369230&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
Fri Aug  3 22:09:11 2012
@@ -319,7 +319,7 @@ public class PigGenericMapReduce {
                 if (rp == null)
                     rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
                             .get("pig.reducePlan"));
-                stores = PlanHelper.getStores(rp);
+                stores = PlanHelper.getPhysicalOperators(rp, POStore.class);
 
                 if (!inIllustrator)
                     pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.reduce.package"));

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=1369230&r1=1369229&r2=1369230&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
Fri Aug  3 22:09:11 2012
@@ -17,23 +17,70 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.util;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.net.URI;
 import java.util.LinkedList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import org.apache.hadoop.fs.Path;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Add;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Divide;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Mod;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Multiply;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.NotEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POAnd;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POIsNull;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POMapLookUp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONegative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONot;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POOr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORegexp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Subtract;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import java.net.URI;
+import org.python.google.common.collect.Lists;
 
 /**
  * Utility class with a few helper functions to deal with physical plans.
@@ -41,44 +88,8 @@ import java.net.URI;
 public class PlanHelper {
 
     private final static Log log = LogFactory.getLog(new PlanHelper().getClass());
-    
-    private PlanHelper() {}
 
-    /**
-     * Get all the store operators in the plan in the right dependency order
-     * @param plan
-     * @return List of stores (could be empty)
-     */
-    public static LinkedList<POStore> getStores(PhysicalPlan plan) throws VisitorException
{
-        LoadStoreNativeFinder finder = new LoadStoreNativeFinder(plan);
-
-        finder.visit();
-        return finder.getStores();
-    }
-
-    /**
-     * Get all the load operators in the plan in the right dependency order
-     * @param plan
-     * @return List of loads (could be empty)
-     */
-    public static LinkedList<POLoad> getLoads(PhysicalPlan plan) throws VisitorException
{
-        LoadStoreNativeFinder finder = new LoadStoreNativeFinder(plan);
-
-        finder.visit();
-        return finder.getLoads();
-    }
-    
-    /**
-     * Get all the load operators in the plan in the right dependency order
-     * @param plan
-     * @return List of loads (could be empty)
-     */
-    public static LinkedList<PONative> getNativeMRs(PhysicalPlan plan) throws VisitorException
{
-        LoadStoreNativeFinder finder = new LoadStoreNativeFinder(plan);
-
-        finder.visit();
-        return finder.getNativeMRs();
-    }    
+    private PlanHelper() {}
 
     /**
      * Creates a relative path that can be used to build a temporary
@@ -97,51 +108,378 @@ public class PlanHelper {
         }
     }
 
-    private static class LoadStoreNativeFinder extends PhyPlanVisitor {
-        private LinkedList<POLoad> loads;
-        private LinkedList<POStore> stores;
-        private LinkedList<PONative> nativeMRs;
-        
-        LoadStoreNativeFinder(PhysicalPlan plan) {
+    public static <C extends PhysicalOperator> boolean containsPhysicalOperator(PhysicalPlan
plan,
+            Class<C> opClass) throws VisitorException {
+        OpFinder<C> finder = new OpFinder<C>(plan, opClass);
+        finder.visit();
+        return finder.planContainsOp();
+    }
+
+    /**
+     * Returns a LinkedList of operators contained within the physical plan which implement
the supplied class, in dependency order.
+     * Returns an empty LinkedList if no such operators exist.
+     * @param plan
+     * @param opClass
+     * @return a LinkedList of operators contained within the plan which implement the supplied
class; empty if no such ops exist.
+     * @throws VisitorException
+     */
+    public static <C extends PhysicalOperator> LinkedList<C> getPhysicalOperators(PhysicalPlan
plan,
+            Class<C> opClass) throws VisitorException {
+        OpFinder<C> finder = new OpFinder<C>(plan, opClass);
+        finder.visit();
+        return finder.getFoundOps();
+    }
+
+    private static class OpFinder<C extends PhysicalOperator> extends PhyPlanVisitor
{
+
+        final Class<C> opClass;
+        private LinkedList<C> foundOps = Lists.newLinkedList();
+
+        public OpFinder(PhysicalPlan plan,
+                Class<C> opClass) {
             super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
-            stores = new LinkedList<POStore>();
-            loads = new LinkedList<POLoad>();
-            nativeMRs = new LinkedList<PONative>();
+            this.opClass = opClass;
+        }
+
+        public LinkedList<C> getFoundOps() {
+            return foundOps;
         }
-        
+
+        public boolean planContainsOp() {
+            return !foundOps.isEmpty();
+        }
+
+        @SuppressWarnings("unchecked")
+        private void visit(PhysicalOperator op) {
+            if (opClass.isAssignableFrom(op.getClass())) {
+                foundOps.add((C) op);
+            }
+        }
+
         @Override
-        public void visit() throws VisitorException {
-            super.visit();
+        public void visitLoad(POLoad ld) throws VisitorException {
+            super.visitLoad(ld);
+            visit(ld);
         }
-        
-        @Override 
+
+        @Override
         public void visitStore(POStore st) throws VisitorException {
             super.visitStore(st);
-            stores.add(st);
+            visit(st);
+        }
+
+        @Override
+        public void visitNative(PONative nat) throws VisitorException {
+            super.visitNative(nat);
+            visit(nat);
+        }
+
+        @Override
+        public void visitFilter(POFilter fl) throws VisitorException {
+            super.visitFilter(fl);
+            visit(fl);
+        }
+
+        @Override
+        public void visitCollectedGroup(POCollectedGroup mg)
+                throws VisitorException {
+            super.visitCollectedGroup(mg);
+            visit(mg);
+        }
+
+        @Override
+        public void visitLocalRearrange(POLocalRearrange lr)
+                throws VisitorException {
+            super.visitLocalRearrange(lr);
+            visit(lr);
+        }
+
+        @Override
+        public void visitGlobalRearrange(POGlobalRearrange gr)
+                throws VisitorException {
+            super.visitGlobalRearrange(gr);
+            visit(gr);
+        }
+
+        @Override
+        public void visitPackage(POPackage pkg) throws VisitorException {
+            super.visitPackage(pkg);
+            visit(pkg);
+        }
+
+        @Override
+        public void visitCombinerPackage(POCombinerPackage pkg)
+                throws VisitorException {
+            super.visitCombinerPackage(pkg);
+            visit(pkg);
+        }
+
+        @Override
+        public void visitMultiQueryPackage(POMultiQueryPackage pkg)
+                throws VisitorException {
+            super.visitMultiQueryPackage(pkg);
+            visit(pkg);
+        }
+
+        @Override
+        public void visitPOForEach(POForEach nfe) throws VisitorException {
+            super.visitPOForEach(nfe);
+            visit(nfe);
+        }
+
+        @Override
+        public void visitUnion(POUnion un) throws VisitorException {
+            super.visitUnion(un);
+            visit(un);
+        }
+
+        @Override
+        public void visitSplit(POSplit spl) throws VisitorException {
+            super.visitSplit(spl);
+            visit(spl);
+        }
+
+        @Override
+        public void visitDemux(PODemux demux) throws VisitorException {
+            super.visitDemux(demux);
+            visit(demux);
+        }
+
+        @Override
+        public void visitDistinct(PODistinct distinct) throws VisitorException {
+            super.visitDistinct(distinct);
+            visit(distinct);
+        }
+
+        @Override
+        public void visitSort(POSort sort) throws VisitorException {
+            super.visitSort(sort);
+            visit(sort);
+        }
+
+        @Override
+        public void visitConstant(ConstantExpression cnst)
+                throws VisitorException {
+            super.visitConstant(cnst);
+            visit(cnst);
+        }
+
+        @Override
+        public void visitProject(POProject proj) throws VisitorException {
+            super.visitProject(proj);
+            visit(proj);
         }
 
-        @Override 
-        public void visitLoad(POLoad load) throws VisitorException {
-            super.visitLoad(load);
-            loads.add(load);
+        @Override
+        public void visitGreaterThan(GreaterThanExpr grt)
+                throws VisitorException {
+            super.visitGreaterThan(grt);
+            visit(grt);
         }
-        
-        @Override 
-        public void visitNative(PONative nativeMR) throws VisitorException {
-            super.visitNative(nativeMR);
-            nativeMRs.add(nativeMR);
+
+        @Override
+        public void visitLessThan(LessThanExpr lt) throws VisitorException {
+            super.visitLessThan(lt);
+            visit(lt);
         }
-        
-        public LinkedList<POStore> getStores() {
-            return stores;
+
+        @Override
+        public void visitGTOrEqual(GTOrEqualToExpr gte) throws VisitorException {
+            super.visitGTOrEqual(gte);
+            visit(gte);
         }
 
-        public LinkedList<POLoad> getLoads() {
-            return loads;
+        @Override
+        public void visitLTOrEqual(LTOrEqualToExpr lte) throws VisitorException {
+            super.visitLTOrEqual(lte);
+            visit(lte);
         }
-        
-        public LinkedList<PONative> getNativeMRs(){
-            return nativeMRs;
+
+        @Override
+        public void visitEqualTo(EqualToExpr eq) throws VisitorException {
+            super.visitEqualTo(eq);
+            visit(eq);
+        }
+
+        @Override
+        public void visitNotEqualTo(NotEqualToExpr eq) throws VisitorException {
+            super.visitNotEqualTo(eq);
+            visit(eq);
+        }
+
+        @Override
+        public void visitRegexp(PORegexp re) throws VisitorException {
+            super.visitRegexp(re);
+            visit(re);
+        }
+
+        @Override
+        public void visitIsNull(POIsNull isNull) throws VisitorException {
+            super.visitIsNull(isNull);
+            visit(isNull);
+        }
+
+        @Override
+        public void visitAdd(Add add) throws VisitorException {
+            super.visitAdd(add);
+            visit(add);
+        }
+
+        @Override
+        public void visitSubtract(Subtract sub) throws VisitorException {
+            super.visitSubtract(sub);
+            visit(sub);
+        }
+
+        @Override
+        public void visitMultiply(Multiply mul) throws VisitorException {
+            super.visitMultiply(mul);
+            visit(mul);
+        }
+
+        @Override
+        public void visitDivide(Divide dv) throws VisitorException {
+            super.visitDivide(dv);
+            visit(dv);
+        }
+
+        @Override
+        public void visitMod(Mod mod) throws VisitorException {
+            super.visitMod(mod);
+            visit(mod);
+        }
+
+        @Override
+        public void visitAnd(POAnd and) throws VisitorException {
+            super.visitAnd(and);
+            visit(and);
+        }
+
+        @Override
+        public void visitOr(POOr or) throws VisitorException {
+            super.visitOr(or);
+            visit(or);
+        }
+
+        @Override
+        public void visitNot(PONot not) throws VisitorException {
+            super.visitNot(not);
+            visit(not);
+        }
+
+        @Override
+        public void visitBinCond(POBinCond binCond) {
+            super.visitBinCond(binCond);
+            visit(binCond);
+        }
+
+        @Override
+        public void visitNegative(PONegative negative) {
+            super.visitNegative(negative);
+            visit(negative);
+        }
+
+        @Override
+        public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
+            super.visitUserFunc(userFunc);
+            visit(userFunc);
+        }
+
+        @Override
+        public void visitComparisonFunc(POUserComparisonFunc compFunc)
+                throws VisitorException {
+            super.visitComparisonFunc(compFunc);
+            visit(compFunc);
+    }
+
+        @Override
+        public void visitMapLookUp(POMapLookUp mapLookUp) {
+            super.visitMapLookUp(mapLookUp);
+            visit(mapLookUp);
+        }
+
+        @Override
+        public void visitJoinPackage(POJoinPackage joinPackage)
+                throws VisitorException {
+            super.visitJoinPackage(joinPackage);
+            visit(joinPackage);
+        }
+
+        @Override
+        public void visitCast(POCast cast) {
+            super.visitCast(cast);
+            visit(cast);
+        }
+
+        @Override
+        public void visitLimit(POLimit lim) throws VisitorException {
+            super.visitLimit(lim);
+            visit(lim);
+        }
+
+        @Override
+        public void visitCross(POCross cross) throws VisitorException {
+            super.visitCross(cross);
+            visit(cross);
+        }
+
+        @Override
+        public void visitFRJoin(POFRJoin join) throws VisitorException {
+            super.visitFRJoin(join);
+            visit(join);
+        }
+
+        @Override
+        public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+            super.visitMergeJoin(join);
+            visit(join);
+        }
+
+        @Override
+        public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
+                throws VisitorException {
+            super.visitMergeCoGroup(mergeCoGrp);
+            visit(mergeCoGrp);
+        }
+
+        @Override
+        public void visitStream(POStream stream) throws VisitorException {
+            super.visitStream(stream);
+            visit(stream);
+        }
+
+        @Override
+        public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
+            super.visitSkewedJoin(sk);
+            visit(sk);
+        }
+
+        @Override
+        public void visitPartitionRearrange(POPartitionRearrange pr)
+                throws VisitorException {
+            super.visitPartitionRearrange(pr);
+            visit(pr);
+        }
+
+        @Override
+        public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach)
+                throws VisitorException {
+            super.visitPOOptimizedForEach(optimizedForEach);
+            visit(optimizedForEach);
+        }
+
+        @Override
+        public void visitPreCombinerLocalRearrange(
+                POPreCombinerLocalRearrange preCombinerLocalRearrange) {
+            super.visitPreCombinerLocalRearrange(preCombinerLocalRearrange);
+            visit(preCombinerLocalRearrange);
+        }
+
+        @Override
+        public void visitPartialAgg(POPartialAgg poPartialAgg) {
+            super.visitPartialAgg(poPartialAgg);
+            visit(poPartialAgg);
         }
     }
+
 }

Modified: pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java?rev=1369230&r1=1369229&r2=1369230&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java (original)
+++ pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java Fri Aug  3 22:09:11 2012
@@ -129,15 +129,15 @@ public class LocalMapReduceSimulator {
                     pack = mro.reducePlan.getRoots().get(0);
                 }
                 
-                List<POLoad> lds = PlanHelper.getLoads(mro.mapPlan);
+                List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);
                 if (!mro.mapPlan.isEmpty()) {
-                    stores = PlanHelper.getStores(mro.mapPlan);
+                    stores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class);
                 }
                 if (!mro.reducePlan.isEmpty()) {
                     if (stores == null)
-                        stores = PlanHelper.getStores(mro.reducePlan);
+                        stores = PlanHelper.getPhysicalOperators(mro.reducePlan, POStore.class);
                     else
-                        stores.addAll(PlanHelper.getStores(mro.reducePlan));
+                        stores.addAll(PlanHelper.getPhysicalOperators(mro.reducePlan, POStore.class));
                 }
 
                 for (POStore store : stores) {

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1369230&r1=1369229&r2=1369230&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Fri Aug  3 22:09:11 2012
@@ -292,7 +292,7 @@ public class ScriptState {
         conf.set(PIG_PROPERTY.COMMAND_LINE.toString(), getCommandLine());
         
         try {
-            LinkedList<POStore> stores = PlanHelper.getStores(mro.mapPlan);
+            LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(mro.mapPlan,
POStore.class);
             ArrayList<String> outputDirs = new ArrayList<String>();
             for (POStore st: stores) {  
                 outputDirs.add(st.getSFile().getFileName()); 
@@ -303,7 +303,7 @@ public class ScriptState {
         }
         if (!mro.reducePlan.isEmpty()) {
             try {
-                LinkedList<POStore> stores = PlanHelper.getStores(mro.reducePlan);
+                LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(mro.reducePlan,
POStore.class);
                 ArrayList<String> outputDirs = new ArrayList<String>();
                 for (POStore st: stores) {  
                     outputDirs.add(st.getSFile().getFileName()); 
@@ -314,7 +314,7 @@ public class ScriptState {
             }
         }        
         try {
-            List<POLoad> lds = PlanHelper.getLoads(mro.mapPlan);
+            List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);
             ArrayList<String> inputDirs = new ArrayList<String>();
             if (lds != null && lds.size() > 0){
                 for (POLoad ld : lds) {



Mime
View raw message