pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1384352 [2/4] - in /pig/trunk: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer...
Date Thu, 13 Sep 2012 14:55:38 GMT
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Thu Sep 13 14:55:36 2012
@@ -22,9 +22,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,6 +42,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
@@ -54,6 +57,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -94,7 +98,7 @@ import org.apache.pig.parser.SourceLocat
 
 public class LogToPhyTranslationVisitor extends LogicalRelationalNodesVisitor {
     private static final Log LOG = LogFactory.getLog(LogToPhyTranslationVisitor.class);
-    
+
     public LogToPhyTranslationVisitor(OperatorPlan plan) throws FrontendException {
         super(plan, new DependencyOrderWalker(plan));
         currentPlan = new PhysicalPlan();
@@ -111,7 +115,7 @@ public class LogToPhyTranslationVisitor 
     protected NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();
 
     protected PigContext pc;
-        
+
     public void setPigContext(PigContext pc) {
         this.pc = pc;
     }
@@ -119,16 +123,16 @@ public class LogToPhyTranslationVisitor 
     public Map<Operator, PhysicalOperator> getLogToPhyMap() {
         return logToPhyMap;
     }
-    
+
     public PhysicalPlan getPhysicalPlan() {
         return currentPlan;
     }
-    
+
     @Override
     public void visit(LOLoad loLoad) throws FrontendException {
         String scope = DEFAULT_SCOPE;
-//        System.err.println("Entering Load");
-        // The last parameter here is set to true as we assume all files are 
+        //        System.err.println("Entering Load");
+        // The last parameter here is set to true as we assume all files are
         // splittable due to LoadStore Refactor
         POLoad load = new POLoad(new OperatorKey(scope, nodeGen
                 .getNextNodeId(scope)), loLoad.getLoadFunc());
@@ -145,7 +149,7 @@ public class LogToPhyTranslationVisitor 
         // case it might have a store as a predecessor.
         List<Operator> op = loLoad.getPlan().getPredecessors(loLoad);
         PhysicalOperator from;
-        
+
         if(op != null) {
             from = logToPhyMap.get(op.get(0));
             try {
@@ -156,13 +160,13 @@ public class LogToPhyTranslationVisitor 
                 throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
             }
         }
-//        System.err.println("Exiting Load");
+        //        System.err.println("Exiting Load");
     }
-    
+
     @Override
-    public void visit(LONative loNative) throws FrontendException{     
+    public void visit(LONative loNative) throws FrontendException{
         String scope = DEFAULT_SCOPE;
-        
+
         PONative poNative = new PONative(new OperatorKey(scope, nodeGen
                 .getNextNodeId(scope)));
         poNative.addOriginalLocation(loNative.getAlias(), loNative.getLocation());
@@ -172,7 +176,7 @@ public class LogToPhyTranslationVisitor 
 
         logToPhyMap.put(loNative, poNative);
         currentPlan.add(poNative);
-        
+
         List<Operator> op = loNative.getPlan().getPredecessors(loNative);
 
         PhysicalOperator from;
@@ -183,7 +187,7 @@ public class LogToPhyTranslationVisitor 
             String msg = "Did not find a predecessor for Native." ;
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
         }
-        
+
         try {
             currentPlan.connect(from, poNative);
         } catch (PlanException e) {
@@ -191,13 +195,13 @@ public class LogToPhyTranslationVisitor 
             String msg = "Invalid physical operators in the physical plan" ;
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
-        
+
     }
-    
+
     @Override
     public void visit(LOFilter filter) throws FrontendException {
         String scope = DEFAULT_SCOPE;
-//        System.err.println("Entering Filter");
+        //        System.err.println("Entering Filter");
         POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen
                 .getNextNodeId(scope)), filter.getRequestedParallelism());
         poFilter.addOriginalLocation(filter.getAlias(), filter.getLocation());
@@ -208,8 +212,8 @@ public class LogToPhyTranslationVisitor 
 
         currentPlan = new PhysicalPlan();
 
-//        PlanWalker childWalker = currentWalker
-//                .spawnChildWalker(filter.getFilterPlan());
+        //        PlanWalker childWalker = currentWalker
+        //                .spawnChildWalker(filter.getFilterPlan());
         PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(filter.getFilterPlan());
         pushWalker(childWalker);
         //currentWalker.walk(this);
@@ -231,7 +235,7 @@ public class LogToPhyTranslationVisitor 
             String msg = "Did not find a predecessor for Filter." ;
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
         }
-        
+
         try {
             currentPlan.connect(from, poFilter);
         } catch (PlanException e) {
@@ -239,11 +243,11 @@ public class LogToPhyTranslationVisitor 
             String msg = "Invalid physical operators in the physical plan" ;
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
-        
+
         translateSoftLinks(filter);
-//        System.err.println("Exiting Filter");
+        //        System.err.println("Exiting Filter");
     }
-    
+
     @Override
     public void visit(LOSort sort) throws FrontendException {
         String scope = DEFAULT_SCOPE;
@@ -282,17 +286,17 @@ public class LogToPhyTranslationVisitor 
         // sort.setRequestedParallelism(s.getType());
         logToPhyMap.put(sort, poSort);
         currentPlan.add(poSort);
-        List<Operator> op = sort.getPlan().getPredecessors(sort); 
+        List<Operator> op = sort.getPlan().getPredecessors(sort);
         PhysicalOperator from;
-        
+
         if(op != null) {
             from = logToPhyMap.get(op.get(0));
         } else {
             int errCode = 2051;
             String msg = "Did not find a predecessor for Sort." ;
-            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);            
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
         }
-        
+
         try {
             currentPlan.connect(from, poSort);
         } catch (PlanException e) {
@@ -303,7 +307,254 @@ public class LogToPhyTranslationVisitor 
 
         poSort.setResultType(DataType.BAG);
     }
-    
+
+    /**
+     * Translates a logical RANK operator into its physical operators.
+     * First, a random number is generated to link a POCounter with its PORank,
+     * which avoids collisions between RANK operations that run in parallel.
+     * Then, if the operation is in row number mode:
+     * <pre>
+     * A RANK operation (row number mode) uses two steps:
+     *   1.- Each tuple is counted sequentially on each mapper, producing global counters
+     *   2.- Global counters are gathered and summed; each tuple looks up the respective counter value
+     *       in order to calculate the corresponding rank value.
+     * </pre>
+     * otherwise:
+     * <pre>
+     * A RANK BY operation needs five steps:
+     *   1.- Group by the fields involved in the rank operation: POPackage
+     *   2.- In case of multiple fields, the key (group field) is flattened: POForEach
+     *   3.- Sort by the fields available after flattening: POSort
+     *   4.- Each group is sequentially counted on each mapper through a global counter: POCounter
+     *   5.- Global counters are summed and passed to the rank operation: PORank
+     * </pre>
+     * @param loRank the logical rank operator: row number mode or RANK BY (dense or not)
+     * @throws FrontendException if no predecessor is found or the physical plan cannot be connected
+     **/
+    @Override
+    public void visit(LORank loRank) throws FrontendException {
+        String scope = DEFAULT_SCOPE;
+        PORank poRank;
+        POCounter poCounter;
+        // Mask the sign bit instead of Math.abs(): Math.abs(Long.MIN_VALUE) is still negative.
+        Random randomGenerator = new Random();
+        long operationID = randomGenerator.nextLong() & Long.MAX_VALUE;
+
+        try {
+            // Physical operations for RANK operator:
+            // A RANK BY operation needs five steps:
+            //   1.- Group by the fields involved in the rank operation: POPackage
+            //   2.- In case of multiple fields, the key (group field) is flattened: POForEach
+            //   3.- Sort by the fields available after flattening: POSort
+            //   4.- Each group is sequentially counted on each mapper through a global counter: POCounter
+            //   5.- Global counters are summed and passed to the rank operation: PORank
+            if(!loRank.isRowNumber()) {
+
+                boolean[] flags = {false};
+
+                MultiMap<Integer, LogicalExpressionPlan> expressionPlans = new MultiMap<Integer, LogicalExpressionPlan>();
+                for(int i = 0 ; i < loRank.getRankColPlans().size() ; i++)
+                    expressionPlans.put(i,loRank.getRankColPlans());
+
+                POPackage poPackage = compileToLR_GR_PackTrio(loRank, null, flags, expressionPlans);
+                poPackage.setPackageType(PackageType.GROUP);
+                translateSoftLinks(loRank);
+
+                List<Boolean> flattenLst = Arrays.asList(true, false);
+
+                PhysicalPlan fep1 = new PhysicalPlan();
+                POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1);
+                feproj1.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+                feproj1.setColumn(0);
+                feproj1.setResultType(poPackage.getKeyType());
+                feproj1.setStar(false);
+                feproj1.setOverloaded(false);
+                fep1.add(feproj1);
+
+                // Projection of column 1 (typed as a bag), which is not flattened.
+                PhysicalPlan fep2 = new PhysicalPlan();
+                POProject feproj2 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1);
+                feproj2.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+                feproj2.setColumn(1);
+                feproj2.setResultType(DataType.BAG);
+                feproj2.setStar(false);
+                feproj2.setOverloaded(false);
+                fep2.add(feproj2);
+                List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);
+
+                POForEach poForEach = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1, fePlans, flattenLst);
+
+                List<LogicalExpressionPlan> rankPlans = loRank.getRankColPlans();
+                byte[] newTypes = new byte[rankPlans.size()];
+
+                for(int i = 0; i < rankPlans.size(); i++) {
+                    LogicalExpressionPlan loep = rankPlans.get(i);
+                    Iterator<Operator> inpOpers = loep.getOperators();
+
+                    while(inpOpers.hasNext()) {
+                        Operator oper = inpOpers.next();
+                        newTypes[i] = ((ProjectExpression) oper).getType();
+                    }
+                }
+
+                List<PhysicalPlan> newPhysicalPlan = new ArrayList<PhysicalPlan>();
+                List<Boolean> newOrderPlan = new ArrayList<Boolean>();
+
+                for(int i = 0; i < loRank.getRankColPlans().size(); i++) {
+                    PhysicalPlan fep3 = new PhysicalPlan();
+                    POProject feproj3 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1);
+                    feproj3.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+                    feproj3.setColumn(i);
+                    feproj3.setResultType(newTypes[i]);
+                    feproj3.setStar(false);
+                    feproj3.setOverloaded(false);
+                    fep3.add(feproj3);
+
+                    newPhysicalPlan.add(fep3);
+                    newOrderPlan.add(loRank.getAscendingCol().get(i));
+                }
+
+                POSort poSort;
+                poSort = new POSort(new OperatorKey(scope, nodeGen
+                        .getNextNodeId(scope)), -1, null,
+                        newPhysicalPlan, newOrderPlan, null);
+                poSort.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+
+                // POCounter and PORank get the same operation ID, which links the two operators.
+                poCounter = new POCounter(
+                        new OperatorKey(scope, nodeGen
+                                .getNextNodeId(scope)), -1 , null,
+                                newPhysicalPlan, newOrderPlan);
+
+                poCounter.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+                poCounter.setResultType(DataType.TUPLE);
+                poCounter.setIsRowNumber(loRank.isRowNumber());
+                poCounter.setIsDenseRank(loRank.isDenseRank());
+                poCounter.setOperationID(String.valueOf(operationID));
+
+                poRank = new PORank(
+                        new OperatorKey(scope, nodeGen
+                                .getNextNodeId(scope)), -1 , null,
+                                newPhysicalPlan, newOrderPlan);
+
+                poRank.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+                poRank.setResultType(DataType.TUPLE);
+                poRank.setOperationID(String.valueOf(operationID));
+
+                List<Boolean> flattenLst2 = Arrays.asList(false, true);
+
+                PhysicalPlan fep12 = new PhysicalPlan();
+                POProject feproj12 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1);
+                feproj12.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+                feproj12.setColumn(0);
+                feproj12.setResultType(DataType.LONG);
+                feproj12.setStar(false);
+                feproj12.setOverloaded(false);
+                fep12.add(feproj12);
+
+                // Projection of column (number of rank columns + 1), typed as a bag and flattened.
+                PhysicalPlan fep22 = new PhysicalPlan();
+                POProject feproj22 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1);
+                feproj22.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+                feproj22.setColumn(loRank.getRankColPlans().size()+1);
+                feproj22.setResultType(DataType.BAG);
+                feproj22.setStar(false);
+                feproj22.setOverloaded(false);
+                fep22.add(feproj22);
+                List<PhysicalPlan> fePlans2 = Arrays.asList(fep12, fep22);
+
+                POForEach poForEach2 = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), -1, fePlans2, flattenLst2);
+
+                currentPlan.add(poForEach);
+                currentPlan.add(poSort);
+                currentPlan.add(poCounter);
+                currentPlan.add(poRank);
+                currentPlan.add(poForEach2);
+
+                try {
+                    currentPlan.connect(poPackage, poForEach);
+                    currentPlan.connect(poForEach, poSort);
+                    currentPlan.connect(poSort, poCounter);
+                    currentPlan.connect(poCounter, poRank);
+                    currentPlan.connect(poRank, poForEach2);
+                } catch (PlanException e) {
+                    throw new LogicalToPhysicalTranslatorException(e.getMessage(),e.getErrorCode(),e.getErrorSource(),e);
+                }
+
+                logToPhyMap.put(loRank, poForEach2);
+
+                // A RANK operation (row number mode) uses two steps:
+                //   1.- Each tuple is counted sequentially on each mapper, producing global counters
+                //   2.- Global counters are gathered and summed; each tuple looks up the respective counter value
+                //       in order to calculate the corresponding rank value.
+            } else {
+
+                List<LogicalExpressionPlan> logPlans = loRank.getRankColPlans();
+                List<PhysicalPlan> rankPlans = new ArrayList<PhysicalPlan>(logPlans.size());
+
+                // convert all the logical expression plans to physical expression plans
+                currentPlans.push(currentPlan);
+                for (LogicalExpressionPlan plan : logPlans) {
+                    currentPlan = new PhysicalPlan();
+                    PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(plan);
+                    pushWalker(childWalker);
+                    childWalker.walk(new ExpToPhyTranslationVisitor( currentWalker.getPlan(),
+                            childWalker, loRank, currentPlan, logToPhyMap));
+                    rankPlans.add(currentPlan);
+                    popWalker();
+                }
+                currentPlan = currentPlans.pop();
+
+
+                // Row number mode: POCounter and PORank are attached directly after the predecessor.
+                poCounter = new POCounter(
+                        new OperatorKey(scope, nodeGen
+                                .getNextNodeId(scope)), -1 , null,
+                                rankPlans, loRank.getAscendingCol());
+
+                poCounter.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+                poCounter.setResultType(DataType.TUPLE);
+                poCounter.setIsRowNumber(loRank.isRowNumber());
+                poCounter.setIsDenseRank(loRank.isDenseRank());
+                poCounter.setOperationID(String.valueOf(operationID));
+
+                poRank = new PORank(
+                        new OperatorKey(scope, nodeGen
+                                .getNextNodeId(scope)), -1 , null,
+                                rankPlans, loRank.getAscendingCol());
+
+                poRank.addOriginalLocation(loRank.getAlias(), loRank.getLocation());
+                poRank.setResultType(DataType.TUPLE);
+                poRank.setOperationID(String.valueOf(operationID));
+
+                currentPlan.add(poCounter);
+                currentPlan.add(poRank);
+
+                List<Operator> op = loRank.getPlan().getPredecessors(loRank);
+                PhysicalOperator from;
+
+                if(op != null) {
+                    from = logToPhyMap.get(op.get(0));
+                } else {
+                    int errCode = 2051;
+                    String msg = "Did not find a predecessor for Rank." ;
+                    throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
+                }
+
+                currentPlan.connect(from, poCounter);
+                currentPlan.connect(poCounter, poRank);
+
+                logToPhyMap.put(loRank, poRank);
+            }
+
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+
+    }
+
     @Override
     public void visit(LOCross cross) throws FrontendException {
         String scope = DEFAULT_SCOPE;
@@ -334,20 +585,20 @@ public class LogToPhyTranslationVisitor 
             poGlobal.addOriginalLocation(cross.getAlias(), cross.getLocation());
             currentPlan.add(poGlobal);
             currentPlan.add(poPackage);
-            
+
             int count = 0;
-            
+
             try {
                 currentPlan.connect(poGlobal, poPackage);
                 List<Boolean> flattenLst = Arrays.asList(true, true);
-                
+
                 for (Operator op : inputs) {
                     PhysicalPlan fep1 = new PhysicalPlan();
                     ConstantExpression ce1 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelism());
                     ce1.setValue(inputs.size());
                     ce1.setResultType(DataType.INTEGER);
                     fep1.add(ce1);
-                    
+
                     ConstantExpression ce2 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelism());
                     ce2.setValue(count);
                     ce2.setResultType(DataType.INTEGER);
@@ -357,7 +608,7 @@ public class LogToPhyTranslationVisitor 
                     ce1val.set(1,count);
                     ce1.setValue(ce1val);
                     ce1.setResultType(DataType.TUPLE);*/
-                    
+
                     POUserFunc gfc = new POUserFunc(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelism(), Arrays.asList((PhysicalOperator)ce1,(PhysicalOperator)ce2), new FuncSpec(GFCross.class.getName()));
                     gfc.addOriginalLocation(cross.getAlias(), cross.getLocation());
                     gfc.setResultType(DataType.BAG);
@@ -366,7 +617,7 @@ public class LogToPhyTranslationVisitor 
                     /*fep1.add(gfc);
                     fep1.connect(ce1, gfc);
                     fep1.connect(ce2, gfc);*/
-                    
+
                     PhysicalPlan fep2 = new PhysicalPlan();
                     POProject feproj = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism());
                     feproj.addOriginalLocation(cross.getAlias(), cross.getLocation());
