pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject svn commit: r982345 [4/13] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/ src/org/apache/pig/newplan/ src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/log...
Date Wed, 04 Aug 2010 17:46:48 GMT
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/SubtractExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/SubtractExpression.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/SubtractExpression.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/SubtractExpression.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.expression;
+
+import java.io.IOException;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+
+/**
+ * Subtract operator: a binary logical expression registered under the
+ * name "Subtract".
+ */
+public class SubtractExpression extends BinaryExpression {
+
+    /**
+     * Builds the node by handing the fixed name "Subtract", the plan and
+     * both operands to the {@link BinaryExpression} constructor.
+     *
+     * @param plan plan this operator is part of
+     * @param lhs expression on its left hand side
+     * @param rhs expression on its right hand side
+     */
+    public SubtractExpression(OperatorPlan plan,
+                              LogicalExpression lhs,
+                              LogicalExpression rhs) {
+        super("Subtract", plan, lhs, rhs);
+    }
+
+    /**
+     * Dispatches this node to the supplied visitor.
+     *
+     * @param v visitor to dispatch to; must be a LogicalExpressionVisitor
+     * @throws IOException if {@code v} is not a LogicalExpressionVisitor,
+     *         or if the visitor's own visit method throws
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (v instanceof LogicalExpressionVisitor) {
+            ((LogicalExpressionVisitor) v).visit(this);
+            return;
+        }
+        throw new IOException("Expected LogicalExpressionVisitor");
+    }
+
+    /**
+     * Structural equality: the other operator must also be a
+     * SubtractExpression whose left and right operands are equal to this
+     * node's left and right operands respectively.
+     *
+     * @param other operator to compare against; may be null
+     * @return true only when both operand comparisons succeed
+     * @throws RuntimeException wrapping any IOException raised while the
+     *         operands are looked up
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        // instanceof is already false for null, so no separate null test is needed
+        if (!(other instanceof SubtractExpression)) {
+            return false;
+        }
+        SubtractExpression that = (SubtractExpression) other;
+        try {
+            return that.getLhs().isEqual(getLhs())
+                && that.getRhs().isEqual(getRhs());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Returns the field schema of this expression, creating and caching it
+     * on the first call. The new schema is constructed with two null
+     * arguments followed by the type of the left hand operand, and its uid
+     * is merged with {@code uidOnlyFieldSchema}.
+     *
+     * @return the cached (or freshly created) field schema
+     * @throws IOException if the left hand operand cannot be obtained
+     */
+    @Override
+    public LogicalSchema.LogicalFieldSchema getFieldSchema() throws IOException {
+        if (fieldSchema == null) {
+            fieldSchema = new LogicalSchema.LogicalFieldSchema(null, null, getLhs().getType());
+            uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
+        }
+        return fieldSchema;
+    }
+}
\ No newline at end of file

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UnaryExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UnaryExpression.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UnaryExpression.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UnaryExpression.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.expression;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+
+/**
+ * Superclass for all unary expressions, i.e. expressions that operate on
+ * exactly one operand.
+ */
+public abstract class UnaryExpression extends LogicalExpression {
+
+    /**
+     * Adds this operator to the plan and connects it to its single operand.
+     *
+     * @param name name of the operator
+     * @param plan plan this operator is part of
+     * @param exp expression that this expression operates on
+     */
+    public UnaryExpression(String name,
+                            OperatorPlan plan,
+                            LogicalExpression exp) {
+        super(name, plan);
+        plan.add(this);
+        plan.connect(this, exp);
+    }
+
+    /**
+     * Get the expression that this unary expression operates on.
+     *
+     * @return the operand, which is the first successor of this node in
+     *         the plan, or null if the plan reports no successors
+     * @throws IOException if the plan fails while looking up the successors
+     */
+    public LogicalExpression getExpression() throws IOException {
+        // The operand was attached with connect(this, exp), so it is a successor.
+        List<Operator> succs = plan.getSuccessors(this);
+        // Treat an empty list like null so that a node without an operand
+        // yields null instead of an IndexOutOfBoundsException from get(0).
+        if (succs == null || succs.isEmpty()) {
+            return null;
+        }
+        return (LogicalExpression) succs.get(0);
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.expression;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.Util;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+
+/**
+ * Logical expression node for a user defined function call. The function
+ * is identified by a {@link FuncSpec}; its arguments are the successors of
+ * this node in the expression plan.
+ */
+public class UserFuncExpression extends LogicalExpression {
+
+    private FuncSpec mFuncSpec;
+
+    /**
+     * Creates the node under the name "UserFunc" and adds it to the plan.
+     * No arguments are connected here.
+     *
+     * @param plan plan this operator is part of
+     * @param funcSpec specification of the function to call
+     */
+    public UserFuncExpression(OperatorPlan plan, FuncSpec funcSpec) {
+        super("UserFunc", plan);
+        mFuncSpec = funcSpec;
+        plan.add(this);
+    }
+
+    /**
+     * @return the FuncSpec currently associated with this expression
+     */
+    public FuncSpec getFuncSpec() {
+        return mFuncSpec;
+    }
+
+    /**
+     * Dispatches this node to the supplied visitor.
+     *
+     * @param v visitor to dispatch to; must be a LogicalExpressionVisitor
+     * @throws IOException if {@code v} is not a LogicalExpressionVisitor,
+     *         or if the visitor's own visit method throws
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalExpressionVisitor)) {
+            throw new IOException("Expected LogicalExpressionVisitor");
+        }
+        ((LogicalExpressionVisitor)v).visit(this);
+    }
+
+    /**
+     * Two user function expressions are equal when their plans are equal
+     * and their FuncSpecs are equal.
+     *
+     * @param other operator to compare against; may be null
+     * @return true if {@code other} is an equal UserFuncExpression
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if( other instanceof UserFuncExpression ) {
+            UserFuncExpression exp = (UserFuncExpression)other;
+            return plan.isEqual(exp.plan) && mFuncSpec.equals(exp.mFuncSpec );
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Returns the argument expressions of this function call, which are
+     * the successors of this node in the plan.
+     *
+     * @return a new list holding the arguments; empty when the plan
+     *         reports no successors or the lookup fails
+     */
+    public List<LogicalExpression> getArguments() {
+        List<LogicalExpression> args = new ArrayList<LogicalExpression>();
+        try {
+            List<Operator> successors = plan.getSuccessors(this);
+            if (successors != null) {
+                for (Operator lo : successors) {
+                    args.add((LogicalExpression)lo);
+                }
+            }
+        } catch (IOException e) {
+            // Deliberately best-effort: this method declares no checked
+            // exception, so a failed lookup is reported as "no arguments".
+            // NOTE(review): the failure is not logged anywhere; consider
+            // surfacing it if callers need to tell the two cases apart.
+        }
+        return args;
+    }
+
+    /**
+     * @param funcSpec the FuncSpec to set
+     */
+    public void setFuncSpec(FuncSpec funcSpec) {
+        mFuncSpec = funcSpec;
+    }
+
+    /**
+     * Returns the field schema of this expression, computing and caching
+     * it on the first call.
+     * <p>
+     * The field schemas of all arguments are collected into an input
+     * schema, the function is instantiated from its FuncSpec, and its
+     * {@code outputSchema} is consulted:
+     * <ul>
+     * <li>null or empty result: a schema-less field whose type is derived
+     *     from the function's return type;</li>
+     * <li>exactly one field: a copy of that field;</li>
+     * <li>more than one field: a TUPLE field wrapping the whole schema.</li>
+     * </ul>
+     *
+     * @return the cached (or freshly computed) field schema
+     * @throws IOException if the plan lookup or an argument schema fails
+     */
+    @Override
+    public LogicalSchema.LogicalFieldSchema getFieldSchema() throws IOException {
+        if (fieldSchema != null) {
+            return fieldSchema;
+        }
+        LogicalSchema inputSchema = new LogicalSchema();
+        List<Operator> succs = plan.getSuccessors(this);
+
+        // A function without arguments may have no successor list at all
+        // (getArguments guards the same case); skip the loop rather than
+        // fail with a NullPointerException.
+        if (succs != null) {
+            for (Operator lo : succs) {
+                inputSchema.addField(((LogicalExpression)lo).getFieldSchema());
+            }
+        }
+
+        EvalFunc<?> ef = (EvalFunc<?>) PigContext.instantiateFuncFromSpec(mFuncSpec);
+        Schema udfSchema = ef.outputSchema(Util.translateSchema(inputSchema));
+
+        if (udfSchema != null) {
+            Schema.FieldSchema fs;
+            if (udfSchema.size() == 0) {
+                fs = new Schema.FieldSchema(null, null, DataType.findType(ef.getReturnType()));
+            } else if (udfSchema.size() == 1) {
+                fs = new Schema.FieldSchema(udfSchema.getField(0));
+            } else {
+                fs = new Schema.FieldSchema(null, udfSchema, DataType.TUPLE);
+            }
+            fieldSchema = Util.translateFieldSchema(fs);
+        } else {
+            fieldSchema = new LogicalSchema.LogicalFieldSchema(null, null, DataType.findType(ef.getReturnType()));
+        }
+        uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
+        return fieldSchema;
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.optimizer;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanWalker;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+
+/**
+ * A visitor that walks a logical plan and then applies a given
+ * LogicalExpressionVisitor to all expressions it encounters.
+ *
+ */
+public abstract class AllExpressionVisitor extends LogicalRelationalNodesVisitor {
+    
+    protected LogicalExpressionVisitor exprVisitor;
+    protected LogicalRelationalOperator currentOp;
+
+    /**
+     * @param plan LogicalPlan to visit
+     * @param walker Walker to use to visit the plan.
+     */
+    public AllExpressionVisitor(OperatorPlan plan,
+                                PlanWalker walker) {
+        super(plan, walker);
+    }
+    
+    /**
+     * Get a new instance of the expression visitor to apply to 
+     * a given expression.
+     * @param expr LogicalExpressionPlan that will be visited
+     * @return a new LogicalExpressionVisitor for that expression
+     */
+    abstract protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr);
+    
+    @Override
+    public void visit(LOFilter filter) throws IOException {
+        currentOp = filter;
+        LogicalExpressionVisitor v = getVisitor(filter.getFilterPlan());
+        v.visit();
+    }
+    
+    @Override
+    public void visit(LOJoin join) throws IOException {
+        currentOp = join;
+        Collection<LogicalExpressionPlan> c = join.getExpressionPlans();
+        for (LogicalExpressionPlan plan : c) {
+            LogicalExpressionVisitor v = getVisitor(plan);
+            v.visit();
+        }
+    }
+    
+    @Override
+    public void visit(LOCogroup cg) throws IOException {
+        currentOp = cg;
+        MultiMap<Integer, LogicalExpressionPlan> expressionPlans = cg.getExpressionPlans();
+        for( Integer key : expressionPlans.keySet() ) {
+            Collection<LogicalExpressionPlan> exprPlans = expressionPlans.get(key);
+            for( LogicalExpressionPlan plan : exprPlans ) {
+                LogicalExpressionVisitor v = getVisitor(plan);
+                v.visit();
+            }
+        }
+    }
+    
+    @Override
+    public void visit(LOForEach foreach) throws IOException {
+        currentOp = foreach;
+        // We have an Inner OperatorPlan in ForEach, so we go ahead
+        // and work on that plan
+        OperatorPlan innerPlan = foreach.getInnerPlan();
+        PlanWalker newWalker = currentWalker.spawnChildWalker(innerPlan);
+        pushWalker(newWalker);
+        currentWalker.walk(this);
+        popWalker();
+    }
+    
+    @Override
+    public void visit(LOGenerate gen ) throws IOException {
+        currentOp = gen;
+        Collection<LogicalExpressionPlan> plans = gen.getOutputPlans();
+        for( LogicalExpressionPlan plan : plans ) {
+            LogicalExpressionVisitor v = getVisitor(plan);
+            v.visit();
+        }
+    }
+    
+    @Override
+    public void visit(LOInnerLoad load) throws IOException {
+        // the expression in LOInnerLoad contains info relative from LOForEach
+        // so use LOForeach as currentOp
+        currentOp = load.getLOForEach();
+        LogicalExpressionPlan exp = (LogicalExpressionPlan)load.getProjection().getPlan();
+       
+        LogicalExpressionVisitor v = getVisitor(exp);
+        v.visit();       
+    }
+    
+    @Override
+    public void visit(LOSplitOutput splitOutput) throws IOException {
+        currentOp = splitOutput;
+        LogicalExpressionVisitor v = getVisitor(splitOutput.getFilterPlan());
+        v.visit();
+    }
+    
+    @Override
+    public void visit(LOSort sort) throws IOException {
+        currentOp = sort;
+        Collection<LogicalExpressionPlan> c = sort.getSortColPlans();
+        for (LogicalExpressionPlan plan : c) {
+            LogicalExpressionVisitor v = getVisitor(plan);
+            v.visit();
+        }
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameRalationalNodesVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameRalationalNodesVisitor.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameRalationalNodesVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameRalationalNodesVisitor.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.optimizer;
+
+import java.io.IOException;
+
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanWalker;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LODistinct;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOStream;
+import org.apache.pig.newplan.logical.relational.LOUnion;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+
+/**
+ * Visitor over a logical plan that funnels every supported relational node
+ * type into one callback. Subclasses implement
+ * {@link #execute(LogicalRelationalOperator)}, and each visit method in
+ * this class simply forwards the node it receives to that callback.
+ * <p>
+ * The spelling "Ralational" in the class name is kept as is because it is
+ * part of the public name.
+ */
+public abstract class AllSameRalationalNodesVisitor extends LogicalRelationalNodesVisitor {
+
+    /**
+     * @param plan OperatorPlan to visit
+     * @param walker Walker to use to visit the plan
+     */
+    public AllSameRalationalNodesVisitor(OperatorPlan plan, PlanWalker walker) {
+        super(plan, walker);
+    }
+
+    /**
+     * Callback invoked by every visit method of this class.
+     *
+     * @param op node that is currently being visited
+     * @throws IOException if the subclass fails to process the node
+     */
+    protected abstract void execute(LogicalRelationalOperator op) throws IOException;
+
+    // One forwarding override per operator type, in alphabetical order.
+
+    @Override
+    public void visit(LOCogroup cg) throws IOException {
+        execute(cg);
+    }
+
+    @Override
+    public void visit(LOCross cross) throws IOException {
+        execute(cross);
+    }
+
+    @Override
+    public void visit(LODistinct distinct) throws IOException {
+        execute(distinct);
+    }
+
+    @Override
+    public void visit(LOFilter filter) throws IOException {
+        execute(filter);
+    }
+
+    @Override
+    public void visit(LOForEach foreach) throws IOException {
+        execute(foreach);
+    }
+
+    @Override
+    public void visit(LOJoin join) throws IOException {
+        execute(join);
+    }
+
+    @Override
+    public void visit(LOLoad load) throws IOException {
+        execute(load);
+    }
+
+    @Override
+    public void visit(LOSort sort) throws IOException {
+        execute(sort);
+    }
+
+    @Override
+    public void visit(LOSplit split) throws IOException {
+        execute(split);
+    }
+
+    @Override
+    public void visit(LOSplitOutput splitOutput) throws IOException {
+        execute(splitOutput);
+    }
+
+    @Override
+    public void visit(LOStore store) throws IOException {
+        execute(store);
+    }
+
+    @Override
+    public void visit(LOStream stream) throws IOException {
+        execute(stream);
+    }
+
+    @Override
+    public void visit(LOUnion union) throws IOException {
+        execute(union);
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameVisitor.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameVisitor.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1 @@
+package org.apache.pig.newplan.logical.optimizer;

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ExprPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ExprPrinter.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ExprPrinter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ExprPrinter.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.optimizer;
+
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.pig.newplan.DepthFirstMemoryWalker;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.AddExpression;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.BinCondExpression;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.DereferenceExpression;
+import org.apache.pig.newplan.logical.expression.DivideExpression;
+import org.apache.pig.newplan.logical.expression.EqualExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
+import org.apache.pig.newplan.logical.expression.IsNullExpression;
+import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.LessThanExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.newplan.logical.expression.MapLookupExpression;
+import org.apache.pig.newplan.logical.expression.ModExpression;
+import org.apache.pig.newplan.logical.expression.MultiplyExpression;
+import org.apache.pig.newplan.logical.expression.NegativeExpression;
+import org.apache.pig.newplan.logical.expression.NotEqualExpression;
+import org.apache.pig.newplan.logical.expression.NotExpression;
+import org.apache.pig.newplan.logical.expression.OrExpression;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.RegexExpression;
+import org.apache.pig.newplan.logical.expression.SubtractExpression;
+import org.apache.pig.newplan.logical.expression.UserFuncExpression;
+
+public class ExprPrinter extends LogicalExpressionVisitor {
+
+    protected PrintStream stream = null;
+    
+    public ExprPrinter(OperatorPlan plan, int startingLevel, PrintStream ps) {
+        super(plan, new DepthFirstMemoryWalker(plan, startingLevel));
+        stream = ps;
+    }
+    
+    public ExprPrinter(OperatorPlan plan, PrintStream ps) {
+        super(plan, new DepthFirstMemoryWalker(plan, 0));
+        stream = ps;
+    }
+    
+    private void simplevisit(LogicalExpression exp) {
+        stream.print( ((DepthFirstMemoryWalker)currentWalker).getPrefix() );
+        stream.println( exp.toString() );
+    }
+
+    @Override
+    public void visit(AndExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+
+    @Override
+    public void visit(OrExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+
+    @Override
+    public void visit(EqualExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+
+    @Override
+    public void visit(ProjectExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+    
+    @Override
+    public void visit(MapLookupExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+
+    @Override
+    public void visit(ConstantExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+
+    @Override
+    public void visit(CastExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+
+    @Override
+    public void visit(GreaterThanExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+
+    @Override
+    public void visit(GreaterThanEqualExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+
+    @Override
+    public void visit(LessThanExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+
+    @Override
+    public void visit(LessThanEqualExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+    
+    @Override
+    public void visit(NotEqualExpression exp) throws IOException { 
+        simplevisit(exp);
+    }
+
+    @Override
+    public void visit(NotExpression exp ) throws IOException {
+        simplevisit(exp);
+    }
+
+    @Override
+    public void visit(IsNullExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+    
+    @Override
+    public void visit(NegativeExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+    
+    @Override
+    public void visit(AddExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+    
+    @Override
+    public void visit(SubtractExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+    
+    @Override
+    public void visit(MultiplyExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+    
+    @Override
+    public void visit(ModExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+    
+    @Override
+    public void visit(DivideExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+    
+    @Override
+    public void visit(BinCondExpression exp ) throws IOException {
+        simplevisit(exp);
+    }
+    
+    @Override
+    public void visit(UserFuncExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+    
+    @Override
+    public void visit(DereferenceExpression exp) throws IOException {
+        simplevisit(exp);
+    }
+    
+    @Override
+    public void visit(RegexExpression op) throws IOException {
+        simplevisit(op);
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.rules.AddForEach;
+import org.apache.pig.newplan.logical.rules.ColumnMapKeyPrune;
+import org.apache.pig.newplan.logical.rules.FilterAboveForeach;
+import org.apache.pig.newplan.logical.rules.MergeFilter;
+import org.apache.pig.newplan.logical.rules.PushUpFilter;
+import org.apache.pig.newplan.logical.rules.SplitFilter;
+import org.apache.pig.newplan.logical.rules.TypeCastInserter;
+import org.apache.pig.newplan.optimizer.PlanOptimizer;
+import org.apache.pig.newplan.optimizer.Rule;
+
+/**
+ * {@link PlanOptimizer} configured with the rule sets and transform
+ * listeners used for the new logical plan.
+ */
+public class LogicalPlanOptimizer extends PlanOptimizer {
+
+    /**
+     * Creates the optimizer, then installs the rule sets returned by
+     * {@link #buildRuleSets()} and registers the plan-transform listeners.
+     * The superclass constructor is given {@code null} in place of rule sets;
+     * the {@code ruleSets} field is assigned right afterwards.
+     *
+     * @param p          plan handed to the superclass constructor
+     * @param iterations value handed to the superclass constructor
+     */
+    public LogicalPlanOptimizer(OperatorPlan p, int iterations) {
+        super(p, null, iterations);
+        ruleSets = buildRuleSets();
+        addListeners();
+    }
+
+    /**
+     * Builds the ordered list of rule sets.  Each entry is a fresh, mutable
+     * {@link HashSet}; the order of the list is:
+     * TypeCastInserter, SplitFilter, {PushUpFilter, FilterAboveForeach},
+     * MergeFilter, ColumnMapKeyPrune, AddForEach.
+     *
+     * @return the rule sets, in the order listed above
+     */
+    protected List<Set<Rule>> buildRuleSets() {
+        List<Set<Rule>> ruleSetList = new ArrayList<Set<Rule>>();
+
+        // Cast set: inserts a foreach dedicated to casting after load.
+        // NOTE(review): this file imports LOLoad from
+        // org.apache.pig.impl.logicalLayer, not from
+        // org.apache.pig.newplan.logical.relational -- confirm this is the
+        // class name TypeCastInserter is meant to receive.
+        appendRuleSet(ruleSetList,
+                new TypeCastInserter("TypeCastInserter", LOLoad.class.getName()));
+
+        // Split set: only splits operators, never moves them.
+        appendRuleSet(ruleSetList, new SplitFilter("SplitFilter"));
+
+        // Push set: only moves operators.
+        appendRuleSet(ruleSetList,
+                new PushUpFilter("PushUpFilter"),
+                new FilterAboveForeach("FilterAboveForEachWithFlatten"));
+
+        // Merge set: merges operators but does not move them.
+        appendRuleSet(ruleSetList, new MergeFilter("MergeFilter"));
+
+        // Prune set: prunes columns and map keys.
+        appendRuleSet(ruleSetList, new ColumnMapKeyPrune("ColumnMapKeyPrune"));
+
+        // Adds an LOForEach operator to trim off columns.
+        appendRuleSet(ruleSetList, new AddForEach("AddForEach"));
+
+        return ruleSetList;
+    }
+
+    /**
+     * Appends one new {@link HashSet} holding the given rules to the list.
+     *
+     * @param ruleSetList list that receives the new set
+     * @param rules       rules that make up the set
+     */
+    private static void appendRuleSet(List<Set<Rule>> ruleSetList, Rule... rules) {
+        Set<Rule> group = new HashSet<Rule>();
+        ruleSetList.add(group);
+        for (Rule rule : rules) {
+            group.add(rule);
+        }
+    }
+
+    /** Registers the schema and projection patchers as transform listeners. */
+    private void addListeners() {
+        addPlanTransformListener(new SchemaPatcher());
+        addPlanTransformListener(new ProjectionPatcher());
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.optimizer;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanWalker;
+import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LODistinct;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOUnion;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+
+public class LogicalPlanPrinter extends LogicalRelationalNodesVisitor {
+
+    protected PrintStream stream = null;
+    protected int level = 0;
+    
+//    private String TAB1 = "    ";
+//    private String TABMore = "|   ";
+//    private String LSep = "|\n|---";
+//    private String USep = "|   |\n|   ";
+//    private int levelCntr = -1;
+    
+    public LogicalPlanPrinter(OperatorPlan plan, PrintStream ps) {
+        super(plan, new ReverseDependencyOrderWalker(plan));
+        stream = ps;
+    }
+
+    protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) {
+        return new ExprPrinter(expr, level+1, stream);
+    }
+
+    @Override
+    public void visit(LOLoad op) throws IOException {
+        printLevel();
+        stream.println( op.toString() );
+    }
+
+    @Override
+    public void visit(LOStore op) throws IOException {
+        printLevel();
+        stream.println( op.toString() );
+    }
+
+    @Override
+    public void visit(LOForEach op) throws IOException {
+        printLevel();
+        stream.println( op.toString() );
+        level++;
+        OperatorPlan innerPlan = op.getInnerPlan();
+        PlanWalker newWalker = currentWalker.spawnChildWalker(innerPlan);
+        pushWalker(newWalker);
+        currentWalker.walk(this);
+        popWalker();
+        level--;
+    }
+
+    @Override
+    public void visit(LOFilter op) throws IOException {
+        printLevel();
+        stream.println( op.toString() );
+        LogicalExpressionVisitor v = getVisitor(op.getFilterPlan());
+        level++;
+        v.visit();
+        level--;
+    }
+    
+    @Override
+    public void visit(LOJoin op) throws IOException {
+        printLevel();
+        stream.println( op.toString() );
+        
+        LogicalExpressionVisitor v = null;
+        level++;
+        for (LogicalExpressionPlan plan : op.getExpressionPlans()) {
+            v = getVisitor(plan);
+            v.visit();
+        }
+        level--;
+    }
+
+    @Override
+    public void visit(LOGenerate op) throws IOException {
+        printLevel();        
+        stream.println( op.toString() );
+        List<LogicalExpressionPlan> plans = op.getOutputPlans();
+        LogicalExpressionVisitor v = null;
+        level++;
+        for( LogicalExpressionPlan plan : plans ) {
+            v = getVisitor(plan);
+            v.visit();
+        }
+        level--;
+    }
+
+    @Override
+    public void visit(LOInnerLoad op) throws IOException {
+        printLevel();
+        stream.println( op.toString() );
+    }
+    
+    @Override
+    public void visit(LOCogroup op) throws IOException {
+        printLevel();
+        stream.println( op.toString() );
+        MultiMap<Integer,LogicalExpressionPlan> exprPlans = op.getExpressionPlans();
+        for( Integer key : exprPlans.keySet() ) {
+            Collection<LogicalExpressionPlan> plans = exprPlans.get(key);
+            LogicalExpressionVisitor v = null;
+            level++;
+            for( LogicalExpressionPlan plan : plans ) {
+                v = getVisitor(plan);
+                v.visit();
+            }
+            level--;
+        }
+    }
+    
+    @Override
+    public void visit(LOSplitOutput op) throws IOException {
+        printLevel();
+        stream.println( op.toString() );
+        LogicalExpressionVisitor v = getVisitor(op.getFilterPlan());
+        level++;
+        v.visit();
+        level--;
+    }
+    
+    @Override
+    public void visit(LOSplit op) throws IOException {
+        printLevel();
+        stream.println( op.toString() );
+        level++;
+    }
+    
+    @Override
+    public void visit(LOUnion op) throws IOException {
+        printLevel();
+        stream.println( op.toString() );
+        level++;
+    }
+    
+    @Override
+    public void visit(LOCross op) throws IOException {
+        printLevel();
+        stream.println( op.toString() );
+        level++;
+    }
+    
+    @Override
+    public void visit(LOSort op) throws IOException {
+        printLevel();        
+        stream.println( op.toString() );
+        List<LogicalExpressionPlan> plans = op.getSortColPlans();
+        LogicalExpressionVisitor v = null;
+        level++;
+        for( LogicalExpressionPlan plan : plans ) {
+            v = getVisitor(plan);
+            v.visit();
+        }
+        level--;
+    }
+    
+    @Override
+    public void visit(LODistinct op) throws IOException {
+        printLevel();
+        stream.println( op.toString() );
+    }
+    
+    @Override
+    public void visit(LOLimit op) throws IOException {
+        printLevel();
+        stream.println( op.toString() );
+    }
+
+    public String toString() {
+        return stream.toString();
+    }   
+    
+    private void printLevel() {
+        for(int i =0; i < level; i++ ) {
+            stream.print("|\t");
+        }
+        stream.println("|");
+        for(int i =0; i < level; i++ ) {
+            stream.print("|\t");
+        }
+        stream.print("|---");
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/PlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/PlanPrinter.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/PlanPrinter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/PlanPrinter.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1 @@
+package org.apache.pig.newplan.logical.optimizer;

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.optimizer;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.DepthFirstWalker;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.optimizer.PlanTransformListener;
+
+/**
+ * A PlanTransformListener that will patch up references in projections.
+ *
+ */
+public class ProjectionPatcher implements PlanTransformListener {
+
+    /**
+     * Visits every expression plan reachable from {@code tp} and re-resolves
+     * the column number of each projection found there.
+     *
+     * @param fp not read by this listener
+     * @param tp plan that is walked
+     * @throws IOException if a projection's uid cannot be matched
+     * @see org.apache.pig.newplan.optimizer.PlanTransformListener#transformed(OperatorPlan, OperatorPlan)
+     */
+    @Override
+    public void transformed(OperatorPlan fp, OperatorPlan tp)
+            throws IOException {
+        new ProjectionFinder(tp).visit();
+    }
+
+    /**
+     * Expression visitor that resets the column number of each
+     * {@link ProjectExpression} to the position of the field with the same
+     * uid in the schema of the operator the projection refers to.
+     */
+    private static class ProjectionRewriter extends LogicalExpressionVisitor {
+
+        /**
+         * @param p   expression plan to walk depth-first
+         * @param cop accepted but not used by this constructor
+         */
+        ProjectionRewriter(OperatorPlan p, LogicalRelationalOperator cop) {
+            super(p, new DepthFirstWalker(p));
+        }
+
+        @Override
+        public void visit(ProjectExpression p) throws IOException {
+            // A project-star has no single column to fix up.
+            if (p.isProjectStar()) {
+                return;
+            }
+
+            // The projection's uid must equal the uid of the field it projects.
+            long wantedUid = p.getFieldSchema().uid;
+
+            // Operator whose output this projection reads.
+            LogicalRelationalOperator referent = p.findReferent();
+
+            // Projections attached to an LOGenerate for which the expression
+            // plan reports no successors are left untouched.
+            boolean leaveAlone = p.getAttachedRelationalOp() instanceof LOGenerate
+                    && p.getPlan().getSuccessors(p)==null;
+            if (leaveAlone) {
+                return;
+            }
+
+            LogicalSchema schema = referent.getSchema();
+            if (schema==null) {
+                return;
+            }
+
+            // Linear scan for the first field carrying the wanted uid.
+            List<LogicalSchema.LogicalFieldSchema> fields = schema.getFields();
+            int column = 0;
+            for (LogicalSchema.LogicalFieldSchema field : fields) {
+                if (field.uid == wantedUid) {
+                    p.setColNum(column);
+                    return;
+                }
+                column++;
+            }
+            throw new IOException("Couldn't find matching uid for project");
+        }
+    }
+
+    /**
+     * Relational-plan visitor that supplies a {@link ProjectionRewriter} for
+     * each expression plan it is asked about.
+     */
+    private static class ProjectionFinder extends AllExpressionVisitor {
+
+        public ProjectionFinder(OperatorPlan plan) {
+            super(plan, new DependencyOrderWalker(plan));
+        }
+
+        @Override
+        protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) {
+            return new ProjectionRewriter(expr, currentOp);
+        }
+
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaPatcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaPatcher.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaPatcher.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaPatcher.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.optimizer;
+
+import java.io.IOException;
+
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.optimizer.PlanTransformListener;
+
+/**
+ * A PlanTransformListener for the logical optimizer that will patch up schemas
+ * after a plan has been transformed.
+ *
+ */
+public class SchemaPatcher implements PlanTransformListener {
+
+    /**
+     * Runs a {@link SchemaResetter} over the transformed plan: each node has
+     * its schema cleared and then requested again, which makes the node
+     * regenerate its schema from its parent.
+     *
+     * @param fp not read by this listener
+     * @param tp plan handed to the {@link SchemaResetter}
+     * @throws IOException if the {@link SchemaResetter} visit fails
+     * @see org.apache.pig.newplan.optimizer.PlanTransformListener#transformed(OperatorPlan, OperatorPlan)
+     */
+    @Override
+    public void transformed(OperatorPlan fp, OperatorPlan tp) throws IOException {
+        new SchemaResetter(tp).visit();
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,177 @@
+package org.apache.pig.newplan.logical.optimizer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanWalker;
+import org.apache.pig.newplan.logical.expression.AllSameExpressionVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LODistinct;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOStream;
+import org.apache.pig.newplan.logical.relational.LOUnion;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+
+/**
+ * Relational-plan visitor that, for every operator it visits, calls
+ * {@code resetSchema()} and then {@code getSchema()} so the schema is
+ * requested afresh.  For operators that carry expression plans, the field
+ * schemas of those plans are refreshed in between, using
+ * {@link FieldSchemaResetter}.  The plan is walked with a
+ * {@link DependencyOrderWalker}.
+ */
+public class SchemaResetter extends LogicalRelationalNodesVisitor {
+
+    /**
+     * @param plan plan whose operators are visited
+     */
+    public SchemaResetter(OperatorPlan plan) {
+        super(plan, new DependencyOrderWalker(plan));
+    }
+
+    @Override
+    public void visit(LOLoad load) throws IOException {
+        load.resetSchema();
+        load.getSchema();
+    }
+
+    @Override
+    public void visit(LOFilter filter) throws IOException {
+        filter.resetSchema();
+        refreshFieldSchemas(filter.getFilterPlan());
+        filter.getSchema();
+    }
+
+    @Override
+    public void visit(LOStore store) throws IOException {
+        store.resetSchema();
+        store.getSchema();
+    }
+
+    @Override
+    public void visit(LOJoin join) throws IOException {
+        join.resetSchema();
+        refreshAllFieldSchemas(join.getExpressionPlans());
+        join.getSchema();
+    }
+
+    @Override
+    public void visit(LOForEach foreach) throws IOException {
+        foreach.resetSchema();
+        // Visit the nested plan with this same visitor before asking the
+        // foreach for its schema again.
+        PlanWalker innerWalker = currentWalker.spawnChildWalker(foreach.getInnerPlan());
+        pushWalker(innerWalker);
+        currentWalker.walk(this);
+        popWalker();
+        foreach.getSchema();
+    }
+
+    @Override
+    public void visit(LOGenerate gen) throws IOException {
+        gen.resetSchema();
+        refreshAllFieldSchemas(gen.getOutputPlans());
+        gen.getSchema();
+    }
+
+    @Override
+    public void visit(LOInnerLoad load) throws IOException {
+        load.resetSchema();
+        // The inner load's projection keeps its own field schema.
+        load.getProjection().resetFieldSchema();
+        load.getSchema();
+    }
+
+    @Override
+    public void visit(LOCogroup loCogroup) throws IOException {
+        loCogroup.resetSchema();
+        MultiMap<Integer, LogicalExpressionPlan> plansByInput = loCogroup.getExpressionPlans();
+        refreshAllFieldSchemas(plansByInput.values());
+        loCogroup.getSchema();
+    }
+
+    @Override
+    public void visit(LOSplit loSplit) throws IOException {
+        loSplit.resetSchema();
+        loSplit.getSchema();
+    }
+
+    @Override
+    public void visit(LOSplitOutput loSplitOutput) throws IOException {
+        loSplitOutput.resetSchema();
+        refreshFieldSchemas(loSplitOutput.getFilterPlan());
+        loSplitOutput.getSchema();
+    }
+
+    @Override
+    public void visit(LOUnion loUnion) throws IOException {
+        loUnion.resetSchema();
+        loUnion.getSchema();
+    }
+
+    @Override
+    public void visit(LOSort loSort) throws IOException {
+        loSort.resetSchema();
+        refreshAllFieldSchemas(loSort.getSortColPlans());
+        loSort.getSchema();
+    }
+
+    @Override
+    public void visit(LODistinct loDistinct) throws IOException {
+        loDistinct.resetSchema();
+        loDistinct.getSchema();
+    }
+
+    @Override
+    public void visit(LOLimit loLimit) throws IOException {
+        loLimit.resetSchema();
+        loLimit.getSchema();
+    }
+
+    @Override
+    public void visit(LOCross loCross) throws IOException {
+        loCross.resetSchema();
+        loCross.getSchema();
+    }
+
+    @Override
+    public void visit(LOStream loStream) throws IOException {
+        loStream.resetSchema();
+        loStream.getSchema();
+    }
+
+    /** Runs a new {@link FieldSchemaResetter} over one expression plan. */
+    private static void refreshFieldSchemas(LogicalExpressionPlan plan) throws IOException {
+        new FieldSchemaResetter(plan).visit();
+    }
+
+    /** Runs a new {@link FieldSchemaResetter} over each plan, in iteration order. */
+    private static void refreshAllFieldSchemas(Iterable<LogicalExpressionPlan> plans) throws IOException {
+        for (LogicalExpressionPlan plan : plans) {
+            refreshFieldSchemas(plan);
+        }
+    }
+}
+
+/**
+ * Expression visitor that calls {@code resetFieldSchema()} followed by
+ * {@code getFieldSchema()} on every expression passed to
+ * {@link #execute(LogicalExpression)}.  The expression plan is walked with
+ * a {@link DependencyOrderWalker}.
+ */
+class FieldSchemaResetter extends AllSameExpressionVisitor {
+
+    /**
+     * @param p expression plan to walk
+     */
+    protected FieldSchemaResetter(OperatorPlan p) {
+        super(p, new DependencyOrderWalker(p));
+    }
+
+    @Override
+    protected void execute(LogicalExpression op) throws IOException {
+        // Discard the current field schema, then request it again right away.
+        op.resetFieldSchema();
+        op.getFieldSchema();
+    }
+
+}
\ No newline at end of file

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidStamper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidStamper.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidStamper.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidStamper.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1 @@
+package org.apache.pig.newplan.logical.optimizer;

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+public class LOCogroup extends LogicalRelationalOperator {
+    
+    // List of booleans specifying if any of the cogroups is inner
+    private boolean[] mIsInner;
+    
+    // List of expressionPlans according to input
+    private MultiMap<Integer,LogicalExpressionPlan> mExpressionPlans;
+    
+    /**
+     * Enum for the type of group
+     */
+    public static enum GROUPTYPE {
+        REGULAR,    // Regular (co)group
+        COLLECTED   // Collected group
+    };
+    
+    private GROUPTYPE mGroupType;
+    
+    private LogicalFieldSchema groupKeyUidOnlySchema; 
+    
+    /*
+     * This is a map storing Uids which have been generated for an input
+     * This map is required to make the uids persistant between calls of
+     * resetSchema and getSchema
+     */
+    private Map<Integer,Long> generatedInputUids = new HashMap<Integer,Long>();
+    
+    final static String GROUP_COL_NAME = "group";
+        
+    public LOCogroup(OperatorPlan plan, MultiMap<Integer,LogicalExpressionPlan> 
+    expressionPlans, boolean[] isInner ) {
+        this( plan, expressionPlans, GROUPTYPE.REGULAR, isInner, -1 );
+    }
+
+    public LOCogroup(OperatorPlan plan, MultiMap<Integer,LogicalExpressionPlan> 
+    expressionPlans, GROUPTYPE groupType, boolean[] isInner, int requestedParrellism) {
+        super("LOCogroup", plan);
+        this.mExpressionPlans = expressionPlans;
+        if( isInner != null ) {
+            mIsInner = Arrays.copyOf(isInner, isInner.length);
+        }
+        this.mGroupType = groupType;
+    }
+    
+    /**
+     * Given an expression plan this function returns a LogicalFieldSchema
+     * that can be generated using this expression plan
+     * @param exprPlan ExpressionPlan which generates this field
+     * @return
+     */
+    private LogicalFieldSchema getPlanSchema( LogicalExpressionPlan exprPlan ) {
+        LogicalExpression sourceExp = (LogicalExpression) exprPlan.getSources().get(0);
+        LogicalFieldSchema planSchema = null;
+        try {
+            planSchema = sourceExp.getFieldSchema().deepCopy();
+            planSchema.uid = -1;
+        } catch (IOException e) {
+        }
+        return planSchema;
+    }
+
+    @Override
+    public LogicalSchema getSchema() {
+        // if schema is calculated before, just return
+        if (schema != null) {
+            return schema;
+        }
+
+        List<Operator> inputs = null;
+        try {
+            inputs = plan.getPredecessors(this);
+            if (inputs == null) {
+                return null;
+            }
+        }catch(Exception e) {
+            throw new RuntimeException("Unable to get predecessors of " + name 
+                    + " operator. ", e);
+        }
+
+        List<LogicalFieldSchema> fieldSchemaList = new ArrayList<LogicalFieldSchema>();
+
+        // See if we have more than one expression plans, if so the
+        // schema of the group column will be a tuple
+        boolean hasMultipleKeys = false;
+        for( Integer key : mExpressionPlans.keySet() ) {
+            if( mExpressionPlans.get(key).size() > 1 ) {
+                hasMultipleKeys = true;
+                break;
+            }
+        }
+
+        LogicalFieldSchema groupKeySchema = null;
+        // Generate the groupField Schema
+        if( hasMultipleKeys ) {
+            LogicalSchema keySchema = new LogicalSchema();
+            // We sort here to maintain the correct order of inputs
+            for( Integer key : mExpressionPlans.keySet()) {
+                Collection<LogicalExpressionPlan> plans = 
+                    mExpressionPlans.get(key);
+
+                for( LogicalExpressionPlan plan : plans ) {
+                    LogicalFieldSchema fieldSchema = getPlanSchema(plan);
+                    // if any plan schema is null, that means we can't calculate
+                    // further schemas so we bail out
+                    if( fieldSchema == null ) {
+                        schema = null;
+                        return schema;
+                    }
+                    fieldSchema = new LogicalFieldSchema(fieldSchema);
+                    keySchema.addField(fieldSchema);
+                }
+                // We only need fields from one input and not all
+                break;
+            }
+            groupKeySchema = new LogicalFieldSchema(GROUP_COL_NAME, keySchema, DataType.TUPLE);
+        } else {
+            // We sort here to maintain the correct order of inputs
+            for( Integer key : mExpressionPlans.keySet() ) {
+                Collection<LogicalExpressionPlan> plans = mExpressionPlans.get(key);
+                for( LogicalExpressionPlan plan : plans ) {
+                    groupKeySchema = getPlanSchema(plan);
+                    // if any plan schema is null, that means we can't calculate
+                    // further schemas so we bail out
+                    if( groupKeySchema == null ) {
+                        schema = null;
+                        return schema;
+                    }
+                    groupKeySchema = new LogicalSchema.LogicalFieldSchema(groupKeySchema);
+                    // Change the uid of this field
+                    groupKeySchema.alias = GROUP_COL_NAME;
+                    break;
+                }
+                break;
+            }
+        }
+        
+        try {
+            if (groupKeySchema==null) {
+                // Something wrong
+                return null;
+            }
+            groupKeyUidOnlySchema = groupKeySchema.mergeUid(groupKeyUidOnlySchema);
+        } catch (IOException e) {
+            // TODO
+            // ADD Exception
+        }
+        fieldSchemaList.add( groupKeySchema );
+
+        // Generate the Bag Schema
+        int counter = 0;
+        for (Operator op : inputs) {
+            LogicalSchema inputSchema = ((LogicalRelationalOperator)op).getSchema();
+            // the schema of one input is unknown, so the join schema is unknown, just return 
+            if (inputSchema == null) {
+                schema = null;
+                return schema;
+            }
+           
+            // Check if we already have calculated Uid for this bag for given 
+            // input operator
+            long bagUid;
+            if (generatedInputUids.get(counter)!=null)
+                bagUid = generatedInputUids.get(counter);
+            else {
+                bagUid = LogicalExpression.getNextUid();
+                generatedInputUids.put( counter, bagUid );
+            }
+            
+            LogicalFieldSchema newBagSchema = new LogicalFieldSchema(
+                    ((LogicalRelationalOperator)op).getAlias(), inputSchema, 
+                    DataType.BAG, bagUid);
+
+            fieldSchemaList.add( newBagSchema );
+            counter ++;
+        }
+
+        schema = new LogicalSchema();
+        for(LogicalFieldSchema fieldSchema: fieldSchemaList) {
+            schema.addField(fieldSchema);
+        }         
+
+        return schema;
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalRelationalNodesVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalRelationalNodesVisitor)v).visit(this);
+    }
+
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other != null && other instanceof LOCogroup) {
+            LOCogroup oc = (LOCogroup)other;
+            if( mGroupType == oc.mGroupType && 
+                    mIsInner.length == oc.mIsInner.length 
+                    && mExpressionPlans.size() == oc.mExpressionPlans.size() ) {
+                for( int i = 0; i < mIsInner.length; i++ ) {
+                    if( mIsInner[i] != oc.mIsInner[i] ) {
+                        return false;
+                    }
+                }
+                for( Integer key : mExpressionPlans.keySet() ) {                    
+                    if( ! oc.mExpressionPlans.containsKey(key) ) {
+                        return false;
+                    }
+                    Collection<LogicalExpressionPlan> exp1 = 
+                        mExpressionPlans.get(key);
+                    Collection<LogicalExpressionPlan> exp2 = 
+                        oc.mExpressionPlans.get(key);
+
+                    if(! ( exp1 instanceof ArrayList<?> 
+                    || exp2 instanceof ArrayList<?> ) ) {
+                        throw new RuntimeException( "Expected an ArrayList " +
+                        "of Expression Plans" );
+                    }
+
+                    ArrayList<LogicalExpressionPlan> expList1 = 
+                        (ArrayList<LogicalExpressionPlan>) exp1;
+                    ArrayList<LogicalExpressionPlan> expList2 = 
+                        (ArrayList<LogicalExpressionPlan>) exp2;
+
+                    for (int i = 0; i < expList1.size(); i++) {
+                        if (!expList1.get(i).isEqual(expList2.get(i))) {
+                            return false;
+                        }
+                    }
+                }
+                return checkEquality((LogicalRelationalOperator) other);
+            }
+        }
+        return false;
+    }
+
+    public GROUPTYPE getGroupType() {
+        return mGroupType;
+    }
+    
+    /**
+     * Returns an Unmodifiable Map of Input Number to Uid 
+     * @return Unmodifiable Map<Integer,Long>
+     */
+    public Map<Integer,Long> getGeneratedInputUids() {
+        return Collections.unmodifiableMap( generatedInputUids );
+    }
+    
+    public MultiMap<Integer,LogicalExpressionPlan> getExpressionPlans() {
+        return mExpressionPlans;
+    }
+    
+    public boolean[] getInner() {
+        return mIsInner;
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
+
+/**
+ * Logical plan operator for a cross product. Its schema is the
+ * concatenation of the fields of all of its inputs; see
+ * {@link #getSchema()}.
+ */
+public class LOCross extends LogicalRelationalOperator {
+
+    private static final long serialVersionUID = 2L;
+    //private static Log log = LogFactory.getLog(LOFilter.class);
+
+    public LOCross(LogicalPlan plan) {
+        super("LOCross", plan);
+    }
+
+    /**
+     * Builds (and caches in {@code schema}) the output schema by appending
+     * every field of every predecessor. A field that has an alias is
+     * renamed to "inputAlias::fieldAlias"; a field without alias stays
+     * without alias. Schema, type and uid of each field are carried over.
+     * @return the schema, or null when there are no predecessors or the
+     *         schema of some predecessor is unknown
+     * @throws RuntimeException if the predecessors cannot be obtained
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        // a previously computed schema is reused as is
+        if (schema != null) {
+            return schema;
+        }
+
+        List<Operator> predecessors;
+        try {
+            predecessors = plan.getPredecessors(this);
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to get predecessors of LOCross operator. ", e);
+        }
+        if (predecessors == null) {
+            return null;
+        }
+
+        List<LogicalSchema.LogicalFieldSchema> crossFields =
+            new ArrayList<LogicalSchema.LogicalFieldSchema>();
+
+        for (Operator predecessor : predecessors) {
+            LogicalRelationalOperator relOp = (LogicalRelationalOperator) predecessor;
+            LogicalSchema predSchema = relOp.getSchema();
+            // one unknown input schema makes the whole cross schema unknown
+            if (predSchema == null) {
+                schema = null;
+                return schema;
+            }
+
+            for (int idx = 0; idx < predSchema.size(); idx++) {
+                LogicalSchema.LogicalFieldSchema field = predSchema.getField(idx);
+                // only aliased fields get the "inputAlias::" prefix
+                String outAlias = (field.alias == null)
+                    ? null
+                    : relOp.getAlias() + "::" + field.alias;
+                crossFields.add(new LogicalSchema.LogicalFieldSchema(
+                        outAlias, field.schema, field.type, field.uid));
+            }
+        }
+
+        schema = new LogicalSchema();
+        for (LogicalSchema.LogicalFieldSchema crossField : crossFields) {
+            schema.addField(crossField);
+        }
+        return schema;
+    }
+
+    /**
+     * Dispatches to {@code visit(this)} on a LogicalRelationalNodesVisitor.
+     * @throws IOException if v is not a LogicalRelationalNodesVisitor
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (v instanceof LogicalRelationalNodesVisitor) {
+            ((LogicalRelationalNodesVisitor) v).visit(this);
+            return;
+        }
+        throw new IOException("Expected LogicalPlanVisitor");
+    }
+
+    /**
+     * @return true when other is an LOCross and checkEquality() holds
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        // instanceof is false for null, so no separate null test is needed
+        return other instanceof LOCross
+            && checkEquality((LogicalRelationalOperator) other);
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
+
+/**
+ * Logical plan operator for DISTINCT. Its schema is the schema of its
+ * first predecessor; see {@link #getSchema()}.
+ */
+public class LODistinct extends LogicalRelationalOperator {
+
+    private static final long serialVersionUID = 2L;
+    //private static Log log = LogFactory.getLog(LOFilter.class);
+
+    public LODistinct(LogicalPlan plan) {
+        super("LODistinct", plan);
+    }
+
+    /**
+     * @return the schema of the first predecessor in the plan
+     * @throws RuntimeException if the first predecessor cannot be obtained
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        LogicalRelationalOperator predecessor;
+        try {
+            predecessor = (LogicalRelationalOperator) plan.getPredecessors(this).get(0);
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LODistinct.", e);
+        }
+        // asked outside the try so failures of the input's own getSchema()
+        // are not re-wrapped
+        return predecessor.getSchema();
+    }
+
+    /**
+     * Dispatches to {@code visit(this)} on a LogicalRelationalNodesVisitor.
+     * @throws IOException if v is not a LogicalRelationalNodesVisitor
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (v instanceof LogicalRelationalNodesVisitor) {
+            ((LogicalRelationalNodesVisitor) v).visit(this);
+            return;
+        }
+        throw new IOException("Expected LogicalPlanVisitor");
+    }
+
+    /**
+     * @return true when other is an LODistinct and checkEquality() holds
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        // instanceof is false for null, so no separate null test is needed
+        return other instanceof LODistinct
+            && checkEquality((LogicalRelationalOperator) other);
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+
+public class LOFilter extends LogicalRelationalOperator {
+
+    private static final long serialVersionUID = 2L;
+    private LogicalExpressionPlan filterPlan;
+    //private static Log log = LogFactory.getLog(LOFilter.class);
+
+        
+    public LOFilter(LogicalPlan plan) {
+        super("LOFilter", plan);       
+    }
+
+    public LOFilter(LogicalPlan plan, LogicalExpressionPlan filterPlan) {
+        super("LOFilter", plan);
+        this.filterPlan = filterPlan;
+    }
+    
+    public LogicalExpressionPlan getFilterPlan() {
+        return filterPlan;
+    }
+    
+    public void setFilterPlan(LogicalExpressionPlan filterPlan) {
+        this.filterPlan = filterPlan;
+    }
+    
+    @Override
+    public LogicalSchema getSchema() {
+        LogicalRelationalOperator input = null;
+        try {
+            input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
+        }catch(Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOFilter.", e);
+        }
+        
+        return input.getSchema();
+    }   
+    
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalRelationalNodesVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalRelationalNodesVisitor)v).visit(this);
+    }
+    
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other != null && other instanceof LOFilter) { 
+            LOFilter of = (LOFilter)other;
+            return filterPlan.isEqual(of.filterPlan) && checkEquality(of);
+        } else {
+            return false;
+        }
+    }
+}
+

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOForEach.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOForEach.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOForEach.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+
+/**
+ * Logical plan operator for FOREACH. It owns an inner logical plan and
+ * takes its schema from the first sink of that inner plan.
+ */
+public class LOForEach extends LogicalRelationalOperator {
+
+    private static final long serialVersionUID = 2L;
+
+    // Nested plan; null until setInnerPlan() is called
+    private LogicalPlan innerPlan;
+      
+    public LOForEach(OperatorPlan plan) {
+        super("LOForEach", plan);
+    }
+
+    /**
+     * @return the inner plan; may be null if none has been set
+     */
+    public LogicalPlan getInnerPlan() {
+        return innerPlan;
+    }
+    
+    public void setInnerPlan(LogicalPlan p) {
+        innerPlan = p;
+    }
+    
+    /**
+     * Two foreach operators are equal when their inner plans are equal.
+     * Two missing inner plans count as equal; a missing and a present one
+     * do not.
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOForEach)) {
+            return false;
+        }
+        
+        LogicalPlan otherInner = ((LOForEach)other).innerPlan;
+        // innerPlan is only set through setInnerPlan(), so guard against null
+        if (innerPlan == null || otherInner == null) {
+            return innerPlan == otherInner;
+        }
+        return innerPlan.isEqual(otherInner);
+    }
+       
+    /**
+     * Refreshes {@code schema} from the first sink of the inner plan.
+     * @return the schema of the first sink; when there is no inner plan or
+     *         it has no sinks, the current value of {@code schema} is
+     *         returned unchanged
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        if (innerPlan == null) {
+            return schema;
+        }
+        List<Operator> ll = innerPlan.getSinks();
+        // an empty sink list must not be indexed
+        if (ll != null && !ll.isEmpty()) {
+            schema = ((LogicalRelationalOperator)ll.get(0)).getSchema();
+        }
+        
+        return schema;
+    }
+
+    /**
+     * Dispatches to {@code visit(this)} on a LogicalRelationalNodesVisitor.
+     * @throws IOException if v is not a LogicalRelationalNodesVisitor
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalRelationalNodesVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalRelationalNodesVisitor)v).visit(this);
+    }
+    
+    /**
+     * Finds the LOInnerLoad sources from which the operator referred to by
+     * the given project can be reached. Starting at each LOInnerLoad source
+     * of the referred operator's plan, the chain of first successors is
+     * followed until the referred operator is met or the chain ends.
+     * @param project expression whose referent is looked up
+     * @return the matching inner loads, possibly empty
+     * @throws IOException propagated from the referent or successor lookup
+     */
+    public static List<LOInnerLoad> findReacheableInnerLoadFromBoundaryProject(ProjectExpression project) throws IOException {
+        LogicalRelationalOperator referred = project.findReferent();
+        OperatorPlan referredPlan = referred.getPlan();
+        List<Operator> srcs = referredPlan.getSources();
+        List<LOInnerLoad> innerLoads = new ArrayList<LOInnerLoad>();
+        for (Operator src:srcs) {
+            if (!(src instanceof LOInnerLoad)) {
+                continue;
+            }
+            Operator succ = src;
+            while (succ!=null) {
+                if (succ==referred) {
+                    innerLoads.add((LOInnerLoad)src);
+                    // target found; nothing further down this chain matters
+                    break;
+                }
+                // look the successors up once and never index an empty list
+                List<Operator> next = referredPlan.getSuccessors(succ);
+                if (next==null || next.isEmpty()) {
+                    break;
+                }
+                succ = next.get(0);
+            }
+        }
+        return innerLoads;
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+public class LOGenerate extends LogicalRelationalOperator {
+     private List<LogicalExpressionPlan> outputPlans;
+     private boolean[] flattenFlags;
+
+    public LOGenerate(OperatorPlan plan, List<LogicalExpressionPlan> ps, boolean[] flatten) {
+        super("LOGenerate", plan);
+        outputPlans = ps;
+        flattenFlags = flatten;
+    }
+
+    @Override
+    public LogicalSchema getSchema() {
+        if (schema != null) {
+            return schema;
+        }
+        
+        schema = new LogicalSchema();
+        
+        for(int i=0; i<outputPlans.size(); i++) {
+            LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSources().get(0);
+            
+            LogicalFieldSchema fieldSchema = null;
+            try {
+                fieldSchema = exp.getFieldSchema().deepCopy();
+            } catch (IOException e) {
+                return null;
+            }
+            
+            if (fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG) {
+                // if type is primitive, just add to schema
+                schema.addField(fieldSchema);
+                continue;
+            } else {
+                // if flatten is set, set schema of tuple field to this schema
+                List<LogicalSchema.LogicalFieldSchema> innerFieldSchemas = new ArrayList<LogicalSchema.LogicalFieldSchema>();
+                if (flattenFlags[i]) {
+                    if (fieldSchema.type == DataType.BAG) {
+                        // if it is bag of tuples, get the schema of tuples
+                        if (fieldSchema.schema.isTwoLevelAccessRequired()) {
+                            //  assert(fieldSchema.schema.size() == 1 && fieldSchema.schema.getField(0).type == DataType.TUPLE)
+                            innerFieldSchemas = fieldSchema.schema.getField(0).schema.getFields();
+                        } else {
+                            innerFieldSchemas = fieldSchema.schema.getFields();
+                        }
+                        for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
+                            fs.alias = fieldSchema.alias + "::" + fs.alias;
+                        }
+                    } else { // DataType.TUPLE
+                        innerFieldSchemas = fieldSchema.schema.getFields();
+                        for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
+                            fs.alias = fieldSchema.alias + "::" + fs.alias;
+                        }
+                    }
+                    
+                    
+                    for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas)
+                        schema.addField(fs);
+                }
+                else
+                    schema.addField(fieldSchema);
+            }
+        }
+        return schema;
+    }
+
+    public List<LogicalExpressionPlan> getOutputPlans() {
+        return outputPlans;
+    }
+    
+    public boolean[] getFlattenFlags() {
+        return flattenFlags;
+    }
+    
+    public void setFlattenFlags(boolean[] flatten) {
+        flattenFlags = flatten;
+    }
+    
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOGenerate)) {
+            return false;
+        }
+        
+        List<LogicalExpressionPlan> otherPlan = ((LOGenerate)other).getOutputPlans();
+        boolean[] fs = ((LOGenerate)other).getFlattenFlags();
+        
+        if (outputPlans.size() != otherPlan.size()) {
+            return false;
+        }
+        
+        for(int i=0; i<outputPlans.size(); i++) {
+            if (flattenFlags[i] != fs[i]) {
+                return false;
+            }
+            
+            if (!outputPlans.get(i).isEqual(otherPlan.get(i))) {
+                return false;
+            }
+        }
+        
+        return true;
+    }
+  
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+         if (!(v instanceof LogicalRelationalNodesVisitor)) {
+                throw new IOException("Expected LogicalPlanVisitor");
+            }
+            ((LogicalRelationalNodesVisitor)v).visit(this);
+    }
+}



Mime
View raw message