pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject svn commit: r982345 [6/13] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/ src/org/apache/pig/newplan/ src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/log...
Date Wed, 04 Aug 2010 17:46:48 GMT
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,1327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.GFCross;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.InterStorage;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.CompilerUtils;
+import org.apache.pig.impl.util.LinkedMultiMap;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanWalker;
+import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.SubtreeDependencyOrderWalker;
+import org.apache.pig.newplan.logical.expression.ExpToPhyTranslationVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+public class LogToPhyTranslationVisitor extends LogicalRelationalNodesVisitor {
+    
+    /**
+     * Creates a translator that walks the given logical plan with a
+     * {@link DependencyOrderWalker} and starts from an empty physical plan.
+     *
+     * @param plan the logical plan to translate
+     */
+    public LogToPhyTranslationVisitor(OperatorPlan plan) {
+        super(plan, new DependencyOrderWalker(plan));
+        this.logToPhyMap = new HashMap<Operator, PhysicalOperator>();
+        this.currentPlans = new Stack<PhysicalPlan>();
+        this.currentPlan = new PhysicalPlan();
+    }
+
+    // Maps each translated logical operator to the physical operator created for it.
+    protected Map<Operator, PhysicalOperator> logToPhyMap;
+
+    // Enclosing physical plans, saved here while a nested plan is being built.
+    protected Stack<PhysicalPlan> currentPlans;
+
+    // Physical plan currently being populated; returned by getPhysicalPlan().
+    protected PhysicalPlan currentPlan;
+
+    // Supplies the node ids used in the OperatorKeys of new physical operators.
+    protected NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();
+
+    // Pig context handed to operators that need it; assigned via setPigContext().
+    protected PigContext pc;
+    
+    /**
+     * Stores the Pig context that is later passed to the physical operators
+     * created by this visitor.
+     *
+     * @param pc the Pig context to use during translation
+     */
+    public void setPigContext(PigContext pc) {
+        this.pc = pc;
+    }
+
+    /**
+     * Returns the physical plan this visitor is currently populating.
+     *
+     * @return the current physical plan
+     */
+    public PhysicalPlan getPhysicalPlan() {
+        return this.currentPlan;
+    }
+    
+    /**
+     * Translates a logical load into a {@link POLoad}, registers it in the
+     * current physical plan and, when the load has a predecessor, connects it
+     * to the physical counterpart of the first one.
+     *
+     * @param loLoad the logical load to translate
+     * @throws IOException if the physical operators cannot be connected
+     */
+    @Override
+    public void visit(LOLoad loLoad) throws IOException {
+        final String scope = DEFAULT_SCOPE;
+        OperatorKey loadKey = new OperatorKey(scope, nodeGen.getNextNodeId(scope));
+        POLoad load = new POLoad(loadKey, loLoad.getLoadFunc());
+        load.setAlias(loLoad.getAlias());
+        load.setLFile(loLoad.getFileSpec());
+        load.setPc(pc);
+        load.setResultType(DataType.BAG);
+        load.setSignature(loLoad.getAlias());
+        currentPlan.add(load);
+        logToPhyMap.put(loLoad, load);
+
+        // Load is typically a root operator, but in the multiquery
+        // case it might have a store as a predecessor.
+        List<Operator> preds = loLoad.getPlan().getPredecessors(loLoad);
+        if (preds == null) {
+            return;
+        }
+
+        PhysicalOperator from = logToPhyMap.get(preds.get(0));
+        try {
+            currentPlan.connect(from, load);
+        } catch (PlanException e) {
+            throw new LogicalToPhysicalTranslatorException(
+                    "Invalid physical operators in the physical plan",
+                    2015, PigException.BUG, e);
+        }
+    }
+    
+    /**
+     * Translates a logical filter into a {@link POFilter}. The filter
+     * condition is converted into its own physical plan, and the new operator
+     * is connected to the physical counterpart of the filter's first
+     * predecessor.
+     *
+     * @param filter the logical filter to translate
+     * @throws IOException if the filter has no predecessor or the physical
+     *         operators cannot be connected
+     */
+    @Override
+    public void visit(LOFilter filter) throws IOException {
+        final String scope = DEFAULT_SCOPE;
+        OperatorKey filterKey = new OperatorKey(scope, nodeGen.getNextNodeId(scope));
+        POFilter poFilter = new POFilter(filterKey, filter.getRequestedParallelisam());
+        poFilter.setAlias(filter.getAlias());
+        poFilter.setResultType(DataType.BAG);
+        currentPlan.add(poFilter);
+        logToPhyMap.put(filter, poFilter);
+
+        // Build the condition in a fresh plan while the outer plan waits on the stack.
+        currentPlans.push(currentPlan);
+        currentPlan = new PhysicalPlan();
+        PlanWalker conditionWalker = new ReverseDependencyOrderWalker(filter.getFilterPlan());
+        pushWalker(conditionWalker);
+        currentWalker.walk(new ExpToPhyTranslationVisitor(
+                currentWalker.getPlan(), conditionWalker, filter, currentPlan, logToPhyMap));
+        popWalker();
+        poFilter.setPlan(currentPlan);
+        currentPlan = currentPlans.pop();
+
+        List<Operator> preds = filter.getPlan().getPredecessors(filter);
+        if (preds == null) {
+            throw new LogicalToPhysicalTranslatorException(
+                    "Did not find a predecessor for Filter.", 2051, PigException.BUG);
+        }
+
+        PhysicalOperator from = logToPhyMap.get(preds.get(0));
+        try {
+            currentPlan.connect(from, poFilter);
+        } catch (PlanException e) {
+            throw new LogicalToPhysicalTranslatorException(
+                    "Invalid physical operators in the physical plan",
+                    2015, PigException.BUG, e);
+        }
+    }
+    
+    /**
+     * Translates a logical sort into a {@link POSort}. Every sort-column
+     * expression plan is converted into a physical plan; when the logical sort
+     * carries a user function, a {@link POUserComparisonFunc} is created and
+     * handed to the sort as its comparator.
+     *
+     * @param sort the logical sort to translate
+     * @throws IOException if the sort has no predecessor, the physical
+     *         operators cannot be connected, or the sort info cannot be read
+     */
+    @Override
+    public void visit(LOSort sort) throws IOException {
+        final String scope = DEFAULT_SCOPE;
+        List<LogicalExpressionPlan> sortColPlans = sort.getSortColPlans();
+        List<PhysicalPlan> physicalSortPlans = new ArrayList<PhysicalPlan>(sortColPlans.size());
+
+        // Each sort column gets its own physical expression plan.
+        currentPlans.push(currentPlan);
+        for (LogicalExpressionPlan colPlan : sortColPlans) {
+            currentPlan = new PhysicalPlan();
+            PlanWalker colWalker = new ReverseDependencyOrderWalker(colPlan);
+            pushWalker(colWalker);
+            colWalker.walk(new ExpToPhyTranslationVisitor(
+                    currentWalker.getPlan(), colWalker, sort, currentPlan, logToPhyMap));
+            physicalSortPlans.add(currentPlan);
+            popWalker();
+        }
+        currentPlan = currentPlans.pop();
+
+        // The comparator stays null unless the script supplied a user function.
+        POUserComparisonFunc comparator = null;
+        if (sort.getUserFunc() != null) {
+            OperatorKey comparatorKey = new OperatorKey(scope, nodeGen.getNextNodeId(scope));
+            comparator = new POUserComparisonFunc(comparatorKey,
+                    sort.getRequestedParallelisam(), null, sort.getUserFunc());
+        }
+        OperatorKey sortKey = new OperatorKey(scope, nodeGen.getNextNodeId(scope));
+        POSort poSort = new POSort(sortKey, sort.getRequestedParallelisam(), null,
+                physicalSortPlans, sort.getAscendingCols(), comparator);
+        poSort.setAlias(sort.getAlias());
+        poSort.setLimit(sort.getLimit());
+        logToPhyMap.put(sort, poSort);
+        currentPlan.add(poSort);
+
+        List<Operator> preds = sort.getPlan().getPredecessors(sort);
+        if (preds == null) {
+            throw new LogicalToPhysicalTranslatorException(
+                    "Did not find a predecessor for Sort.", 2051, PigException.BUG);
+        }
+
+        PhysicalOperator from = logToPhyMap.get(preds.get(0));
+        try {
+            currentPlan.connect(from, poSort);
+        } catch (PlanException e) {
+            throw new LogicalToPhysicalTranslatorException(
+                    "Invalid physical operators in the physical plan",
+                    2015, PigException.BUG, e);
+        }
+
+        poSort.setResultType(DataType.BAG);
+        try {
+            poSort.setSortInfo(sort.getSortInfo());
+        } catch (FrontendException e) {
+            throw new LogicalToPhysicalTranslatorException(e);
+        }
+    }
+    
+    /**
+     * Translates a logical cross into a chain of physical operators.
+     * <p>
+     * For every input a {@link POForEach} is created that emits the result of
+     * the {@link GFCross} function (fed with two constants: the number of
+     * inputs and the index of this input) together with the whole input tuple,
+     * flattening both. Its output goes through a {@link POLocalRearrange}
+     * marked as cross, and all local rearranges feed one
+     * {@link POGlobalRearrange} followed by a {@link POPackage}. A final
+     * {@link POForEach} projects and flattens columns 1..count of the package
+     * output; that operator is registered as the physical counterpart of the
+     * cross.
+     *
+     * @param cross the logical cross to translate
+     * @throws IOException if the physical operators cannot be connected or the
+     *         index of a local rearrange cannot be set
+     */
+    @Override
+    public void visit(LOCross cross) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        List<Operator> inputs = cross.getPlan().getPredecessors(cross);
+
+        POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
+                scope, nodeGen.getNextNodeId(scope)), cross
+                .getRequestedParallelisam());
+        poGlobal.setAlias(cross.getAlias());
+        POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), cross.getRequestedParallelisam());
+        // The alias was previously set on poGlobal a second time, which left
+        // the package operator without one.
+        poPackage.setAlias(cross.getAlias());
+        currentPlan.add(poGlobal);
+        currentPlan.add(poPackage);
+
+        int count = 0;
+
+        try {
+            currentPlan.connect(poGlobal, poPackage);
+            List<Boolean> flattenLst = Arrays.asList(true, true);
+
+            for (Operator op : inputs) {
+                // Plan 1: GFCross(numberOfInputs, indexOfThisInput)
+                PhysicalPlan fep1 = new PhysicalPlan();
+                ConstantExpression ce1 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam());
+                ce1.setValue(inputs.size());
+                ce1.setResultType(DataType.INTEGER);
+                fep1.add(ce1);
+
+                ConstantExpression ce2 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam());
+                ce2.setValue(count);
+                ce2.setResultType(DataType.INTEGER);
+                fep1.add(ce2);
+
+                POUserFunc gfc = new POUserFunc(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam(), Arrays.asList((PhysicalOperator)ce1,(PhysicalOperator)ce2), new FuncSpec(GFCross.class.getName()));
+                gfc.setAlias(cross.getAlias());
+                gfc.setResultType(DataType.BAG);
+                fep1.addAsLeaf(gfc);
+                gfc.setInputs(Arrays.asList((PhysicalOperator)ce1,(PhysicalOperator)ce2));
+
+                // Plan 2: project the whole input tuple
+                PhysicalPlan fep2 = new PhysicalPlan();
+                POProject feproj = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam());
+                feproj.setAlias(cross.getAlias());
+                feproj.setResultType(DataType.TUPLE);
+                feproj.setStar(true);
+                feproj.setOverloaded(false);
+                fep2.add(feproj);
+                List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);
+
+                POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), fePlans, flattenLst );
+                fe.setAlias(cross.getAlias());
+                currentPlan.add(fe);
+                currentPlan.connect(logToPhyMap.get(op), fe);
+
+                // The rearrange key is made of one integer projection per input.
+                POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
+                        scope, nodeGen.getNextNodeId(scope)), cross
+                        .getRequestedParallelisam());
+                physOp.setAlias(cross.getAlias());
+                List<PhysicalPlan> lrPlans = new ArrayList<PhysicalPlan>();
+                for(int i=0;i<inputs.size();i++){
+                    PhysicalPlan lrp1 = new PhysicalPlan();
+                    POProject lrproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), i);
+                    lrproj1.setAlias(cross.getAlias());
+                    lrproj1.setOverloaded(false);
+                    lrproj1.setResultType(DataType.INTEGER);
+                    lrp1.add(lrproj1);
+                    lrPlans.add(lrp1);
+                }
+
+                physOp.setCross(true);
+                physOp.setIndex(count++);
+                physOp.setKeyType(DataType.TUPLE);
+                physOp.setPlans(lrPlans);
+                physOp.setResultType(DataType.TUPLE);
+
+                currentPlan.add(physOp);
+                currentPlan.connect(fe, physOp);
+                currentPlan.connect(physOp, poGlobal);
+            }
+        } catch (PlanException e1) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
+        } catch (ExecException e) {
+            int errCode = 2058;
+            String msg = "Unable to set index on newly create POLocalRearrange.";
+            throw new VisitorException(msg, errCode, PigException.BUG, e);
+        }
+
+        poPackage.setKeyType(DataType.TUPLE);
+        poPackage.setResultType(DataType.TUPLE);
+        poPackage.setNumInps(count);
+        // Every input of the package is treated as inner.
+        boolean inner[] = new boolean[count];
+        Arrays.fill(inner, true);
+        poPackage.setInner(inner);
+
+        // Final foreach: flatten the bag of every input (columns 1..count).
+        List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
+        List<Boolean> flattenLst = new ArrayList<Boolean>();
+        for(int i=1;i<=count;i++){
+            PhysicalPlan fep1 = new PhysicalPlan();
+            POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), i);
+            feproj1.setAlias(cross.getAlias());
+            feproj1.setResultType(DataType.BAG);
+            feproj1.setOverloaded(false);
+            fep1.add(feproj1);
+            fePlans.add(fep1);
+            flattenLst.add(true);
+        }
+
+        POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), fePlans, flattenLst );
+        fe.setAlias(cross.getAlias());
+        currentPlan.add(fe);
+        try{
+            currentPlan.connect(poPackage, fe);
+        }catch (PlanException e1) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
+        }
+        logToPhyMap.put(cross, fe);
+    }
+    
+    /**
+     * Translates a logical stream into a {@link POStream} and connects it to
+     * the physical counterpart of the stream's first predecessor.
+     *
+     * @param stream the logical stream to translate
+     * @throws IOException if the stream has no predecessor or the physical
+     *         operators cannot be connected
+     */
+    @Override
+    public void visit(LOStream stream) throws IOException {
+        final String scope = DEFAULT_SCOPE;
+        OperatorKey streamKey = new OperatorKey(scope, nodeGen.getNextNodeId(scope));
+        POStream poStream = new POStream(streamKey, stream.getExecutableManager(),
+                stream.getStreamingCommand(), this.pc.getProperties());
+        poStream.setAlias(stream.getAlias());
+        currentPlan.add(poStream);
+        logToPhyMap.put(stream, poStream);
+
+        List<Operator> preds = stream.getPlan().getPredecessors(stream);
+        if (preds == null) {
+            throw new LogicalToPhysicalTranslatorException(
+                    "Did not find a predecessor for Stream.", 2051, PigException.BUG);
+        }
+
+        PhysicalOperator from = logToPhyMap.get(preds.get(0));
+        try {
+            currentPlan.connect(from, poStream);
+        } catch (PlanException e) {
+            throw new LogicalToPhysicalTranslatorException(
+                    "Invalid physical operators in the physical plan",
+                    2015, PigException.BUG, e);
+        }
+    }
+
+    @Override
+    public void visit(LOInnerLoad load) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        
+        POProject exprOp = new POProject(new OperatorKey(scope, nodeGen
+              .getNextNodeId(scope)));
+        
+        LogicalSchema s = load.getSchema();
+        if (s != null) {
+            if (s.getField(0).type == DataType.BAG) {
+                Operator succ = load.getPlan().getSuccessors(load).get(0);
+                if (succ instanceof LOGenerate) {
+                    exprOp.setResultType(DataType.BAG);
+                }
+                else {
+                    exprOp.setResultType(DataType.TUPLE);
+                }
+            }
+            else {
+                exprOp.setResultType(s.getField(0).type);
+            }
+        }
+        exprOp.setColumn(load.getColNum());
+        exprOp.setStar(load.getProjection().isProjectStar());        
+        
+        // set input to POProject to the predecessor of foreach
+        
+        logToPhyMap.put(load, exprOp);
+        currentPlan.add(exprOp);
+    }
+    
+    /**
+     * Translates a logical foreach into a {@link POForEach}.
+     * <p>
+     * The first sink of the foreach's inner plan is taken to be its
+     * {@link LOGenerate}. Each output expression plan of that generate is
+     * translated into its own physical plan. For every
+     * {@link ProjectExpression} sink of an expression plan, the corresponding
+     * predecessor of the generate is translated as well, by walking the inner
+     * plan with this visitor, and is then attached to the expression's
+     * physical operator.
+     *
+     * @param foreach the logical foreach to translate
+     * @throws IOException if translation of a nested plan fails or the
+     *         physical operators cannot be connected
+     */
+    @Override
+    public void visit(LOForEach foreach) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        
+        List<PhysicalPlan> innerPlans = new ArrayList<PhysicalPlan>();
+        
+        org.apache.pig.newplan.logical.relational.LogicalPlan inner = foreach.getInnerPlan();
+        // The first sink of the inner plan is cast to LOGenerate without a type check.
+        LOGenerate gen = (LOGenerate)inner.getSinks().get(0);
+       
+        List<LogicalExpressionPlan> exps = gen.getOutputPlans();
+        List<Operator> preds = inner.getPredecessors(gen);
+
+        // Keep the outer plan on the stack while the inner plans are built.
+        currentPlans.push(currentPlan);
+        
+        // we need to translate each predecessor of LOGenerate into a physical plan.
+        // The physical plan should contain the expression plan for this predecessor plus
+        // the subtree starting with this predecessor
+        for (int i=0; i<exps.size(); i++) {
+            currentPlan = new PhysicalPlan();
+            // translate the expression plan
+            PlanWalker childWalker = new ReverseDependencyOrderWalker(exps.get(i));
+            pushWalker(childWalker);
+            childWalker.walk(new ExpToPhyTranslationVisitor(exps.get(i),
+                    childWalker, gen, currentPlan, logToPhyMap ));            
+            popWalker();
+            
+            List<Operator> leaves = exps.get(i).getSinks();
+            for(Operator l: leaves) {
+                PhysicalOperator op = logToPhyMap.get(l);
+                if (l instanceof ProjectExpression ) {
+                    int input = ((ProjectExpression)l).getInputNum();                    
+                    
+                    // for each sink projection, get its input logical plan and translate it
+                    Operator pred = preds.get(input);
+                    // This visitor itself walks the subtree, so the relational
+                    // operators are added to the current inner physical plan.
+                    childWalker = new SubtreeDependencyOrderWalker(inner, pred);
+                    pushWalker(childWalker);
+                    childWalker.walk(this);
+                    popWalker();
+                    
+                    // get the physical operator of the leaf of input logical plan
+                    PhysicalOperator leaf = logToPhyMap.get(pred);                    
+                    
+                    if (pred instanceof LOInnerLoad) {
+                        // if predecessor is only an LOInnerLoad, remove the project that
+                        // comes from LOInnerLoad and change the column of project that
+                        // comes from expression plan
+                        currentPlan.remove(leaf);
+                        logToPhyMap.remove(pred);
+
+                        ((POProject)op).setColumn( ((POProject)leaf).getColumn() );
+                        ((POProject)op).setStar(((POProject)leaf).isStar());
+
+                    }else{                    
+                        currentPlan.connect(leaf, op);
+                    }
+                }
+            }
+            innerPlans.add(currentPlan);
+        }
+        
+        // Restore the outer plan before adding the foreach operator to it.
+        currentPlan = currentPlans.pop();
+
+        // PhysicalOperator poGen = new POGenerate(new OperatorKey("",
+        // r.nextLong()), inputs, toBeFlattened);
+        // POForEach takes a List<Boolean>, so the primitive flags are boxed one by one.
+        boolean[] flatten = gen.getFlattenFlags();
+        List<Boolean> flattenList = new ArrayList<Boolean>();
+        for(boolean fl: flatten) {
+            flattenList.add(fl);
+        }
+        POForEach poFE = new POForEach(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), foreach.getRequestedParallelisam(), innerPlans, flattenList);
+        poFE.setAlias(foreach.getAlias());
+        poFE.setResultType(DataType.BAG);
+        logToPhyMap.put(foreach, poFE);
+        currentPlan.add(poFE); 
+
+        // generate cannot have multiple inputs
+        List<Operator> op = foreach.getPlan().getPredecessors(foreach);
+
+        // generate may not have any predecessors
+        if (op == null)
+            return;
+
+        PhysicalOperator from = logToPhyMap.get(op.get(0));
+        try {
+           currentPlan.connect(from, poFE);
+        } catch (Exception e) {
+            // NOTE(review): this catches every Exception, not only PlanException,
+            // so runtime failures are also reported as error 2015.
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+
+    }
+    
+
+    /**
+     * Converts a {@link LogicalSchema} into the older {@link Schema} format
+     * expected by physical operators. Nested schemas are converted
+     * recursively, and each field's uid becomes the canonical name of the
+     * converted field.
+     *
+     * @param lSchema the logical schema to convert; may be {@code null}
+     * @return the converted schema, or {@code null} when {@code lSchema} is
+     *         {@code null}
+     * @throws FrontendException if a field schema cannot be created
+     */
+    private static Schema translateSchema( LogicalSchema lSchema ) throws FrontendException {
+        if (lSchema == null) {
+            return null;
+        }
+
+        Schema converted = new Schema();
+        for (LogicalFieldSchema logicalField : lSchema.getFields()) {
+            Schema nested = translateSchema(logicalField.schema);
+            FieldSchema physicalField =
+                    new FieldSchema(logicalField.alias, nested, logicalField.type);
+            physicalField.canonicalName = Long.toString(logicalField.uid);
+            converted.add(physicalField);
+        }
+        return converted;
+    }
+    
+    /**
+     * Converts a list of logical expression plans into physical plans, one
+     * physical plan per expression plan and in the same order.
+     *
+     * @param loj the relational operator the expression plans belong to; it is
+     *        passed on to the expression translator
+     * @param plans the logical expression plans to convert; may be
+     *        {@code null} or empty
+     * @return the converted physical plans; empty when {@code plans} is
+     *         {@code null} or empty
+     * @throws IOException if translating an expression plan fails
+     */
+    private List<PhysicalPlan> translateExpressionPlans(LogicalRelationalOperator loj,
+            List<LogicalExpressionPlan> plans ) throws IOException {
+        List<PhysicalPlan> translated = new ArrayList<PhysicalPlan>();
+        if (plans != null && !plans.isEmpty()) {
+            // Keep the enclosing plan on the stack while the expressions are built.
+            currentPlans.push(currentPlan);
+
+            for (LogicalExpressionPlan logicalExpr : plans) {
+                currentPlan = new PhysicalPlan();
+
+                // Make a dedicated walker the current one for this expression plan.
+                PlanWalker exprWalker = new ReverseDependencyOrderWalker(logicalExpr);
+                pushWalker(exprWalker);
+                currentWalker.walk(new ExpToPhyTranslationVisitor(
+                        currentWalker.getPlan(), exprWalker, loj, currentPlan, logToPhyMap));
+
+                translated.add(currentPlan);
+                popWalker();
+            }
+
+            // Switch back to the enclosing plan.
+            currentPlan = currentPlans.pop();
+        }
+        return translated;
+    }
+    
+    /**
+     * Translates a logical store into a {@link POStore}, copies the alias of
+     * its first predecessor and the output file spec, and connects it to the
+     * physical counterpart of that predecessor.
+     * <p>
+     * A store without a predecessor is reported as error 2051, matching the
+     * other single-input operators handled by this visitor. Previously the
+     * predecessor list was dereferenced without a check.
+     *
+     * @param loStore the logical store to translate
+     * @throws IOException if the store has no predecessor, its schema cannot
+     *         be converted, or the physical operators cannot be connected
+     */
+    @Override
+    public void visit(LOStore loStore) throws IOException {
+        String scope = DEFAULT_SCOPE;
+
+        List<Operator> op = loStore.getPlan().getPredecessors(loStore);
+        if (op == null || op.isEmpty()) {
+            int errCode = 2051;
+            String msg = "Did not find a predecessor for Store." ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
+        }
+        Operator pred = op.get(0);
+
+        POStore store = new POStore(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)));
+        store.setAlias(((LogicalRelationalOperator)pred).getAlias());
+        store.setSFile(loStore.getOutputSpec());
+        // TODO Implement this
+        //store.setInputSpec(loStore.getInputSpec());
+
+        // create a new schema for ourselves so that when
+        // we serialize we are not serializing objects that
+        // contain the schema - apparently Java tries to
+        // serialize the object containing the schema if
+        // we are trying to serialize the schema reference in
+        // the containing object. The schema here will be serialized
+        // in JobControlCompiler
+        store.setSchema(translateSchema( loStore.getSchema() ));
+        currentPlan.add(store);
+
+        // TODO Implement sorting when we have a LOSort (new) and LOLimit (new)
+        // operator ready: if the store's predecessor is a limit, look at the
+        // limit's predecessor instead; if that operator is a sort, pass its
+        // sort info on to the store via store.setSortInfo(...).
+        PhysicalOperator from = logToPhyMap.get(pred);
+
+        try {
+            currentPlan.connect(from, store);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+        logToPhyMap.put(loStore, store);
+    }
+    
+    @Override
+    public void visit( LOCogroup cg ) throws IOException {
+        if (cg.getGroupType() == LOCogroup.GROUPTYPE.COLLECTED) {
+            translateCollectedCogroup(cg);
+        } else {
+            translateRegularCogroup(cg);
+        }
+    }
+    
+    /**
+     * Translates a regular (non-collected) cogroup.
+     * <p>
+     * One {@code POLocalRearrange} is created per predecessor of {@code cg} and
+     * connected to a single {@code POGlobalRearrange}, which in turn feeds a
+     * {@code POPackage}. The {@code POPackage} is registered in
+     * {@code logToPhyMap} as the physical translation of {@code cg}.
+     * <p>
+     * The key type of each local rearrange is {@code TUPLE} when that input has
+     * more than one key expression, otherwise the result type of the first leaf
+     * of its single key plan. The package receives the key type computed for the
+     * last input processed.
+     *
+     * @param cg the logical cogroup to translate
+     * @throws IOException if the physical operators cannot be set up or connected
+     */
+    private void translateRegularCogroup(LOCogroup cg) throws IOException {
+        List<Operator> preds = plan.getPredecessors(cg);
+        
+        POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
+                DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)), cg.getRequestedParallelisam() );
+        poGlobal.setAlias(cg.getAlias());
+        POPackage poPackage = new POPackage(new OperatorKey(DEFAULT_SCOPE, nodeGen
+                .getNextNodeId(DEFAULT_SCOPE)), cg.getRequestedParallelisam());
+        poPackage.setAlias(cg.getAlias());
+        currentPlan.add(poGlobal);
+        currentPlan.add(poPackage);
+
+        try {
+            currentPlan.connect(poGlobal, poPackage);
+        } catch (PlanException e1) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
+        }
+
+        Byte type = null;
+        for( int i = 0 ; i < preds.size(); i++ ) {
+            // Only List behaviour is needed here; the collected variant casts the same way.
+            List<LogicalExpressionPlan> exprPlans = 
+                (List<LogicalExpressionPlan>) cg.getExpressionPlans().get(i);
+            
+            POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
+                    DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)), cg.getRequestedParallelisam() );
+            physOp.setAlias(cg.getAlias());
+            
+            List<PhysicalPlan> pExprPlans = translateExpressionPlans( cg, exprPlans );
+            
+            try {
+                physOp.setPlans(pExprPlans);
+            } catch (PlanException pe) {
+                int errCode = 2071;
+                String msg = "Problem with setting up local rearrange's plans.";
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, pe);
+            }
+            try {
+                physOp.setIndex(i);
+            } catch (ExecException e1) {
+                // Keep the error code and the root cause, exactly as the hash-join
+                // translation does for the same failure.
+                int errCode = 2058;
+                String msg = "Unable to set index on newly create POLocalRearrange.";
+                throw new VisitorException(msg, errCode, PigException.BUG, e1);
+            }
+            if (exprPlans.size() > 1) {
+                // a multi-column key is shipped as a tuple
+                type = DataType.TUPLE;
+                physOp.setKeyType(type);
+            } else {
+                type = pExprPlans.get(0).getLeaves().get(0).getResultType();
+                physOp.setKeyType(type);
+            }
+            physOp.setResultType(DataType.TUPLE);
+
+            currentPlan.add(physOp);
+
+            try {
+                currentPlan.connect(logToPhyMap.get(preds.get(i)), physOp);
+                currentPlan.connect(physOp, poGlobal);
+            } catch (PlanException e) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+            }
+        }
+        
+        poPackage.setKeyType(type);
+        poPackage.setResultType(DataType.TUPLE);
+        poPackage.setNumInps(preds.size());
+        poPackage.setInner(cg.getInner());
+        logToPhyMap.put(cg, poPackage);
+    }
+    
+    /**
+     * Translates a collected cogroup into a single {@code POCollectedGroup}
+     * fed by the physical operator of the cogroup's first (only) predecessor,
+     * and registers it in {@code logToPhyMap} as the translation of {@code cg}.
+     *
+     * @param cg the logical cogroup to translate
+     * @throws IOException if the key plans cannot be set or the operators cannot be connected
+     */
+    private void translateCollectedCogroup(LOCogroup cg) throws IOException {
+        // a collected group is only defined over one input, so index 0 is used throughout
+        LogicalRelationalOperator input = (LogicalRelationalOperator) plan.getPredecessors(cg).get(0);
+        List<LogicalExpressionPlan> keyPlans = (List<LogicalExpressionPlan>) cg.getExpressionPlans().get(0);
+
+        OperatorKey groupKey = new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE));
+        POCollectedGroup collectedGroup = new POCollectedGroup(groupKey);
+        collectedGroup.setAlias(cg.getAlias());
+
+        List<PhysicalPlan> physKeyPlans = translateExpressionPlans(cg, keyPlans);
+        try {
+            collectedGroup.setPlans(physKeyPlans);
+        } catch (PlanException pe) {
+            throw new LogicalToPhysicalTranslatorException(
+                    "Problem with setting up map group's plans.", 2071, PigException.BUG, pe);
+        }
+
+        // several key expressions form a tuple key; a single one keeps its own type
+        if (keyPlans.size() > 1) {
+            collectedGroup.setKeyType(DataType.TUPLE);
+        } else {
+            collectedGroup.setKeyType(physKeyPlans.get(0).getLeaves().get(0).getResultType());
+        }
+        collectedGroup.setResultType(DataType.TUPLE);
+
+        currentPlan.add(collectedGroup);
+
+        try {
+            currentPlan.connect(logToPhyMap.get(input), collectedGroup);
+        } catch (PlanException e) {
+            throw new LogicalToPhysicalTranslatorException(
+                    "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
+        }
+
+        logToPhyMap.put(cg, collectedGroup);
+    }
+    
+    /**
+     * Translates a logical join into physical operators according to its join type.
+     * <ul>
+     *   <li>{@code SKEWED}: a {@code POSkewedJoin} fed by all inputs; for every
+     *       outer (non-inner) input its translated schema is added, for inner
+     *       inputs a {@code null} placeholder is added so indexes stay aligned.</li>
+     *   <li>{@code REPLICATED}: a {@code POFRJoin} with fragment 0; when input 1
+     *       is marked outer, a tuple of nulls sized by that input's schema is
+     *       passed along.</li>
+     *   <li>{@code MERGE}: a {@code POMergeJoin}, after {@code validateMergeJoin}
+     *       has accepted the inputs.</li>
+     *   <li>{@code HASH}: one {@code POLocalRearrange} per input, a
+     *       {@code POGlobalRearrange}, a {@code POPackage} and a flattening
+     *       {@code POForEach}; outer inputs get an empty-bag check.</li>
+     * </ul>
+     * The resulting top-level physical operator is registered in
+     * {@code logToPhyMap} under {@code loj}. A join type matching none of the
+     * branches leaves the plan untouched.
+     *
+     * @param loj the logical join to translate
+     * @throws IOException if a physical operator cannot be created or connected,
+     *         or if an outer-join input has no schema
+     */
+    @Override
+    public void visit(LOJoin loj) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        
+        // Logical inputs of the join, in join order
+        List<Operator> inputs = plan.getPredecessors(loj);
+        
+        // mapping of inner join physical plans corresponding to inner physical operators.
+        MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = new LinkedMultiMap<PhysicalOperator, PhysicalPlan>();
+        
+        // Outer list corresponds to join inputs. Inner list is the physical key plans of each input.
+        List<List<PhysicalPlan>> ppLists = new ArrayList<List<PhysicalPlan>>();
+        
+        // List of physical operators corresponding to the join inputs.
+        List<PhysicalOperator> inp = new ArrayList<PhysicalOperator>();
+        
+        // Outer list corresponds to join inputs and inner list corresponds to type of keys for each input.
+        List<List<Byte>> keyTypes = new ArrayList<List<Byte>>();
+        
+        for (int i=0; i<inputs.size(); i++) {
+            Operator op = inputs.get(i);
+            if( ! ( op instanceof LogicalRelationalOperator ) ) {
+                continue;
+            }
+            PhysicalOperator physOp = logToPhyMap.get(op);
+            inp.add(physOp);
+            List<LogicalExpressionPlan> plans = (List<LogicalExpressionPlan>) loj.getJoinPlan(i);
+            
+            // Convert the expression plan into physical Plan
+            List<PhysicalPlan> exprPlans = translateExpressionPlans(loj, plans);
+
+            ppLists.add(exprPlans);
+            joinPlans.put(physOp, exprPlans);
+            
+            // Key could potentially be a tuple. So, we visit all exprPlans to get types of members of tuples.
+            List<Byte> tupleKeyMemberTypes = new ArrayList<Byte>();
+            for(PhysicalPlan exprPlan : exprPlans)
+                tupleKeyMemberTypes.add(exprPlan.getLeaves().get(0).getResultType());
+            keyTypes.add(tupleKeyMemberTypes);
+        }
+
+        if (loj.getJoinType() == LOJoin.JOINTYPE.SKEWED) {
+            POSkewedJoin skj;
+            try {
+                skj = new POSkewedJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelisam(),
+                                            inp, loj.getInnerFlags());
+                skj.setAlias(loj.getAlias());
+                skj.setJoinPlans(joinPlans);
+            }
+            catch (Exception e) {
+                int errCode = 2015;
+                String msg = "Skewed Join creation failed";
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+            }
+            skj.setResultType(DataType.TUPLE);
+            
+            boolean[] innerFlags = loj.getInnerFlags();
+            for (int i=0; i < inputs.size(); i++) {
+                LogicalRelationalOperator op = (LogicalRelationalOperator) inputs.get(i);
+                if (!innerFlags[i]) {
+                    try {
+                        LogicalSchema s = op.getSchema();
+                        // if the schema cannot be determined
+                        if (s == null) {
+                            throw new FrontendException();
+                        }
+                        skj.addSchema(translateSchema(s));
+                    } catch (FrontendException e) {
+                        int errCode = 2015;
+                        String msg = "Couldn't set the schema for outer join" ;
+                        throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+                    }
+                } else {
+                    // This will never be retrieved. It just guarantees that the index will be valid when
+                    // MRCompiler is trying to read the schema
+                    skj.addSchema(null);
+                }
+            }
+            
+            currentPlan.add(skj);
+
+            for (Operator op : inputs) {
+                try {
+                    currentPlan.connect(logToPhyMap.get(op), skj);
+                } catch (PlanException e) {
+                    int errCode = 2015;
+                    String msg = "Invalid physical operators in the physical plan" ;
+                    throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+                }
+            }
+            logToPhyMap.put(loj, skj);
+        }
+        else if(loj.getJoinType() == LOJoin.JOINTYPE.REPLICATED) {
+            
+            int fragment = 0;
+            POFRJoin pfrj;
+            try {
+                boolean []innerFlags = loj.getInnerFlags();
+                boolean isLeftOuter = false;
+                // We dont check for bounds issue as we assume that a join 
+                // involves atleast two inputs
+                isLeftOuter = !innerFlags[1];
+                
+                Tuple nullTuple = null;
+                if( isLeftOuter ) {
+                    try {
+                        // We know that in a Left outer join its only a two way 
+                        // join, so we assume index of 1 for the right input                        
+                        LogicalSchema inputSchema = ((LogicalRelationalOperator)inputs.get(1)).getSchema();                     
+                        
+                        // We check if we have a schema before the join
+                        if(inputSchema == null) {
+                            int errCode = 1109;
+                            String msg = "Input (" + ((LogicalRelationalOperator) inputs.get(1)).getAlias() + ") " +
+                            "on which outer join is desired should have a valid schema";
+                            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.INPUT);
+                        }
+                        
+                        // Using the schema we decide the number of columns/fields 
+                        // in the nullTuple
+                        nullTuple = TupleFactory.getInstance().newTuple(inputSchema.size());
+                        for(int j = 0; j < inputSchema.size(); j++) {
+                            nullTuple.set(j, null);
+                        }
+                        
+                    } catch( LogicalToPhysicalTranslatorException e ) {
+                        // The missing-schema error above already carries its own code and
+                        // source (1109 / INPUT). Without this clause the generic handler
+                        // below would catch it and relabel it as a 2104 internal bug.
+                        throw e;
+                    } catch( FrontendException e ) {
+                        int errCode = 2104;
+                        String msg = "Error while determining the schema of input";
+                        throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+                    }
+                }
+                
+                pfrj = new POFRJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelisam(),
+                                            inp, ppLists, keyTypes, null, fragment, isLeftOuter, nullTuple);
+                pfrj.setAlias(loj.getAlias());
+            } catch (ExecException e1) {
+                int errCode = 2058;
+                String msg = "Unable to set index on newly create POLocalRearrange.";
+                throw new VisitorException(msg, errCode, PigException.BUG, e1);
+            }
+            pfrj.setResultType(DataType.TUPLE);
+            currentPlan.add(pfrj);
+            for (Operator op : inputs) {
+                try {
+                    currentPlan.connect(logToPhyMap.get(op), pfrj);
+                } catch (PlanException e) {
+                    int errCode = 2015;
+                    String msg = "Invalid physical operators in the physical plan" ;
+                    throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+                }
+            }
+            logToPhyMap.put(loj, pfrj);
+        }
+        
+        else if (loj.getJoinType() == LOJoin.JOINTYPE.MERGE && validateMergeJoin(loj)) {
+            
+            POMergeJoin smj;
+            try {
+                smj = new POMergeJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelisam(),inp,joinPlans,keyTypes);
+            }
+            catch (Exception e) {
+                int errCode = 2042;
+                String msg = "Merge Join creation failed";
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+            }
+
+            // carry the alias over, as every other join flavour does
+            smj.setAlias(loj.getAlias());
+            smj.setResultType(DataType.TUPLE);
+            currentPlan.add(smj);
+
+            for (Operator op : inputs) {
+                try {
+                    currentPlan.connect(logToPhyMap.get(op), smj);
+                } catch (PlanException e) {
+                    int errCode = 2015;
+                    String msg = "Invalid physical operators in the physical plan" ;
+                    throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+                }
+            }
+            logToPhyMap.put(loj, smj);
+            return;
+        }
+        else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){
+            POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
+                    scope, nodeGen.getNextNodeId(scope)), loj
+                    .getRequestedParallelisam());
+            poGlobal.setAlias(loj.getAlias());
+            POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
+                    .getNextNodeId(scope)), loj.getRequestedParallelisam());
+            poPackage.setAlias(loj.getAlias());
+            currentPlan.add(poGlobal);
+            currentPlan.add(poPackage);
+            
+            int count = 0;
+            Byte type = null;
+            
+            try {
+                currentPlan.connect(poGlobal, poPackage);
+                for (int i=0; i<inputs.size(); i++) {       
+                    Operator op = inputs.get(i);
+                    List<LogicalExpressionPlan> plans = 
+                        (List<LogicalExpressionPlan>) loj.getJoinPlan(i);
+                    POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
+                            scope, nodeGen.getNextNodeId(scope)), loj
+                            .getRequestedParallelisam());
+                    List<PhysicalPlan> exprPlans = translateExpressionPlans(loj, plans);
+                    try {
+                        physOp.setPlans(exprPlans);
+                    } catch (PlanException pe) {
+                        int errCode = 2071;
+                        String msg = "Problem with setting up local rearrange's plans.";
+                        throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, pe);
+                    }
+                    try {
+                        physOp.setIndex(count++);
+                    } catch (ExecException e1) {
+                        int errCode = 2058;
+                        String msg = "Unable to set index on newly create POLocalRearrange.";
+                        throw new VisitorException(msg, errCode, PigException.BUG, e1);
+                    }
+                    if (plans.size() > 1) {
+                        // a multi-column key is shipped as a tuple
+                        type = DataType.TUPLE;
+                        physOp.setKeyType(type);
+                    } else {
+                        type = exprPlans.get(0).getLeaves().get(0).getResultType();
+                        physOp.setKeyType(type);
+                    }
+                    physOp.setResultType(DataType.TUPLE);
+
+                    currentPlan.add(physOp);
+
+                    try {
+                        currentPlan.connect(logToPhyMap.get(op), physOp);
+                        currentPlan.connect(physOp, poGlobal);
+                    } catch (PlanException e) {
+                        int errCode = 2015;
+                        String msg = "Invalid physical operators in the physical plan" ;
+                        throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+                    }
+
+                }
+                
+            } catch (PlanException e1) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
+            }
+
+            poPackage.setKeyType(type);
+            poPackage.setResultType(DataType.TUPLE);
+            poPackage.setNumInps(count);
+            
+            boolean[] innerFlags = loj.getInnerFlags();
+            poPackage.setInner(innerFlags);
+            
+            List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
+            List<Boolean> flattenLst = new ArrayList<Boolean>();
+            
+            try{
+                for(int i=0;i< count;i++){
+                    PhysicalPlan fep1 = new PhysicalPlan();
+                    POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), 
+                            loj.getRequestedParallelisam(), i+1); //i+1 since the first column is the "group" field
+                    feproj1.setAlias(loj.getAlias());
+                    feproj1.setResultType(DataType.BAG);
+                    feproj1.setOverloaded(false);
+                    fep1.add(feproj1);
+                    fePlans.add(fep1);
+                    // the parser would have marked the side
+                    // where we need to keep empty bags on
+                    // non matched as outer (innerFlags[i] would be
+                    // false)
+                    if(!(innerFlags[i])) {
+                        Operator joinInput = inputs.get(i);
+                        // for outer join add a bincond
+                        // which will project nulls when bag is
+                        // empty
+                        updateWithEmptyBagCheck(fep1, joinInput);
+                    }
+                    flattenLst.add(true);
+                }
+                
+                POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), 
+                        loj.getRequestedParallelisam(), fePlans, flattenLst );
+                fe.setAlias(loj.getAlias());
+                currentPlan.add(fe);
+                currentPlan.connect(poPackage, fe);
+                logToPhyMap.put(loj, fe);
+            }catch (PlanException e1) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
+            }
+        }
+    }
+    
+    /**
+     * Translates a logical union into a {@code POUnion}, registers it in
+     * {@code logToPhyMap} and connects the physical operator of every
+     * predecessor to it.
+     *
+     * @param loUnion the logical union to translate
+     * @throws IOException if a predecessor cannot be connected to the union
+     */
+    @Override
+    public void visit(LOUnion loUnion) throws IOException {
+        OperatorKey unionKey = new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE));
+        POUnion union = new POUnion(unionKey, loUnion.getRequestedParallelisam());
+        union.setAlias(loUnion.getAlias());
+        currentPlan.add(union);
+        union.setResultType(DataType.BAG);
+        logToPhyMap.put(loUnion, union);
+
+        for (Operator input : loUnion.getPlan().getPredecessors(loUnion)) {
+            PhysicalOperator source = logToPhyMap.get(input);
+            try {
+                currentPlan.connect(source, union);
+            } catch (PlanException e) {
+                throw new LogicalToPhysicalTranslatorException(
+                        "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
+            }
+        }
+    }
+    
+    /**
+     * Translates a logical distinct into a {@code PODistinct}, registers it in
+     * {@code logToPhyMap} and connects the physical operator of its first
+     * predecessor to it.
+     *
+     * @param loDistinct the logical distinct to translate
+     * @throws IOException if the predecessor cannot be connected
+     */
+    @Override
+    public void visit(LODistinct loDistinct) throws IOException {
+        OperatorKey distinctKey = new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE));
+        PODistinct distinct = new PODistinct(distinctKey, loDistinct.getRequestedParallelisam());
+        distinct.setAlias(loDistinct.getAlias());
+        currentPlan.add(distinct);
+        distinct.setResultType(DataType.BAG);
+        logToPhyMap.put(loDistinct, distinct);
+
+        Operator input = loDistinct.getPlan().getPredecessors(loDistinct).get(0);
+        PhysicalOperator source = logToPhyMap.get(input);
+        try {
+            currentPlan.connect(source, distinct);
+        } catch (PlanException e) {
+            throw new LogicalToPhysicalTranslatorException(
+                    "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
+        }
+    }
+    
+    /**
+     * Translates a logical limit into a {@code POLimit} carrying the same
+     * limit value, registers it in {@code logToPhyMap} and connects the
+     * physical operator of its first predecessor to it.
+     *
+     * @param loLimit the logical limit to translate
+     * @throws IOException if the predecessor cannot be connected
+     */
+    @Override
+    public void visit(LOLimit loLimit) throws IOException {
+        OperatorKey limitKey = new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE));
+        POLimit limit = new POLimit(limitKey, loLimit.getRequestedParallelisam());
+        limit.setLimit(loLimit.getLimit());
+        limit.setAlias(loLimit.getAlias());
+        currentPlan.add(limit);
+        limit.setResultType(DataType.BAG);
+        logToPhyMap.put(loLimit, limit);
+
+        Operator input = loLimit.getPlan().getPredecessors(loLimit).get(0);
+        PhysicalOperator source = logToPhyMap.get(input);
+        try {
+            currentPlan.connect(source, limit);
+        } catch (PlanException e) {
+            throw new LogicalToPhysicalTranslatorException(
+                    "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
+        }
+    }
+    
+    @Override
+    public void visit(LOSplit loSplit) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        POSplit physOp = new POSplit(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), loSplit.getRequestedParallelisam());
+        physOp.setAlias(loSplit.getAlias());
+        FileSpec splStrFile;
+        try {
+            splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(pc).toString(),new FuncSpec(InterStorage.class.getName()));
+        } catch (IOException e1) {
+            byte errSrc = pc.getErrorSource();
+            int errCode = 0;
+            switch(errSrc) {
+            case PigException.BUG:
+                errCode = 2016;
+                break;
+            case PigException.REMOTE_ENVIRONMENT:
+                errCode = 6002;
+                break;
+            case PigException.USER_ENVIRONMENT:
+                errCode = 4003;
+                break;
+            }
+            String msg = "Unable to obtain a temporary path." ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, errSrc, e1);
+
+        }
+        physOp.setSplitStore(splStrFile);
+        logToPhyMap.put(loSplit, physOp);
+
+        currentPlan.add(physOp);
+
+        List<Operator> op = loSplit.getPlan().getPredecessors(loSplit); 
+        PhysicalOperator from;
+        
+        if(op != null) {
+            from = logToPhyMap.get(op.get(0));
+        } else {
+            int errCode = 2051;
+            String msg = "Did not find a predecessor for Split." ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);            
+        }        
+
+        try {
+            currentPlan.connect(from, physOp);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+    }
+    
+    /**
+     * Translates one output of a split into a {@code POFilter}.
+     * <p>
+     * The filter is added to the current plan and registered in
+     * {@code logToPhyMap}. Its condition is built by walking the split output's
+     * filter plan with an {@code ExpToPhyTranslationVisitor} into a fresh
+     * {@code PhysicalPlan}, while the enclosing plan is parked on
+     * {@code currentPlans}. Finally the physical operator of the first
+     * predecessor is connected to the filter.
+     *
+     * @param loSplitOutput the logical split output to translate
+     * @throws IOException if the condition cannot be translated, if there is no
+     *         predecessor, or if the predecessor cannot be connected
+     */
+    @Override
+    public void visit(LOSplitOutput loSplitOutput) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), loSplitOutput.getRequestedParallelisam());
+        poFilter.setAlias(loSplitOutput.getAlias());
+        poFilter.setResultType(DataType.BAG);
+        currentPlan.add(poFilter);
+        logToPhyMap.put(loSplitOutput, poFilter);
+        currentPlans.push(currentPlan);
+
+        // the condition is translated into its own plan
+        currentPlan = new PhysicalPlan();
+
+        PlanWalker childWalker = new ReverseDependencyOrderWalker(loSplitOutput.getFilterPlan());
+        pushWalker(childWalker);
+        currentWalker.walk(
+                new ExpToPhyTranslationVisitor( currentWalker.getPlan(), 
+                        childWalker, loSplitOutput, currentPlan, logToPhyMap ) );
+        popWalker();
+
+        poFilter.setPlan(currentPlan);
+        currentPlan = currentPlans.pop();
+
+        List<Operator> op = loSplitOutput.getPlan().getPredecessors(loSplitOutput);
+
+        // An empty list is as much "no predecessor" as null; report it with the
+        // dedicated error instead of letting get(0) fail with an index error.
+        if(op == null || op.isEmpty()) {
+            int errCode = 2051;
+            String msg = "Did not find a predecessor for SplitOutput." ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
+        }
+        PhysicalOperator from = logToPhyMap.get(op.get(0));
+        
+        try {
+            currentPlan.connect(from, poFilter);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /**
+     * updates plan with check for empty bag and if bag is empty to flatten a bag
+     * with as many null's as dictated by the schema
+     * @param fePlan the plan to update
+     * @param joinInput the relation for which the corresponding bag is being checked
+     * @throws FrontendException 
+     */
+    public static void updateWithEmptyBagCheck(PhysicalPlan fePlan, Operator joinInput) throws FrontendException {
+        LogicalSchema inputSchema = null;
+        try {
+            inputSchema = ((LogicalRelationalOperator) joinInput).getSchema();
+         
+          
+            if(inputSchema == null) {
+                int errCode = 1109;
+                String msg = "Input (" + ((LogicalRelationalOperator) joinInput).getAlias() + ") " +
+                        "on which outer join is desired should have a valid schema";
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.INPUT);
+            }
+        } catch (FrontendException e) {
+            int errCode = 2104;
+            String msg = "Error while determining the schema of input";
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+        
+        CompilerUtils.addEmptyBagOuterJoin(fePlan, translateSchema(inputSchema));
+        
+    }
+
+    private boolean validateMergeJoin(LOJoin loj) throws IOException{
+        
+        List<Operator> preds = plan.getPredecessors(loj);
+
+        int errCode = 1101;
+        String errMsg = "Merge Join must have exactly two inputs.";
+        if(preds.size() != 2)
+            throw new LogicalToPhysicalTranslatorException(errMsg+" Found: "+preds.size(),errCode);
+        
+        return mergeJoinValidator(preds,loj.getPlan());
+    }
+    
+    /**
+     * Recursively checks that every operator upstream of a merge join is of a
+     * supported kind. Currently only {@code LOFilter} and {@code LOLoad} are
+     * accepted; the first operator of any other kind aborts the check.
+     *
+     * @param preds the operators to check; {@code null} or empty ends the recursion
+     * @param lp the plan used to look up the predecessors of each checked operator
+     * @return {@code true} when every operator reachable upstream is supported
+     * @throws IOException with error code 1103, naming the offending operator
+     *         class, when an unsupported operator is found
+     */
+    private boolean mergeJoinValidator(List<Operator> preds,OperatorPlan lp) throws IOException {
+        
+        int errCode = 1103;
+        String errMsg = "Merge join only supports Filter, Foreach, filter and Load as its predecessor. Found : ";
+        if(preds != null && !preds.isEmpty()){
+            for(Operator lo : preds){
+                // TODO Need to add LOForEach in this statement
+                if (!(lo instanceof LOFilter || lo instanceof LOLoad)) { // || lo instanceof LOForEach
+                    // complete the "Found : " message with the operator that was rejected
+                    throw new LogicalToPhysicalTranslatorException(
+                            errMsg + lo.getClass().getSimpleName(), errCode);
+                }
+                // All is good at this level. Visit predecessors now.
+                mergeJoinValidator(lp.getPredecessors(lo),lp);
+            }
+        }
+        // We visited everything and all is good.
+        return true;
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.pig.newplan.BaseOperatorPlan;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter;
+
+/**
+ * The logical view of the relational operations Pig will execute for a given
+ * script. Only relational operators live in this plan; expressions are kept
+ * in the LogicalExpressionPlans owned by each relational operator.
+ */
+public class LogicalPlan extends BaseOperatorPlan {
+
+    /**
+     * Compares this plan with another one. A plan that is {@code null} or not
+     * a {@code LogicalPlan} is never equal; otherwise the comparison is
+     * delegated to the superclass.
+     * <p>
+     * Equality is checked by calling equals on every leaf in the plan, which
+     * assumes that plans are always connected graphs. This is somewhat
+     * inefficient, since every leaf tests equality all the way up to every
+     * root, but the method is only intended for use in testing. Predecessors
+     * (rather than successors) are compared because the order of a split's
+     * outputs has no correctness implications, whereas the order of a join's
+     * inputs does. Consequently predecessors in a different order are
+     * detected, successors in a different order are not.
+     */
+    @Override
+    public boolean isEqual(OperatorPlan other) {
+        // instanceof is false for null, so this also covers the null case
+        if (other instanceof LogicalPlan) {
+            return super.isEqual(other);
+        }
+        return false;
+    }
+
+    /**
+     * Prints a banner followed by the plan as rendered by
+     * {@code LogicalPlanPrinter}. The {@code format} and {@code verbose}
+     * arguments are not used by this implementation.
+     */
+    @Override
+    public void explain(PrintStream ps, String format, boolean verbose)
+    throws IOException {
+        final String rule = "#-----------------------------------------------";
+        ps.println(rule);
+        ps.println("# New Logical Plan:");
+        ps.println(rule);
+
+        new LogicalPlanPrinter(this, ps).visit();
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlanVisitor.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlanVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlanVisitor.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1 @@
+package org.apache.pig.newplan.logical.relational;

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.PlanWalker;
+
+/**
+ * Base visitor for the relational operators of a logical plan.
+ * <p>
+ * Every visit overload in this class has an empty body, so a concrete
+ * visitor only needs to override the overloads for the operators it is
+ * interested in.
+ */
+public abstract class LogicalRelationalNodesVisitor extends PlanVisitor {
+
+    /**
+     * @param plan plan handed to the PlanVisitor constructor
+     * @param walker walker handed to the PlanVisitor constructor
+     */
+    protected LogicalRelationalNodesVisitor(OperatorPlan plan, PlanWalker walker) {
+        super(plan, walker);
+        // A sanity check used to live here and is currently disabled: it
+        // iterated over plan.getOperators() and threw a RuntimeException
+        // ("LogicalPlanVisitor can only visit logical plan") for any operator
+        // that was not a LogicalRelationalOperator.
+    }
+
+    /** Visits a load operator; does nothing by default. */
+    public void visit(LOLoad load) throws IOException {
+        // no-op
+    }
+
+    /** Visits a filter operator; does nothing by default. */
+    public void visit(LOFilter filter) throws IOException {
+        // no-op
+    }
+
+    /** Visits a store operator; does nothing by default. */
+    public void visit(LOStore store) throws IOException {
+        // no-op
+    }
+
+    /** Visits a join operator; does nothing by default. */
+    public void visit(LOJoin join) throws IOException {
+        // no-op
+    }
+
+    /** Visits a foreach operator; does nothing by default. */
+    public void visit(LOForEach foreach) throws IOException {
+        // no-op
+    }
+
+    /** Visits a generate operator; does nothing by default. */
+    public void visit(LOGenerate gen) throws IOException {
+        // no-op
+    }
+
+    /** Visits an inner load operator; does nothing by default. */
+    public void visit(LOInnerLoad load) throws IOException {
+        // no-op
+    }
+
+    /** Visits a cogroup operator; does nothing by default. */
+    public void visit(LOCogroup loCogroup) throws IOException {
+        // no-op
+    }
+
+    /** Visits a split operator; does nothing by default. */
+    public void visit(LOSplit loSplit) throws IOException {
+        // no-op
+    }
+
+    /** Visits a split output operator; does nothing by default. */
+    public void visit(LOSplitOutput loSplitOutput) throws IOException {
+        // no-op
+    }
+
+    /** Visits a union operator; does nothing by default. */
+    public void visit(LOUnion loUnion) throws IOException {
+        // no-op
+    }
+
+    /** Visits a sort operator; does nothing by default. */
+    public void visit(LOSort loSort) throws IOException {
+        // no-op
+    }
+
+    /** Visits a distinct operator; does nothing by default. */
+    public void visit(LODistinct loDistinct) throws IOException {
+        // no-op
+    }
+
+    /** Visits a limit operator; does nothing by default. */
+    public void visit(LOLimit loLimit) throws IOException {
+        // no-op
+    }
+
+    /** Visits a cross operator; does nothing by default. */
+    public void visit(LOCross loCross) throws IOException {
+        // no-op
+    }
+
+    /** Visits a stream operator; does nothing by default. */
+    public void visit(LOStream loStream) throws IOException {
+        // no-op
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.relational;
+
+import java.util.Map;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+
+/**
+ * Logical representation of relational operators.  Relational operators have
+ * a schema.
+ */
+abstract public class LogicalRelationalOperator extends Operator {
+
+    /** Cached output schema; null until set or computed by getSchema(). */
+    protected LogicalSchema schema;
+    /** Requested parallelism; -1 when built through the two argument constructor. */
+    protected int requestedParallelism;
+    /** Alias of this operator in the script, see {@link #getAlias()}. */
+    protected String alias;
+    /** Line number reported by {@link #getLineNumber()}; never assigned in this class. */
+    protected int lineNum;
+
+    /**
+     * Creates an operator with a requested parallelism of -1.
+     *
+     * @param name of this operator
+     * @param plan this operator is in
+     */
+    public LogicalRelationalOperator(String name, OperatorPlan plan) {
+        this(name, plan, -1);
+    }
+
+    /**
+     *
+     * @param name of this operator
+     * @param plan this operator is in
+     * @param rp requested parallelism
+     */
+    public LogicalRelationalOperator(String name,
+                                     OperatorPlan plan,
+                                     int rp) {
+        super(name, plan);
+        requestedParallelism = rp;
+    }
+
+    /**
+     * Get the schema for the output of this relational operator.  This does
+     * not merely return the schema variable.  If schema is not yet set, this
+     * will attempt to construct it.  Therefore it is abstract since each
+     * operator will need to construct its schema differently.
+     * @return the schema
+     */
+    abstract public LogicalSchema getSchema();
+
+    /**
+     * Set the schema of this operator.
+     * @param schema schema to store, may be null
+     */
+    public void setSchema(LogicalSchema schema) {
+        this.schema = schema;
+    }
+
+    /**
+     * Reset the schema to null so that the next time getSchema is called
+     * the schema will be regenerated from scratch.
+     */
+    public void resetSchema() {
+        schema = null;
+    }
+
+    /**
+     * Get the requestedParallelism for this operator.
+     * @return requestedParallelism
+     */
+    public int getRequestedParallelism() {
+        return requestedParallelism;
+    }
+
+    /**
+     * Get the requestedParallelism for this operator.
+     * @return requestedParallelism
+     * @deprecated misspelled name kept so existing callers keep compiling;
+     * use {@link #getRequestedParallelism()} instead.
+     */
+    @Deprecated
+    public int getRequestedParallelisam() {
+        return getRequestedParallelism();
+    }
+
+    /**
+     * Set the requestedParallelism for this operator.
+     * @param parallel requested parallelism
+     */
+    public void setRequestedParallelism(int parallel) {
+        this.requestedParallelism = parallel;
+    }
+
+    /**
+     * Get the alias of this operator.  That is, if the Pig Latin for this operator
+     * was 'X = sort W by $0' then the alias will be X.  For store and split it will
+     * be the alias being stored or split.  Note that because of this this alias
+     * is not guaranteed to be unique to a single operator.
+     * @return alias
+     */
+    public String getAlias() {
+        return alias;
+    }
+
+    /**
+     * Set the alias of this operator.
+     * @param alias alias to store
+     */
+    public void setAlias(String alias) {
+        this.alias = alias;
+    }
+
+    /**
+     * Get the line number in the submitted Pig Latin script where this operator
+     * occurred.
+     * @return line number
+     */
+    public int getLineNumber() {
+        return lineNum;
+    }
+
+    /**
+     * Only to be used by unit tests.  This is a back door cheat to set the schema
+     * without having to calculate it.  This should never be called by production
+     * code, only by tests.
+     * @param schema to set
+     */
+    public void neverUseForRealSetSchema(LogicalSchema schema) {
+        this.schema = schema;
+    }
+
+    /**
+     * Do some basic equality checks on two relational operators.  Equality
+     * is defined here as having equal schemas: either both operators report
+     * a null schema or both schemas satisfy LogicalSchema.isEqual.
+     * Predecessors are not compared by this method.  This is intended to be
+     * used by operators' equals methods.
+     * @param other LogicalRelationalOperator to compare schemas against
+     * @return true if other is not null and the schemas of the two operators
+     * are equal, false otherwise
+     */
+    protected boolean checkEquality(LogicalRelationalOperator other) {
+        if (other == null) {
+            return false;
+        }
+        LogicalSchema s = getSchema();
+        LogicalSchema os = other.getSchema();
+        if (s == null || os == null) {
+            // equal only when both schemas are missing
+            return s == os;
+        }
+        return s.isEqual(os);
+    }
+
+    /**
+     * Renders the operator as "(Name: &lt;name&gt; Schema: &lt;schema or null&gt;)"
+     * followed by every annotation entry.  The schema field is read directly,
+     * so this never triggers schema construction.
+     */
+    @Override
+    public String toString() {
+        StringBuilder msg = new StringBuilder();
+
+        msg.append("(Name: " + name + " Schema: ");
+        if (schema != null) {
+            msg.append(schema);
+        } else {
+            msg.append("null");
+        }
+        msg.append(")");
+        // name and annotations are not declared here; presumably inherited from Operator
+        if (annotations != null) {
+            for (Map.Entry<String, Object> entry : annotations.entrySet()) {
+                msg.append(entry);
+            }
+        }
+        return msg.toString();
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+
+/**
+ * Schema, from a logical perspective.
+ */
+public class LogicalSchema {
+
+    public static class LogicalFieldSchema {
+        public String alias;
+        public byte type;
+        public long uid;
+        public LogicalSchema schema;
+
+        public LogicalFieldSchema(String alias, LogicalSchema schema,  byte type) {
+            this(alias, schema, type, -1);
+        }
+        
+        public LogicalFieldSchema(LogicalFieldSchema fs) {
+            this(fs.alias, fs.schema, fs.type, fs.uid);
+        }
+        
+        public LogicalFieldSchema(String alias, LogicalSchema schema,  byte type, long uid) {
+            this.alias = alias;
+            this.type = type;
+            this.schema = schema;
+            this.uid = uid;
+        }
+        
+        /**
+         * Equality is defined as having the same type and either the same schema
+         * or both null schema.  Alias and uid are not checked.
+         */
+        public boolean isEqual(Object other) {
+            if (other instanceof LogicalFieldSchema) {
+                LogicalFieldSchema ofs = (LogicalFieldSchema)other;
+                if (type != ofs.type) return false;
+                if (schema == null && ofs.schema == null) return true;
+                if (schema == null) return false;
+                else return schema.isEqual(ofs.schema);
+            } else {
+                return false;
+            }
+        }
+               
+        public String toString() {
+            if( type == DataType.BAG ) {
+                if( schema == null ) {
+                    return ( alias + "#" + uid + ":bag{}#" );
+                }
+                return ( alias + "#" + uid + ":bag{" + schema.toString() + "}" );
+            } else if( type == DataType.TUPLE ) {
+                if( schema == null ) {
+                    return ( alias + "#" + uid + ":tuple{}" );
+                }
+                return ( alias + "#" + uid + ":tuple(" + schema.toString() + ")" );
+            }
+            return ( alias + "#" + uid + ":" + DataType.findTypeName(type) );
+        }
+        
+        public void stampFieldSchema() {
+            if (uid==-1)
+                uid = LogicalExpression.getNextUid();
+            if (schema!=null) {
+                for (LogicalFieldSchema fs : schema.getFields()) {
+                    fs.stampFieldSchema();
+                }
+            }
+        }
+        
+        private boolean compatible(LogicalFieldSchema uidOnlyFieldSchema) {
+            if (uidOnlyFieldSchema==null)
+                return false;
+            if (this.schema==null && uidOnlyFieldSchema.schema!=null ||
+                    this.schema!=null && uidOnlyFieldSchema==null)
+                return false;
+            if (this.schema!=null) {
+                if (this.schema.size()!=uidOnlyFieldSchema.schema.size())
+                    return false;
+                for (int i=0;i<this.schema.size();i++) {
+                    boolean comp = schema.getField(i).compatible(uidOnlyFieldSchema.schema.getField(i));
+                    if (!comp) return false;
+                }
+            }
+            return true;
+        }
+        public LogicalSchema.LogicalFieldSchema mergeUid(LogicalFieldSchema uidOnlyFieldSchema) throws IOException {
+            if (uidOnlyFieldSchema!=null && compatible(uidOnlyFieldSchema)) {
+                this.uid = uidOnlyFieldSchema.uid;
+                if (this.schema!=null) {
+                    for (int i=0;i<this.schema.size();i++) {
+                        schema.getField(i).mergeUid(uidOnlyFieldSchema.schema.getField(i));
+                    }
+                }
+                return uidOnlyFieldSchema;
+            }
+            else {
+                if (uidOnlyFieldSchema==null) {
+                    stampFieldSchema();
+                }
+                else {
+                    this.uid = uidOnlyFieldSchema.uid;
+                    if (this.schema!=null) {
+                        for (int i=0;i<this.schema.size();i++) {
+                            schema.getField(i).stampFieldSchema();
+                        }
+                    }
+                }
+                LogicalFieldSchema clonedUidOnlyCopy = cloneUid();
+                return clonedUidOnlyCopy;
+            }
+        }
+        
+        public LogicalFieldSchema cloneUid() {
+            LogicalFieldSchema resultFs = null;
+            if (schema==null) {
+                resultFs = new LogicalFieldSchema(null, null, (byte)-1, uid);
+            }
+            else {
+                LogicalSchema schema = new LogicalSchema();
+                resultFs = new LogicalFieldSchema(null, schema, (byte)-1, uid);
+                for (int i=0;i<schema.size();i++) {
+                    LogicalFieldSchema fs = schema.getField(i).cloneUid();
+                    schema.addField(fs);
+                }
+            }
+            return resultFs;
+        }
+        
+        public LogicalFieldSchema deepCopy() {
+            LogicalFieldSchema newFs = new LogicalFieldSchema(alias!=null?alias:null, schema!=null?schema.deepCopy():null, 
+                    type, uid);
+            return newFs;
+        }
+    }
+    
+    private List<LogicalFieldSchema> fields;
+    private Map<String, Pair<Integer, Boolean>> aliases;
+    
+    private boolean twoLevelAccessRequired = false;
+    
+    public LogicalSchema() {
+        fields = new ArrayList<LogicalFieldSchema>();
+        aliases = new HashMap<String, Pair<Integer, Boolean>>();
+    }
+    
+    /**
+     * Add a field to this schema.
+     * @param field to be added to the schema
+     */
+    public void addField(LogicalFieldSchema field) {
+        fields.add(field);
+        if (field.alias != null && !field.alias.equals("")) {
+            // put the full name of this field into aliases map
+            // boolean in the pair indicates if this alias is full name
+            aliases.put(field.alias, new Pair<Integer, Boolean>(fields.size()-1, true));
+            int index = 0;
+            
+            // check and put short names into alias map if there is no conflict
+            
+            while(index != -1) {
+                index = field.alias.indexOf("::", index);
+                if (index != -1) {
+                    String a = field.alias.substring(index+2);
+                    if (aliases.containsKey(a)) {
+                        // remove conflict if the conflict is not full name
+                        // we can never remove full name
+                        if (!aliases.get(a).second) {
+                            aliases.remove(a);
+                        }
+                    }else{
+                        // put alias into map and indicate it is a short name
+                        aliases.put(a, new Pair<Integer, Boolean>(fields.size()-1, false));                       
+                    }
+
+                    index = index +2;
+                }
+            }
+        }
+    }
+    
+    /**
+     * Fetch a field by alias
+     * @param alias
+     * @return field associated with alias, or null if no such field
+     */
+    public LogicalFieldSchema getField(String alias) {
+        Pair<Integer, Boolean> p = aliases.get(alias);
+        if (p == null) {
+            return null;
+        }
+
+        return fields.get(p.first);
+    }
+
+    /**
+     * Fetch a field by field number
+     * @param fieldNum field number to fetch
+     * @return field
+     */
+    public LogicalFieldSchema getField(int fieldNum) {
+        return fields.get(fieldNum);
+    }
+    
+    /**
+     * Get all fields
+     * @return list of all fields
+     */
+    public List<LogicalFieldSchema> getFields() {
+        return fields;
+    }
+
+    /**
+     * Get the size of the schema.
+     * @return size
+     */
+    public int size() {
+       return fields.size();
+    }
+    
+    /**
+     * Two schemas are equal if they are of equal size and their fields
+     * schemas considered in order are equal.
+     */
+    public boolean isEqual(Object other) {
+        if (other != null && other instanceof LogicalSchema) {
+            LogicalSchema os = (LogicalSchema)other;
+            if (size() != os.size()) return false;
+            for (int i = 0; i < size(); i++) {
+                if (!getField(i).isEqual(os.getField(i))) return false;
+            }
+            return true;
+        } else {
+            return false;
+        }
+        
+    }
+    
+    /**
+     * Look for the index of the field that contains the specified uid
+     * @param uid the uid to look for
+     * @return the index of the field, -1 if not found
+     */
+    public int findField(long uid) {            
+        
+        for(int i=0; i< size(); i++) {
+            LogicalFieldSchema f = getField(i);
+            // if this field has the same uid, then return this field
+            if (f.uid == uid) {
+                return i;
+            } 
+            
+            // if this field has a schema, check its schema
+            if (f.schema != null) {
+                if (f.schema.findField(uid) != -1) {
+                    return i;
+                }
+            }
+        }
+        
+        return -1;
+    }
+    
+    
+    /**
+     * Merge two schemas.
+     * @param s1
+     * @param s2
+     * @return a merged schema, or null if the merge fails
+     */
+    public static LogicalSchema merge(LogicalSchema s1, LogicalSchema s2) {
+        // TODO
+        return null;
+    }
+    
+    public String toString() {
+        StringBuilder str = new StringBuilder();
+        
+        for( LogicalFieldSchema field : fields ) {
+            str.append( field.toString() + "," );
+        }
+        if( fields.size() != 0 ) {
+            str.deleteCharAt( str.length() -1 );
+        }
+        return str.toString();
+    }
+    
+    public void setTwoLevelAccessRequired(boolean flag) {
+        twoLevelAccessRequired = flag;
+    }
+    
+    public boolean isTwoLevelAccessRequired() {
+        return twoLevelAccessRequired;
+    }
+
+    public LogicalSchema mergeUid(LogicalSchema uidOnlySchema) throws IOException {
+        if (uidOnlySchema!=null) {
+            if (size()!=uidOnlySchema.size()) {
+                throw new IOException("structure of schema change");
+                }
+            for (int i=0;i<size();i++) {
+                getField(i).mergeUid(uidOnlySchema.getField(i));
+                }
+            return uidOnlySchema;
+        }
+        else {
+            LogicalSchema clonedUidOnlyCopy = new LogicalSchema();
+            for (int i=0;i<size();i++) {
+                getField(i).stampFieldSchema();
+                clonedUidOnlyCopy.addField(getField(i).cloneUid());                    
+                }
+            return clonedUidOnlyCopy;
+        }
+    }
+    
+    public LogicalSchema deepCopy()  {
+        LogicalSchema newSchema = new LogicalSchema();
+        newSchema.setTwoLevelAccessRequired(isTwoLevelAccessRequired());
+        for (int i=0;i<size();i++)
+            newSchema.addField(getField(i).deepCopy());
+        return newSchema;
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/SchemaNotDefinedException.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/SchemaNotDefinedException.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/SchemaNotDefinedException.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/SchemaNotDefinedException.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.relational;
+
+
+/**
+ * Unchecked exception carrying a message and/or a cause; the name indicates
+ * it is raised when a schema has not been defined.
+ */
+public class SchemaNotDefinedException extends RuntimeException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param msg detail message
+     */
+    public SchemaNotDefinedException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * @param msg detail message
+     * @param e underlying cause
+     */
+    public SchemaNotDefinedException(String msg, Throwable e) {
+        super(msg, e);
+    }
+
+    /**
+     * @param e underlying cause
+     */
+    public SchemaNotDefinedException(Throwable e) {
+        super(e);
+    }
+}



Mime
View raw message