@@ -375,12 +626,12 @@ public class LogToPhyTranslationVisitor 
                     feproj.setOverloaded(false);
                     fep2.add(feproj);
                     List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);
-                    
+
                     POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), fePlans, flattenLst );
                     fe.addOriginalLocation(cross.getAlias(), cross.getLocation());
                     currentPlan.add(fe);
                     currentPlan.connect(logToPhyMap.get(op), fe);
-                    
+
                     POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
                             scope, nodeGen.getNextNodeId(scope)), cross
                             .getRequestedParallelism());
@@ -395,13 +646,13 @@ public class LogToPhyTranslationVisitor 
                         lrp1.add(lrproj1);
                         lrPlans.add(lrp1);
                     }
-                    
+
                     physOp.setCross(true);
                     physOp.setIndex(count++);
                     physOp.setKeyType(DataType.TUPLE);
                     physOp.setPlans(lrPlans);
                     physOp.setResultType(DataType.TUPLE);
-                    
+
                     currentPlan.add(physOp);
                     currentPlan.connect(fe, physOp);
                     currentPlan.connect(physOp, poGlobal);
@@ -415,7 +666,7 @@ public class LogToPhyTranslationVisitor 
                 String msg = "Unable to set index on newly create POLocalRearrange.";
                 throw new VisitorException(msg, errCode, PigException.BUG, e);
             }
-            
+
             poPackage.setKeyType(DataType.TUPLE);
             poPackage.setResultType(DataType.TUPLE);
             poPackage.setNumInps(count);
@@ -424,7 +675,7 @@ public class LogToPhyTranslationVisitor 
                 inner[i] = true;
             }
             poPackage.setInner(inner);
-            
+
             List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
             List<Boolean> flattenLst = new ArrayList<Boolean>();
             for(int i=1;i<=count;i++){
@@ -437,7 +688,7 @@ public class LogToPhyTranslationVisitor 
                 fePlans.add(fep1);
                 flattenLst.add(true);
             }
-            
+
             POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), fePlans, flattenLst );
             fe.addOriginalLocation(cross.getAlias(), cross.getLocation());
             currentPlan.add(fe);
@@ -451,28 +702,28 @@ public class LogToPhyTranslationVisitor 
             logToPhyMap.put(cross, fe);
         }
     }
-    
+
     @Override
     public void visit(LOStream stream) throws FrontendException {
         String scope = DEFAULT_SCOPE;
         POStream poStream = new POStream(new OperatorKey(scope, nodeGen
-                .getNextNodeId(scope)), stream.getExecutableManager(), 
+                .getNextNodeId(scope)), stream.getExecutableManager(),
                 stream.getStreamingCommand(), this.pc.getProperties());
         poStream.addOriginalLocation(stream.getAlias(), stream.getLocation());
         currentPlan.add(poStream);
         logToPhyMap.put(stream, poStream);
-        
+
         List<Operator> op = stream.getPlan().getPredecessors(stream);
 
         PhysicalOperator from;
         if(op != null) {
             from = logToPhyMap.get(op.get(0));
-        } else {                
+        } else {
             int errCode = 2051;
             String msg = "Did not find a predecessor for Stream." ;
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
         }
-        
+
         try {
             currentPlan.connect(from, poStream);
         } catch (PlanException e) {
@@ -485,10 +736,10 @@ public class LogToPhyTranslationVisitor 
     @Override
     public void visit(LOInnerLoad load) throws FrontendException {
         String scope = DEFAULT_SCOPE;
-        
+
         POProject exprOp = new POProject(new OperatorKey(scope, nodeGen
               .getNextNodeId(scope)));
-        
+
         LogicalSchema s = load.getSchema();
 
         if (load.sourceIsBag()) {
@@ -518,25 +769,25 @@ public class LogToPhyTranslationVisitor 
             exprOp.setColumn(load.getColNum());
         }
         // set input to POProject to the predecessor of foreach
-        
+
         logToPhyMap.put(load, exprOp);
         currentPlan.add(exprOp);
     }
-    
+
     @Override
     public void visit(LOForEach foreach) throws FrontendException {
         String scope = DEFAULT_SCOPE;
-        
+
         List<PhysicalPlan> innerPlans = new ArrayList<PhysicalPlan>();
-        
+
         org.apache.pig.newplan.logical.relational.LogicalPlan inner = foreach.getInnerPlan();
         LOGenerate gen = (LOGenerate)inner.getSinks().get(0);
-       
+
         List<LogicalExpressionPlan> exps = gen.getOutputPlans();
         List<Operator> preds = inner.getPredecessors(gen);
 
         currentPlans.push(currentPlan);
-        
+
         // we need to translate each predecessor of LOGenerate into a physical plan.
         // The physical plan should contain the expression plan for this predecessor plus
         // the subtree starting with this predecessor
@@ -546,25 +797,25 @@ public class LogToPhyTranslationVisitor 
             PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(exps.get(i));
             pushWalker(childWalker);
             childWalker.walk(new ExpToPhyTranslationVisitor(exps.get(i),
-                    childWalker, gen, currentPlan, logToPhyMap));            
+                    childWalker, gen, currentPlan, logToPhyMap));
             popWalker();
-            
+
             List<Operator> leaves = exps.get(i).getSinks();
             for(Operator l: leaves) {
                 PhysicalOperator op = logToPhyMap.get(l);
                 if (l instanceof ProjectExpression ) {
-                    int input = ((ProjectExpression)l).getInputNum();                    
-                    
+                    int input = ((ProjectExpression)l).getInputNum();
+
                     // for each sink projection, get its input logical plan and translate it
                     Operator pred = preds.get(input);
                     childWalker = new SubtreeDependencyOrderWalker(inner, pred);
                     pushWalker(childWalker);
                     childWalker.walk(this);
                     popWalker();
-                    
+
                     // get the physical operator of the leaf of input logical plan
-                    PhysicalOperator leaf = logToPhyMap.get(pred);                    
-                    
+                    PhysicalOperator leaf = logToPhyMap.get(pred);
+
                     if (pred instanceof LOInnerLoad) {
                         // if predecessor is only an LOInnerLoad, remove the project that
                         // comes from LOInnerLoad and change the column of project that
@@ -582,19 +833,19 @@ public class LogToPhyTranslationVisitor 
                             }else {
                                 ((POProject)op).setColumn(leafProj.getColumn() );
                             }
-                            
+
                         } catch (ExecException e) {
                             throw new FrontendException(foreach, "Cannot get column from "+leaf, 2230, e);
                         }
 
-                    }else{                    
+                    }else{
                         currentPlan.connect(leaf, op);
                     }
                 }
             }
             innerPlans.add(currentPlan);
         }
-        
+
         currentPlan = currentPlans.pop();
 
         // PhysicalOperator poGen = new POGenerate(new OperatorKey("",
@@ -629,14 +880,13 @@ public class LogToPhyTranslationVisitor 
 
         translateSoftLinks(foreach);
     }
-    
+
     /**
-     * This function takes in a List of LogicalExpressionPlan and converts them to 
+     * This function takes in a List of LogicalExpressionPlan and converts them to
      * a list of PhysicalPlans
-     * 
      * @param plans
      * @return
-     * @throws FrontendException 
+     * @throws FrontendException
      */
     private List<PhysicalPlan> translateExpressionPlans(LogicalRelationalOperator loj,
             List<LogicalExpressionPlan> plans ) throws FrontendException {
@@ -644,40 +894,40 @@ public class LogToPhyTranslationVisitor 
         if( plans == null || plans.size() == 0 ) {
             return exprPlans;
         }
-        
+
         // Save the current plan onto stack
         currentPlans.push(currentPlan);
-        
+
         for( LogicalExpressionPlan lp : plans ) {
             currentPlan = new PhysicalPlan();
-            
-            // We spawn a new Dependency Walker and use it 
+
+            // We spawn a new Dependency Walker and use it
             // PlanWalker childWalker = currentWalker.spawnChildWalker(lp);
             PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(lp);
-            
+
             // Save the old walker and use childWalker as current Walker
             pushWalker(childWalker);
-            
+
             // We create a new ExpToPhyTranslationVisitor to walk the ExpressionPlan
             currentWalker.walk(
-                    new ExpToPhyTranslationVisitor( 
-                            currentWalker.getPlan(), 
+                    new ExpToPhyTranslationVisitor(
+                            currentWalker.getPlan(),
                             childWalker, loj, currentPlan, logToPhyMap) );
-            
+
             exprPlans.add(currentPlan);
             popWalker();
         }
-        
+
         // Pop the current plan back out
         currentPlan = currentPlans.pop();
 
         return exprPlans;
     }
-    
+
     @Override
     public void visit(LOStore loStore) throws FrontendException {
         String scope = DEFAULT_SCOPE;
-//        System.err.println("Entering Store");
+        //        System.err.println("Entering Store");
         POStore store = new POStore(new OperatorKey(scope, nodeGen
                 .getNextNodeId(scope)));
         store.addOriginalLocation(loStore.getAlias(), loStore.getLocation());
@@ -687,17 +937,17 @@ public class LogToPhyTranslationVisitor 
         store.setSortInfo(loStore.getSortInfo());
         store.setIsTmpStore(loStore.isTmpStore());
         store.setStoreFunc(loStore.getStoreFunc());
-        
+
         store.setSchema(Util.translateSchema( loStore.getSchema() ));
 
         currentPlan.add(store);
-        
-        List<Operator> op = loStore.getPlan().getPredecessors(loStore); 
+
+        List<Operator> op = loStore.getPlan().getPredecessors(loStore);
         PhysicalOperator from = null;
-        
+
         if(op != null) {
             from = logToPhyMap.get(op.get(0));
-            // TODO Implement sorting when we have a LOSort (new) and LOLimit (new) operator ready
+//             TODO Implement sorting when we have a LOSort (new) and LOLimit (new) operator ready
 //            SortInfo sortInfo = null;
 //            // if store's predecessor is limit,
 //            // check limit's predecessor
@@ -725,9 +975,9 @@ public class LogToPhyTranslationVisitor 
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
         logToPhyMap.put(loStore, store);
-//        System.err.println("Exiting Store");
+        //        System.err.println("Exiting Store");
     }
-    
+
     @Override
     public void visit( LOCogroup cg ) throws FrontendException {
         switch (cg.getGroupType()) {
@@ -747,7 +997,7 @@ public class LogToPhyTranslationVisitor 
         }
         translateSoftLinks(cg);
     }
-    
+
     private void translateCollectedCogroup(LOCogroup cg) throws FrontendException {
         // can have only one input
         LogicalRelationalOperator pred = (LogicalRelationalOperator) plan.getPredecessors(cg).get(0);
@@ -756,7 +1006,7 @@ public class LogToPhyTranslationVisitor 
                 DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
         physOp.addOriginalLocation(cg.getAlias(), cg.getLocation());
         List<PhysicalPlan> pExprPlans = translateExpressionPlans(cg, exprPlans);
-        
+
         try {
             physOp.setPlans(pExprPlans);
         } catch (PlanException pe) {
@@ -775,7 +1025,7 @@ public class LogToPhyTranslationVisitor 
         physOp.setResultType(DataType.TUPLE);
 
         currentPlan.add(physOp);
-              
+
         try {
             currentPlan.connect(logToPhyMap.get(pred), physOp);
         } catch (PlanException e) {
@@ -786,23 +1036,23 @@ public class LogToPhyTranslationVisitor 
 
         logToPhyMap.put(cg, physOp);
     }
-    
-    private POMergeCogroup compileToMergeCogrp(LogicalRelationalOperator relationalOp, 
+
+    private POMergeCogroup compileToMergeCogrp(LogicalRelationalOperator relationalOp,
             MultiMap<Integer, LogicalExpressionPlan> innerPlans) throws FrontendException {
-        
+
         List<Operator> inputs = relationalOp.getPlan().getPredecessors(relationalOp);
-        // LocalRearrange corresponding to each of input 
+        // LocalRearrange corresponding to each of the inputs
         // LR is needed to extract keys out of the tuples.
-        
+
         POLocalRearrange[] innerLRs = new POLocalRearrange[inputs.size()];
         int count = 0;
         List<PhysicalOperator> inpPOs = new ArrayList<PhysicalOperator>(inputs.size());
-        
+
         for (int i=0;i<inputs.size();i++) {
             Operator op = inputs.get(i);
             PhysicalOperator physOp = logToPhyMap.get(op);
             inpPOs.add(physOp);
-            
+
             List<LogicalExpressionPlan> plans = innerPlans.get(i);
             POLocalRearrange poInnerLR = new POLocalRearrange(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
             poInnerLR.addOriginalLocation(relationalOp.getAlias(), relationalOp.getLocation());
@@ -824,16 +1074,16 @@ public class LogToPhyTranslationVisitor 
                 String msg = "Unable to set index on newly create POLocalRearrange.";
                 throw new VisitorException(msg, errCode, PigException.BUG, e1);
             }
-            poInnerLR.setKeyType(plans.size() > 1 ? DataType.TUPLE : 
+            poInnerLR.setKeyType(plans.size() > 1 ? DataType.TUPLE :
                         exprPlans.get(0).getLeaves().get(0).getResultType());
             poInnerLR.setResultType(DataType.TUPLE);
         }
-        
+
         POMergeCogroup poCogrp = new POMergeCogroup(new OperatorKey(
                 DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)),inpPOs,innerLRs,relationalOp.getRequestedParallelism());
         return poCogrp;
     }
-    
+
     private void translateMergeCogroup(LOCogroup cg) throws FrontendException {
         if(!validateMergeCogrp(cg.getInner())){
             throw new LogicalToPhysicalTranslatorException("Inner is not " +
@@ -857,33 +1107,33 @@ public class LogToPhyTranslationVisitor 
         }
         logToPhyMap.put(cg, poCogrp);
     }
-    
+
     private boolean validateMergeCogrp(boolean[] innerFlags){
-        
+
         for(boolean flag : innerFlags){
             if(flag)
                 return false;
         }
         return true;
     }
-    
+
     @Override
     public void visit(LOJoin loj) throws FrontendException {
 
         String scope = DEFAULT_SCOPE;
-        
-        // List of join predicates 
+
+        // List of join predicates
         List<Operator> inputs = loj.getPlan().getPredecessors(loj);
-        
+
         // mapping of inner join physical plans corresponding to inner physical operators.
         MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = new LinkedMultiMap<PhysicalOperator, PhysicalPlan>();
-        
+
         // Outer list corresponds to join predicates. Inner list is inner physical plan of each predicate.
         List<List<PhysicalPlan>> ppLists = new ArrayList<List<PhysicalPlan>>();
-        
+
         // List of physical operator corresponding to join predicates.
         List<PhysicalOperator> inp = new ArrayList<PhysicalOperator>();
-        
+
         // Outer list corresponds to join predicates and inner list corresponds to type of keys for each predicate.
         List<List<Byte>> keyTypes = new ArrayList<List<Byte>>();
 
@@ -891,18 +1141,18 @@ public class LogToPhyTranslationVisitor 
         String alias = loj.getAlias();
         SourceLocation location = loj.getLocation();
         int parallel = loj.getRequestedParallelism();
-        
+
         for (int i=0;i<inputs.size();i++) {
             Operator op = inputs.get(i);
             PhysicalOperator physOp = logToPhyMap.get(op);
             inp.add(physOp);
             List<LogicalExpressionPlan> plans =  (List<LogicalExpressionPlan>)loj.getJoinPlan(i);
-            
+
             List<PhysicalPlan> exprPlans = translateExpressionPlans(loj, plans);
 
             ppLists.add(exprPlans);
             joinPlans.put(physOp, exprPlans);
-            
+
             // Key could potentially be a tuple. So, we visit all exprPlans to get types of members of tuples.
             List<Byte> tupleKeyMemberTypes = new ArrayList<Byte>();
             for(PhysicalPlan exprPlan : exprPlans)
@@ -924,7 +1174,7 @@ public class LogToPhyTranslationVisitor 
                 throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
             }
             skj.setResultType(DataType.TUPLE);
-            
+
             for (int i=0; i < inputs.size(); i++) {
                 Operator op = inputs.get(i);
                 if (!innerFlags[i]) {
@@ -946,7 +1196,7 @@ public class LogToPhyTranslationVisitor 
                     skj.addSchema(null);
                 }
             }
-            
+
             currentPlan.add(skj);
 
             for (Operator op : inputs) {
@@ -986,22 +1236,22 @@ public class LogToPhyTranslationVisitor 
                 SchemaTupleFrontend.registerToGenerateIfPossible(keyToGen, false, GenContext.FR_JOIN);
                 keySchemas[i] = keyToGen;
             }
-            
+
             int fragment = 0;
             POFRJoin pfrj;
             try {
                 boolean isLeftOuter = false;
-                // We dont check for bounds issue as we assume that a join 
+                // We don't check for bounds issue as we assume that a join
                 // involves atleast two inputs
                 isLeftOuter = !innerFlags[1];
-                
+
                 Tuple nullTuple = null;
                 if( isLeftOuter ) {
                     try {
-                        // We know that in a Left outer join its only a two way 
-                        // join, so we assume index of 1 for the right input                        
-                        LogicalSchema inputSchema = ((LogicalRelationalOperator)inputs.get(1)).getSchema();                     
-                        
+                        // We know that in a left outer join it's only a two-way
+                        // join, so we assume index of 1 for the right input
+                        LogicalSchema inputSchema = ((LogicalRelationalOperator)inputs.get(1)).getSchema();
+
                         // We check if we have a schema before the join
                         if(inputSchema == null) {
                             int errCode = 1109;
@@ -1009,21 +1259,21 @@ public class LogToPhyTranslationVisitor 
                             "on which outer join is desired should have a valid schema";
                             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.INPUT);
                         }
-                        
-                        // Using the schema we decide the number of columns/fields 
+
+                        // Using the schema we decide the number of columns/fields
                         // in the nullTuple
                         nullTuple = TupleFactory.getInstance().newTuple(inputSchema.size());
                         for(int j = 0; j < inputSchema.size(); j++) {
                             nullTuple.set(j, null);
                         }
-                        
+
                     } catch( FrontendException e ) {
                         int errCode = 2104;
                         String msg = "Error while determining the schema of input";
                         throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
                     }
                 }
-                
+
                 pfrj = new POFRJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),
                                         parallel,
                                         inp,
@@ -1055,9 +1305,9 @@ public class LogToPhyTranslationVisitor 
             logToPhyMap.put(loj, pfrj);
         } else if ( (loj.getJoinType() == LOJoin.JOINTYPE.MERGE || loj.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE)
                 && (new MapSideMergeValidator().validateMapSideMerge(inputs,loj.getPlan()))) {
-            
+
             PhysicalOperator smj;
-            boolean usePOMergeJoin = inputs.size() == 2 && innerFlags[0] && innerFlags[1] ; 
+            boolean usePOMergeJoin = inputs.size() == 2 && innerFlags[0] && innerFlags[1] ;
 
             if(usePOMergeJoin){
                 // We register the merge join schema information for code generation
@@ -1111,7 +1361,7 @@ public class LogToPhyTranslationVisitor 
                 // in all other cases we fall back to POMergeCogroup + Flattening FEs
                 smj = compileToMergeCogrp(loj, loj.getExpressionPlans());
             }
-            
+
             smj.setResultType(DataType.TUPLE);
             currentPlan.add(smj);
             smj.addOriginalLocation(alias, location);
@@ -1135,9 +1385,9 @@ public class LogToPhyTranslationVisitor 
                 } catch (PlanException e) {
                     throw new LogicalToPhysicalTranslatorException(e.getMessage(),e.getErrorCode(),e.getErrorSource(),e);
                 }
-                logToPhyMap.put(loj, fe);                
+                logToPhyMap.put(loj, fe);
             }
-            
+
             return;
         }
         else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){
@@ -1155,8 +1405,8 @@ public class LogToPhyTranslationVisitor 
         }
         translateSoftLinks(loj);
     }
-    
-    private POPackage compileToLR_GR_PackTrio(LogicalRelationalOperator relationalOp, String customPartitioner, 
+
+    private POPackage compileToLR_GR_PackTrio(LogicalRelationalOperator relationalOp, String customPartitioner,
             boolean[] innerFlags, MultiMap<Integer, LogicalExpressionPlan> innerPlans) throws FrontendException {
 
         POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
@@ -1221,18 +1471,18 @@ public class LogToPhyTranslationVisitor 
                 throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
             }
         }
-        
+
         poPackage.setKeyType(type);
         poPackage.setResultType(DataType.TUPLE);
         poPackage.setNumInps(count);
         poPackage.setInner(innerFlags);
         return poPackage;
     }
-    
-    private POForEach compileFE4Flattening(boolean[] innerFlags,String scope, 
+
+    private POForEach compileFE4Flattening(boolean[] innerFlags,String scope,
             int parallel, String alias, SourceLocation location, List<Operator> inputs)
                 throws FrontendException {
-        
+
         List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
         List<Boolean> flattenLst = new ArrayList<Boolean>();
         POForEach fe;
@@ -1259,8 +1509,8 @@ public class LogToPhyTranslationVisitor 
                 }
                 flattenLst.add(true);
             }
-            
-            fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), 
+
+            fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
                     parallel, fePlans, flattenLst );
             fe.addOriginalLocation(alias, location);
 
@@ -1293,7 +1543,7 @@ public class LogToPhyTranslationVisitor 
             }
         }
     }
-    
+
     @Override
     public void visit(LODistinct loDistinct) throws FrontendException {
         String scope = DEFAULT_SCOPE;
@@ -1313,7 +1563,7 @@ public class LogToPhyTranslationVisitor 
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     @Override
     public void visit(LOLimit loLimit) throws FrontendException {
         String scope = DEFAULT_SCOPE;
@@ -1348,10 +1598,10 @@ public class LogToPhyTranslationVisitor 
             String msg = "Invalid physical operators in the physical plan" ;
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
-        
+
         translateSoftLinks(loLimit);
     }
-    
+
     @Override
     public void visit(LOSplit loSplit) throws FrontendException {
         String scope = DEFAULT_SCOPE;
@@ -1384,9 +1634,9 @@ public class LogToPhyTranslationVisitor 
 
         currentPlan.add(physOp);
 
-        List<Operator> op = loSplit.getPlan().getPredecessors(loSplit); 
+        List<Operator> op = loSplit.getPlan().getPredecessors(loSplit);
         PhysicalOperator from;
-        
+
         if(op != null) {
             from = logToPhyMap.get(op.get(0));
         } else {
@@ -1403,11 +1653,11 @@ public class LogToPhyTranslationVisitor 
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     @Override
     public void visit(LOSplitOutput loSplitOutput) throws FrontendException {
         String scope = DEFAULT_SCOPE;
-//        System.err.println("Entering Filter");
+        //        System.err.println("Entering Filter");
         POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen
                 .getNextNodeId(scope)), loSplitOutput.getRequestedParallelism());
         poFilter.addOriginalLocation(loSplitOutput.getAlias(), loSplitOutput.getLocation());
@@ -1441,7 +1691,7 @@ public class LogToPhyTranslationVisitor 
             String msg = "Did not find a predecessor for Filter." ;
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
         }
-        
+
         try {
             currentPlan.connect(from, poFilter);
         } catch (PlanException e) {
@@ -1449,9 +1699,9 @@ public class LogToPhyTranslationVisitor 
             String msg = "Invalid physical operators in the physical plan" ;
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
-        
+
         translateSoftLinks(loSplitOutput);
-//        System.err.println("Exiting Filter");
+        //        System.err.println("Exiting Filter");
     }
     /**
      * updates plan with check for empty bag and if bag is empty to flatten a bag
@@ -1464,7 +1714,7 @@ public class LogToPhyTranslationVisitor 
         LogicalSchema inputSchema = null;
         try {
             inputSchema = ((LogicalRelationalOperator) joinInput).getSchema();
-         
+
             if(inputSchema == null) {
                 int errCode = 1109;
                 String msg = "Input (" + ((LogicalRelationalOperator) joinInput).getAlias() + ") " +
@@ -1476,9 +1726,9 @@ public class LogToPhyTranslationVisitor 
             String msg = "Error while determining the schema of input";
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
-        
+
         CompilerUtils.addEmptyBagOuterJoin(fePlan, Util.translateSchema(inputSchema));
-        
+
     }
 
     private void translateSoftLinks(Operator op) throws FrontendException {

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java Thu Sep 13 14:55:36 2012
@@ -33,7 +33,7 @@ public abstract class LogicalRelationalN
 
     protected LogicalRelationalNodesVisitor(OperatorPlan plan, PlanWalker walker) throws FrontendException {
         super(plan, walker);
-        
+
         Iterator<Operator> iter = plan.getOperators();
         while(iter.hasNext()) {
             if (!(iter.next() instanceof LogicalRelationalOperator)) {
@@ -41,58 +41,61 @@ public abstract class LogicalRelationalN
             }
         }
     }
-    
+
     public void visit(LOLoad load) throws FrontendException {
     }
 
     public void visit(LOFilter filter) throws FrontendException {
     }
-    
+
     public void visit(LOStore store) throws FrontendException {
     }
-    
+
     public void visit(LOJoin join) throws FrontendException {
     }
-    
+
     public void visit(LOForEach foreach) throws FrontendException {
     }
-    
+
     public void visit(LOGenerate gen) throws FrontendException {
     }
-    
+
     public void visit(LOInnerLoad load) throws FrontendException {
     }
 
     public void visit(LOCube cube) throws FrontendException {
     }
-    
+
     public void visit(LOCogroup loCogroup) throws FrontendException {
     }
-    
+
     public void visit(LOSplit loSplit) throws FrontendException {
     }
-    
+
     public void visit(LOSplitOutput loSplitOutput) throws FrontendException {
     }
-    
+
     public void visit(LOUnion loUnion) throws FrontendException {
     }
-    
+
     public void visit(LOSort loSort) throws FrontendException {
     }
-    
+
+    public void visit(LORank loRank) throws FrontendException{
+    }
+
     public void visit(LODistinct loDistinct) throws FrontendException {
     }
-    
+
     public void visit(LOLimit loLimit) throws FrontendException {
     }
-    
+
     public void visit(LOCross loCross) throws FrontendException {
     }
-    
+
     public void visit(LOStream loStream) throws FrontendException {
     }
 
-    public void visit(LONative nativeMR) throws FrontendException{     
+    public void visit(LONative nativeMR) throws FrontendException{
     }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java Thu Sep 13 14:55:36 2012
@@ -43,6 +43,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LORank;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplit;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -62,16 +63,16 @@ import org.apache.pig.newplan.logical.re
  */
 public class ColumnPruneHelper {
     protected static final String INPUTUIDS = "ColumnPrune:InputUids";
-    public static final String OUTPUTUIDS = "ColumnPrune:OutputUids";    
+    public static final String OUTPUTUIDS = "ColumnPrune:OutputUids";
     protected static final String REQUIREDCOLS = "ColumnPrune:RequiredColumns";
-    
+
     private OperatorPlan currentPlan;
     private OperatorSubPlan subPlan;
 
     public ColumnPruneHelper(OperatorPlan currentPlan) {
         this.currentPlan = currentPlan;
-    }    
-    
+    }
+
     private OperatorSubPlan getSubPlan() throws FrontendException {
         OperatorSubPlan p = null;
         if (currentPlan instanceof OperatorSubPlan) {
@@ -80,35 +81,35 @@ public class ColumnPruneHelper {
             p = new OperatorSubPlan(currentPlan);
         }
         Iterator<Operator> iter = currentPlan.getOperators();
-        
+
         while(iter.hasNext()) {
             Operator op = iter.next();
             if (op instanceof LOForEach) {
                 addOperator(op, p);
             }
         }
-        
+
         return p;
     }
-    
+
     private void addOperator(Operator op, OperatorSubPlan subplan) throws FrontendException {
         if (op == null) {
             return;
         }
-        
+
         subplan.add(op);
-        
+
         List<Operator> ll = currentPlan.getPredecessors(op);
         if (ll == null) {
             return;
         }
-        
+
         for(Operator pred: ll) {
             addOperator(pred, subplan);
         }
     }
-    
-        
+
+
     @SuppressWarnings("unchecked")
     public boolean check() throws FrontendException {
         List<Operator> sources = currentPlan.getSources();
@@ -117,14 +118,14 @@ public class ColumnPruneHelper {
             clearAnnotation();
             return false;
         }
-        
+
         // create sub-plan that ends with foreach
         subPlan = getSubPlan();
         if (subPlan.size() == 0) {
             clearAnnotation();
             return false;
         }
-        
+
         ColumnDependencyVisitor v = new ColumnDependencyVisitor(currentPlan);
         try {
             v.visit();
@@ -133,7 +134,7 @@ public class ColumnPruneHelper {
             clearAnnotation();
             return false;
         }
-        
+
         List<Operator> ll = subPlan.getSources();
         boolean found = false;
         for(Operator op: ll) {
@@ -141,20 +142,20 @@ public class ColumnPruneHelper {
                 Set<Long> uids = (Set<Long>)op.getAnnotation(INPUTUIDS);
                 LogicalSchema s = ((LOLoad) op).getSchema();
                 Set<Integer> required = getColumns(s, uids);
-                
+
                 if (required.size() < s.size()) {
-                    op.annotate(REQUIREDCOLS, required);              
+                    op.annotate(REQUIREDCOLS, required);
                     found = true;
                 }
             }
         }
-        
+
         if (!found)
             clearAnnotation();
-        
+
         return found;
     }
-    
+
     private void clearAnnotation() {
         Iterator<Operator> iter = currentPlan.getOperators();
         while (iter.hasNext()) {
@@ -170,7 +171,7 @@ public class ColumnPruneHelper {
         if (schema == null) {
             throw new SchemaNotDefinedException("Schema is not defined.");
         }
-        
+
         Set<Integer> cols = new HashSet<Integer>();
         Iterator<Long> iter = uids.iterator();
         while(iter.hasNext()) {
@@ -179,32 +180,32 @@ public class ColumnPruneHelper {
             if (index == -1) {
                 throw new FrontendException("UID " + uid + " is not found in the schema " + schema, 2241);
             }
-              
+
             cols.add(index);
         }
-          
+
         return cols;
     }
-    
+
     public OperatorPlan reportChanges() {
         return subPlan;
     }
-   
+
     // Visitor to calculate the input and output uids for each operator
     // It doesn't change the plan, only put calculated info as annotations
     // The input and output uids are not necessarily the top level uids of
     // a schema. They may be the uids of lower level fields of complex fields
     // that have their own schema.
-    static private class ColumnDependencyVisitor extends LogicalRelationalNodesVisitor {    	
-        
+    static private class ColumnDependencyVisitor extends LogicalRelationalNodesVisitor {
+
         public ColumnDependencyVisitor(OperatorPlan plan) throws FrontendException {
-            super(plan, new ReverseDependencyOrderWalker(plan));            
+            super(plan, new ReverseDependencyOrderWalker(plan));
         }
-        
+
         @Override
         public void visit(LOLoad load) throws FrontendException {
             Set<Long> output = setOutputUids(load);
-            
+
             // for load, input uids are same as output uids
             load.annotate(INPUTUIDS, output);
         }
@@ -212,71 +213,71 @@ public class ColumnPruneHelper {
         @Override
         public void visit(LOFilter filter) throws FrontendException {
             Set<Long> output = setOutputUids(filter);
-            
+
             // the input uids contains all the output uids and
             // projections in filter conditions
             Set<Long> input = new HashSet<Long>(output);
-            
+
             LogicalExpressionPlan exp = filter.getFilterPlan();
             collectUids(filter, exp, input);
-            
+
             filter.annotate(INPUTUIDS, input);
         }
-        
+
         @Override
         public void visit(LOStore store) throws FrontendException {
-            Set<Long> output = setOutputUids(store);            
-            
+            Set<Long> output = setOutputUids(store);
+
             if (output.isEmpty()) {
                 // to deal with load-store-load-store case
                 LogicalSchema s = store.getSchema();
                 if (s == null) {
                     throw new SchemaNotDefinedException("Schema for " + store.getName() + " is not defined.");
                 }
-                                
+
                 for(int i=0; i<s.size(); i++) {
                     output.add(s.getField(i).uid);
-                }                                                
-            }        
-            
+                }
+            }
+
             // for store, input uids are same as output uids
             store.annotate(INPUTUIDS, output);
         }
-        
+
         @Override
         public void visit(LOJoin join) throws FrontendException {
             Set<Long> output = setOutputUids(join);
-            
+
             // the input uids contains all the output uids and
             // projections in join expressions
             Set<Long> input = new HashSet<Long>(output);
-            
+
             Collection<LogicalExpressionPlan> exps = join.getExpressionPlanValues();
             Iterator<LogicalExpressionPlan> iter = exps.iterator();
             while(iter.hasNext()) {
                 LogicalExpressionPlan exp = iter.next();
                 collectUids(join, exp, input);
             }
-            
+
             join.annotate(INPUTUIDS, input);
         }
-        
+
         @Override
         public void visit(LOCogroup cg) throws FrontendException {
             Set<Long> output = setOutputUids(cg);
-            
+
             // the input uids contains all the output uids and
             // projections in join expressions
             Set<Long> input = new HashSet<Long>();
-            
+
             // Add all the uids required for doing cogroup. As in all the
             // keys on which the cogroup is done.
             for( LogicalExpressionPlan plan : cg.getExpressionPlans().values() ) {
                 collectUids(cg, plan, input);
             }
-            
+
             // Now check for the case where the output uid is a generated one
-            // If that is the case we need to add the uids which generated it in 
+            // If that is the case we need to add the uids which generated it in
             // the input
             long firstUid=-1;
             Map<Integer,Long> generatedInputUids = cg.getGeneratedInputUids();
@@ -291,34 +292,34 @@ public class ColumnPruneHelper {
                 if (pred.getSchema()!=null)
                     firstUid = pred.getSchema().getField(0).uid;
             }
-            
+
             if (input.isEmpty() && firstUid!=-1) {
                 input.add(firstUid);
             }
-            
+
             cg.annotate(INPUTUIDS, input);
         }
-        
+
         @Override
         public void visit(LOLimit limit) throws FrontendException {
             Set<Long> output = setOutputUids(limit);
-                                    
+
             // the input uids contains all the output uids and
             // projections in limit expression
             Set<Long> input = new HashSet<Long>(output);
-            
+
             LogicalExpressionPlan exp = limit.getLimitPlan();
             if (exp != null)
                 collectUids(limit, exp, input);
-            
+
             limit.annotate(INPUTUIDS, input);
         }
-        
+
         @Override
         public void visit(LOStream stream) throws FrontendException {
             // output is not used, setOutputUids is used to check if it has output schema
             Set<Long> output = setOutputUids(stream);
-            
+
             // Every field is required
             LogicalRelationalOperator pred = (LogicalRelationalOperator)plan.getPredecessors(stream).get(0);
 
@@ -326,23 +327,23 @@ public class ColumnPruneHelper {
 
             stream.annotate(INPUTUIDS, input);
         }
-        
+
         @Override
         public void visit(LODistinct distinct) throws FrontendException {
             Set<Long> input = new HashSet<Long>();
-            
+
             // Every field is required
             LogicalSchema s = distinct.getSchema();
             if (s == null) {
                 throw new SchemaNotDefinedException("Schema for " + distinct.getName() + " is not defined.");
             }
-            
+
             for(int i=0; i<s.size(); i++) {
                 input.add(s.getField(i).uid);
-            }                                                
+            }
             distinct.annotate(INPUTUIDS, input);
         }
-        
+
         @Override
         public void visit(LOCross cross) throws FrontendException {
             Set<Long> output = setOutputUids(cross);
@@ -362,7 +363,7 @@ public class ColumnPruneHelper {
             }
             cross.annotate(INPUTUIDS, output);
         }
-        
+
         @Override
         public void visit(LOUnion union) throws FrontendException {
             Set<Long> output = setOutputUids(union);
@@ -372,54 +373,67 @@ public class ColumnPruneHelper {
             }
             union.annotate(INPUTUIDS, input);
         }
-        
+
         @Override
         public void visit(LOSplit split) throws FrontendException {
             Set<Long> output = setOutputUids(split);
             split.annotate(INPUTUIDS, output);
         }
-        
+
         @Override
         public void visit(LOSplitOutput splitOutput) throws FrontendException {
             Set<Long> output = setOutputUids(splitOutput);
-            
+
             // the input uids contains all the output uids and
             // projections in splitOutput conditions
             Set<Long> input = new HashSet<Long>();
-            
+
             for (long uid : output) {
                 input.add(splitOutput.getInputUids(uid));
             }
-            
+
             LogicalExpressionPlan exp = splitOutput.getFilterPlan();
             collectUids(splitOutput, exp, input);
-            
+
             splitOutput.annotate(INPUTUIDS, input);
         }
-        
+
         @Override
         public void visit(LOSort sort) throws FrontendException {
             Set<Long> output = setOutputUids(sort);
-            
+
             Set<Long> input = new HashSet<Long>(output);
-            
+
             for (LogicalExpressionPlan exp : sort.getSortColPlans()) {
                 collectUids(sort, exp, input);
             }
-            
+
             sort.annotate(INPUTUIDS, input);
         }
-        
+
+        @Override
+        public void visit(LORank rank) throws FrontendException {
+            Set<Long> output = setOutputUids(rank);
+
+            Set<Long> input = new HashSet<Long>(output);
+
+            for (LogicalExpressionPlan exp : rank.getRankColPlans()) {
+                collectUids(rank, exp, input);
+            }
+
+            rank.annotate(INPUTUIDS, input);
+        }
+
         /*
          * This function returns all uids present in the given schema
          */
-        private Set<Long> getAllUids( LogicalSchema schema ) {            
+        private Set<Long> getAllUids( LogicalSchema schema ) {
             Set<Long> uids = new HashSet<Long>();
-            
+
             if( schema == null ) {
                 return uids;
             }
-            
+
             for( LogicalFieldSchema field : schema.getFields() ) {
                 if( ( field.type == DataType.TUPLE || field.type == DataType.BAG )
                         && field.schema != null ) {
@@ -429,19 +443,19 @@ public class ColumnPruneHelper {
             }
             return uids;
         }
-        
+
         @SuppressWarnings("unchecked")
         @Override
         public void visit(LOForEach foreach) throws FrontendException {
             Set<Long> output = setOutputUids(foreach);
-            
+
             LOGenerate gen = OptimizerUtils.findGenerate(foreach);
             gen.annotate(OUTPUTUIDS, output);
-            
+
             visit(gen);
-            
+
             Set<Long> input = (Set<Long>)gen.getAnnotation(INPUTUIDS);
-            
+
             // Make sure at least one column will retain
             if (input.isEmpty()) {
                 LogicalRelationalOperator pred = (LogicalRelationalOperator)plan.getPredecessors(foreach).get(0);
@@ -455,11 +469,11 @@ public class ColumnPruneHelper {
         @SuppressWarnings("unchecked")
         public void visit(LOGenerate gen) throws FrontendException {
              Set<Long> output = (Set<Long>)gen.getAnnotation(OUTPUTUIDS);
-             
+
              Set<Long> input = new HashSet<Long>();
-             
+
              List<LogicalExpressionPlan> ll = gen.getOutputPlans();
-             
+
              Iterator<Long> iter = output.iterator();
              while(iter.hasNext()) {
                  long uid = iter.next();
@@ -473,7 +487,7 @@ public class ColumnPruneHelper {
                              break;
                          }
                      }
-                     
+
                      if (found) {
                          List<Operator> srcs = exp.getSinks();
                          for (Operator src : srcs) {
@@ -500,7 +514,7 @@ public class ColumnPruneHelper {
                      }
                  }
              }
-              
+
              // for the flatten bag, we need to make sure at least one field is in the input
              for(int i=0; i<ll.size(); i++) {
                  if (!gen.getFlattenFlags()[i]) {
@@ -535,13 +549,13 @@ public class ColumnPruneHelper {
              }
              gen.annotate(INPUTUIDS, input);
         }
-        
+
         @Override
         public void visit(LOInnerLoad load) throws FrontendException {
             Set<Long> output = setOutputUids(load);
             load.annotate(INPUTUIDS, output);
         }
-        
+
         private void collectUids(LogicalRelationalOperator currentOp, LogicalExpressionPlan exp, Set<Long> uids) throws FrontendException {
             List<Operator> ll = exp.getSinks();
             for(Operator op: ll) {
@@ -562,20 +576,20 @@ public class ColumnPruneHelper {
                 }
             }
         }
-        
+
         @SuppressWarnings("unchecked")
         // Get output uid from output schema. If output schema does not exist,
         // throw exception
         private Set<Long> setOutputUids(LogicalRelationalOperator op) throws FrontendException {
-            
+
             List<Operator> ll = plan.getSuccessors(op);
             Set<Long> uids = new HashSet<Long>();
-            
+
             LogicalSchema s = op.getSchema();
             if (s == null) {
                 throw new SchemaNotDefinedException("Schema for " + op.getName() + " is not defined.");
             }
-                            
+
             if (ll != null) {
                 // if this is not sink, the output uids are union of input uids of its successors
                 for(Operator succ: ll) {
@@ -584,7 +598,7 @@ public class ColumnPruneHelper {
                         Iterator<Long> iter = inputUids.iterator();
                         while(iter.hasNext()) {
                             long uid = iter.next();
-                            
+
                             if (s.findField(uid) != -1) {
                                 uids.add(uid);
                             }
@@ -592,12 +606,12 @@ public class ColumnPruneHelper {
                     }
                 }
             } else {
-                // if  it's leaf, set to its schema                
+                // if  it's leaf, set to its schema
                 for(int i=0; i<s.size(); i++) {
                     uids.add(s.getField(i).uid);
-                }                                
-            } 
-            
+                }
+            }
+
             op.annotate(OUTPUTUIDS, uids);
             return uids;
         }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java?rev=1384352&r1=1384351&r2=1384352&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java Thu Sep 13 14:55:36 2012
@@ -48,6 +48,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LORank;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplit;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -70,20 +71,20 @@ public class ColumnPruneVisitor extends 
         this.columnPrune = columnPrune;
         this.requiredItems = requiredItems;
     }
-    
+
     public void addRequiredItems(LOLoad load, Pair<Map<Integer,Set<String>>,Set<Integer>> requiredItem) {
         requiredItems.put(load, requiredItem);
     }
-    
+
     @Override
     public void visit(LOLoad load) throws FrontendException {
         if(! requiredItems.containsKey( load ) ) {
             return;
         }
-        
-        Pair<Map<Integer,Set<String>>,Set<Integer>> required = 
+
+        Pair<Map<Integer,Set<String>>,Set<Integer>> required =
             requiredItems.get(load);
-        
+
         RequiredFieldList requiredFields = new RequiredFieldList();
 
         LogicalSchema s = load.getSchema();
@@ -109,15 +110,15 @@ public class ColumnPruneVisitor extends 
                 requiredField = new RequiredField();
                 requiredField.setIndex(i);
                 requiredField.setAlias(s.getField(i).alias);
-                requiredField.setType(s.getField(i).type);      
+                requiredField.setType(s.getField(i).type);
                 requiredFields.add(requiredField);
             }
         }
-        
+
         boolean[] columnRequired = new boolean[s.size()];
         for (RequiredField rf : requiredFields.getFields())
             columnRequired[rf.getIndex()] = true;
-        
+
         List<Pair<Integer, Integer>> pruneList = new ArrayList<Pair<Integer, Integer>>();
         for (int i=0;i<columnRequired.length;i++)
         {
@@ -136,7 +137,7 @@ public class ColumnPruneVisitor extends 
             }
             log.info(message);
         }
-        
+
         message = new StringBuffer();
         for(RequiredField rf: requiredFields.getFields()) {
             List<RequiredField> sub = rf.getSubFields();
@@ -146,71 +147,71 @@ public class ColumnPruneVisitor extends 
         }
         if (message.length()!=0)
             log.info(message);
-        
+
         LoadPushDown.RequiredFieldResponse response = null;
         try {
             LoadFunc loadFunc = load.getLoadFunc();
             if (loadFunc instanceof LoadPushDown) {
                 response = ((LoadPushDown)loadFunc).pushProjection(requiredFields);
             }
-                                
+
         } catch (FrontendException e) {
             log.warn("pushProjection on "+load+" throw an exception, skip it");
-        }                      
-        
-        // Loader does not support column pruning, insert foreach      
+        }
+
+        // Loader does not support column pruning, insert foreach
         if (columnPrune) {
             if (response==null || !response.getRequiredFieldResponse()) {
-                LogicalPlan p = (LogicalPlan)load.getPlan();                        
-                Operator next = p.getSuccessors(load).get(0); 
-                // if there is already a LOForEach after load, we don't need to 
+                LogicalPlan p = (LogicalPlan)load.getPlan();
+                Operator next = p.getSuccessors(load).get(0);
+                // if there is already a LOForEach after load, we don't need to
                 // add another LOForEach
                 if (next instanceof LOForEach) {
                     return;
                 }
-                
+
                 LOForEach foreach = new LOForEach(load.getPlan());
-                
-                // add foreach to the base plan                       
+
+                // add foreach to the base plan
                 p.add(foreach);
-                               
+
                 p.insertBetween(load, foreach, next);
-                
+
                 LogicalPlan innerPlan = new LogicalPlan();
                 foreach.setInnerPlan(innerPlan);
-                
+
                 // build foreach inner plan
-                List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();              
+                List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();
                 LOGenerate gen = new LOGenerate(innerPlan, exps, new boolean[requiredFields.getFields().size()]);
                 innerPlan.add(gen);
-                
+
                 for (int i=0; i<requiredFields.getFields().size(); i++) {
                     LoadPushDown.RequiredField rf = requiredFields.getFields().get(i);
-                    LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, rf.getIndex());                    
-                    innerPlan.add(innerLoad);          
+                    LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, rf.getIndex());
+                    innerPlan.add(innerLoad);
                     innerPlan.connect(innerLoad, gen);
-                    
+
                     LogicalExpressionPlan exp = new LogicalExpressionPlan();
                     ProjectExpression prj = new ProjectExpression(exp, i, -1, gen);
                     exp.add(prj);
                     exps.add(exp);
-                }                
-               
+                }
+
             } else {
                 // columns are pruned, reset schema for LOLoader
                 List<Integer> requiredIndexes = new ArrayList<Integer>();
                 List<LoadPushDown.RequiredField> fieldList = requiredFields.getFields();
-                for (int i=0; i<fieldList.size(); i++) {                    
+                for (int i=0; i<fieldList.size(); i++) {
                     requiredIndexes.add(fieldList.get(i).getIndex());
                 }
 
                 load.setRequiredFields(requiredIndexes);
-                
+
                 LogicalSchema newSchema = new LogicalSchema();
-                for (int i=0; i<fieldList.size(); i++) {                    
+                for (int i=0; i<fieldList.size(); i++) {
                     newSchema.addField(s.getField(fieldList.get(i).getIndex()));
                 }
-                
+
                 load.setSchema(newSchema);
             }
         }
@@ -219,15 +220,15 @@ public class ColumnPruneVisitor extends 
     @Override
     public void visit(LOFilter filter) throws FrontendException {
     }
-    
+
     @Override
     public void visit(LOLimit limit) throws FrontendException {
     }
-    
+
     @Override
     public void visit(LOSplitOutput splitOutput) throws FrontendException {
     }
-    
+
     @SuppressWarnings("unchecked")
     @Override
     public void visit(LOSplit split) throws FrontendException {
@@ -235,15 +236,15 @@ public class ColumnPruneVisitor extends 
         for (int i=0;i<branchOutputs.size();i++) {
             Operator branchOutput = branchOutputs.get(i);
             Set<Long> branchOutputUids = (Set<Long>)branchOutput.getAnnotation(ColumnPruneHelper.INPUTUIDS);
-            
+
             if (branchOutputUids!=null) {
                 Set<Integer> columnsToDrop = new HashSet<Integer>();
-                
+
                 for (int j=0;j<split.getSchema().size();j++) {
                     if (!branchOutputUids.contains(split.getSchema().getField(j).uid))
                         columnsToDrop.add(j);
                 }
-                
+
                 if (!columnsToDrop.isEmpty()) {
                     LOForEach foreach = Util.addForEachAfter((LogicalPlan)split.getPlan(), split, i, columnsToDrop);
                     foreach.getSchema();
@@ -251,38 +252,42 @@ public class ColumnPruneVisitor extends 
             }
         }
     }
-    
+
     @Override
     public void visit(LOSort sort) throws FrontendException {
     }
-    
+
+    @Override
+    public void visit(LORank rank) throws FrontendException {
+    }
+
     @Override
     public void visit(LOStore store) throws FrontendException {
     }
-    
+
     @Override
     public void visit( LOCogroup cg ) throws FrontendException {
         addForEachIfNecessary(cg);
     }
-    
+
     @Override
     public void visit(LOJoin join) throws FrontendException {
     }
-    
+
     @Override
     public void visit(LOCross cross) throws FrontendException {
     }
-    
+
     @Override
     @SuppressWarnings("unchecked")
     public void visit(LOForEach foreach) throws FrontendException {
         if (!columnPrune) {
             return;
         }
-        
+
         // get column numbers from input uids
         Set<Long> inputUids = (Set<Long>)foreach.getAnnotation(ColumnPruneHelper.INPUTUIDS);
-        
+
         // Get all top level projects
         LogicalPlan innerPlan = foreach.getInnerPlan();
         List<LOInnerLoad> innerLoads= new ArrayList<LOInnerLoad>();
@@ -291,7 +296,7 @@ public class ColumnPruneVisitor extends 
             if (s instanceof LOInnerLoad)
                 innerLoads.add((LOInnerLoad)s);
         }
-        
+
         // If project of the innerLoad is not in INPUTUIDS, remove this innerLoad
         Set<LOInnerLoad> innerLoadsToRemove = new HashSet<LOInnerLoad>();
         for (LOInnerLoad innerLoad: innerLoads) {
@@ -308,7 +313,7 @@ public class ColumnPruneVisitor extends 
                     innerLoadsToRemove.add(innerLoad);
             }
         }
-        
+
         // Find the logical operator immediate precede LOGenerate which should be removed (the whole branch)
         Set<LogicalRelationalOperator> branchHeadToRemove = new HashSet<LogicalRelationalOperator>();
         for (LOInnerLoad innerLoad : innerLoadsToRemove) {
@@ -318,16 +323,16 @@ public class ColumnPruneVisitor extends 
             }
             branchHeadToRemove.add((LogicalRelationalOperator)op);
         }
-        
+
         // Find the expression plan to remove
         LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0);
         List<LogicalExpressionPlan> genPlansToRemove = new ArrayList<LogicalExpressionPlan>();
-        
+
         List<LogicalExpressionPlan> genPlans = gen.getOutputPlans();
         for (int i=0;i<genPlans.size();i++) {
             LogicalExpressionPlan expPlan = genPlans.get(i);
             List<Operator> expSources = expPlan.getSinks();
-            
+
             for (Operator expSrc : expSources) {
                 if (expSrc instanceof ProjectExpression) {
                     LogicalRelationalOperator reference = ((ProjectExpression)expSrc).findReferent();
@@ -337,7 +342,7 @@ public class ColumnPruneVisitor extends 
                 }
             }
         }
-        
+
         // Build the temporary structure based on genPlansToRemove, which include:
         // * flattenList
         // * outputPlanSchemas
@@ -352,10 +357,10 @@ public class ColumnPruneVisitor extends 
         List<LogicalSchema> outputPlanSchemas = new ArrayList<LogicalSchema>();
         List<LogicalSchema> uidOnlySchemas = new ArrayList<LogicalSchema>();
         List<LogicalSchema> userDefinedSchemas = null;
-        
+
         if (gen.getUserDefinedSchema()!=null)
             userDefinedSchemas = new ArrayList<LogicalSchema>();
-        
+
         for (int i=0;i<genPlans.size();i++) {
             LogicalExpressionPlan genPlan = genPlans.get(i);
             if (!genPlansToRemove.contains(genPlan)) {
@@ -373,17 +378,17 @@ public class ColumnPruneVisitor extends 
                 }
             }
         }
-        
+
         List<Operator> preds = innerPlan.getPredecessors(gen);
-        
+
         if (preds!=null) {  // otherwise, all gen plan are based on constant, no need to adjust
             for (int i=0;i<preds.size();i++) {
                 if (!inputsNeeded.contains(i))
                     inputsRemoved.add(i);
             }
         }
-        
-        
+
+
         // Change LOGenerate: remove unneeded output expression plan
         // change flatten flag, outputPlanSchema, uidOnlySchemas
         boolean[] flatten = new boolean[flattenList.size()];
@@ -394,11 +399,11 @@ public class ColumnPruneVisitor extends 
         gen.setOutputPlanSchemas(outputPlanSchemas);
         gen.setUidOnlySchemas(uidOnlySchemas);
         gen.setUserDefinedSchema(userDefinedSchemas);
-        
+
         for (LogicalExpressionPlan genPlanToRemove : genPlansToRemove) {
             genPlans.remove(genPlanToRemove);
         }
-        
+
         // shift project input
         if (!inputsRemoved.isEmpty()) {
             for (LogicalExpressionPlan genPlan : genPlans) {
@@ -416,7 +421,7 @@ public class ColumnPruneVisitor extends 
                 }
             }
         }
-        
+
         // Prune unneeded LOInnerLoad
         List<LogicalRelationalOperator> predToRemove = new ArrayList<LogicalRelationalOperator>();
         for (int i : inputsRemoved) {
@@ -426,18 +431,18 @@ public class ColumnPruneVisitor extends 
             removeSubTree(pred);
         }
     }
-    
+
     @Override
     public void visit(LOUnion union) throws FrontendException {
         // AddForEach before union if necessary.
         List<Operator> preds = new ArrayList<Operator>();
         preds.addAll(plan.getPredecessors(union));
-        
+
         for (Operator pred : preds) {
             addForEachIfNecessary((LogicalRelationalOperator)pred);
         }
     }
-    
+
     // remove all the operators starting from an operator
     private void removeSubTree(LogicalRelationalOperator op) throws FrontendException {
         LogicalPlan p = (LogicalPlan)op.getPlan();
@@ -447,14 +452,14 @@ public class ColumnPruneVisitor extends 
                 removeSubTree((LogicalRelationalOperator)pred);
             }
         }
-                
+
         if (p.getSuccessors(op) != null) {
-            Operator[] succs = p.getSuccessors(op).toArray(new Operator[0]);            
+            Operator[] succs = p.getSuccessors(op).toArray(new Operator[0]);
             for(Operator s: succs) {
                 p.disconnect(op, s);
             }
         }
-        
+
         p.remove(op);
     }
 
@@ -465,16 +470,16 @@ public class ColumnPruneVisitor extends 
         if (outputUids!=null) {
             LogicalSchema schema = op.getSchema();
             Set<Integer> columnsToDrop = new HashSet<Integer>();
-            
+
             for (int i=0;i<schema.size();i++) {
                 if (!outputUids.contains(schema.getField(i).uid))
                     columnsToDrop.add(i);
             }
-            
+
             if (!columnsToDrop.isEmpty()) {
                 LOForEach foreach = Util.addForEachAfter((LogicalPlan)op.getPlan(), op, 0, columnsToDrop);
                 foreach.getSchema();
             }
         }
-    }    
+    }
 }



Mime
View raw message