pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r898497 [1/2] - in /hadoop/pig/trunk: src/org/apache/pig/experimental/logical/ src/org/apache/pig/experimental/logical/expression/ src/org/apache/pig/experimental/logical/relational/ src/org/apache/pig/experimental/plan/ src/org/apache/pig/...
Date Tue, 12 Jan 2010 20:32:31 GMT
Author: gates
Date: Tue Jan 12 20:32:29 2010
New Revision: 898497

URL: http://svn.apache.org/viewvc?rev=898497&view=rev
Log:
PIG-1178  Beginnings of a prototype of a new logical optimizer framework.  I did not add anything to CHANGES because this is only a beginning and because it is locked off in an experimental package
and thus will not be used by the rest of the code.

Added:
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpression.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpressionPlan.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DepthFirstWalker.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanEdge.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanWalker.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Rule.java
    hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Transformer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalOperatorPlan.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalRule.java

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpression.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpression.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpression.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.expression;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+
+/**
+ * Base class for the operators that make up a logical expression.  Each
+ * expression operator carries a data type and a uid, the uid being intended
+ * as a unique id for the expression.  Neither value is assigned by this
+ * class itself.
+ */
+public abstract class LogicalExpression extends Operator {
+
+    /** Data type of this expression; null until something assigns it. */
+    protected DataType type;
+
+    /** Id of this expression; 0 until something assigns it. */
+    protected long uid;
+
+    /**
+     * @param name name of this operator, passed through to Operator
+     * @param plan plan this operator is part of, passed through to Operator
+     */
+    public LogicalExpression(String name, OperatorPlan plan) {
+        super(name, plan);
+    }
+
+    /**
+     * @return the data type currently recorded for this expression
+     */
+    public DataType getType() {
+        return this.type;
+    }
+
+    /**
+     * @return the uid currently recorded for this expression
+     */
+    public long getUid() {
+        return this.uid;
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpressionPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpressionPlan.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpressionPlan.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/expression/LogicalExpressionPlan.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.expression;
+
+import org.apache.pig.experimental.plan.BaseOperatorPlan;
+
+/**
+ * A plan intended to hold logical expression operators.  It declares no
+ * members of its own; everything is inherited from BaseOperatorPlan.
+ */
+public class LogicalExpressionPlan extends BaseOperatorPlan {
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+/**
+ * Relational operator representing a load.  Its schema is resolved lazily
+ * from the schema written in the script and, once that part is implemented,
+ * from the loader's metadata.
+ */
+public class LOLoad extends LogicalRelationalOperator {
+
+    // Schema the user declared in the script; null when none was given.
+    private LogicalSchema scriptSchema;
+
+    /**
+     * Create a load operator named "LOLoad".
+     * @param loader FuncSpec for load function to use for this load.
+     * NOTE(review): this argument is currently neither stored nor used.
+     * @param schema schema user specified in script, or null if not
+     * specified.
+     * @param plan logical plan this load is part of.
+     */
+    public LOLoad(FuncSpec loader, LogicalSchema schema, LogicalPlan plan) {
+        super("LOLoad", plan);
+        this.scriptSchema = schema;
+    }
+
+    /**
+     * Get the schema for this load.  The result is computed on first use and
+     * then cached in the inherited schema field.  When both a script schema
+     * and a metadata schema exist they are combined with
+     * LogicalSchema.merge; when only one exists it is used as is; when
+     * neither exists the schema is unknown.
+     * @return schema, or null if unknown
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        if (schema == null) {
+            // TODO get schema from LoaderMetadata interface.
+            LogicalSchema metadataSchema = getSchemaFromMetaData();
+
+            if (scriptSchema == null) {
+                // May still be null, which leaves the schema unknown.
+                schema = metadataSchema;
+            } else if (metadataSchema == null) {
+                schema = scriptSchema;
+            } else {
+                schema = LogicalSchema.merge(scriptSchema, metadataSchema);
+            }
+        }
+        return schema;
+    }
+
+    /**
+     * Placeholder for fetching the schema from the load function.
+     * @return always null for now
+     */
+    private LogicalSchema getSchemaFromMetaData() {
+        return null;
+    }
+
+    /**
+     * Dispatch to LogicalPlanVisitor.visitLOLoad.
+     * @param v visitor; must be a LogicalPlanVisitor
+     * @throws RuntimeException if v is any other kind of visitor
+     */
+    @Override
+    public void accept(PlanVisitor v) {
+        if (v instanceof LogicalPlanVisitor) {
+            LogicalPlanVisitor logicalVisitor = (LogicalPlanVisitor)v;
+            logicalVisitor.visitLOLoad(this);
+            return;
+        }
+        throw new RuntimeException("Expected LogicalPlanVisitor");
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.experimental.plan.BaseOperatorPlan;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+
+/**
+ * LogicalPlan is the logical view of relational operations Pig will execute
+ * for a given script.  Note that it contains only relational operations.
+ * All expressions will be contained in LogicalExpressionPlans inside
+ * each relational operator.  LogicalPlan provides operations for
+ * removing and adding LogicalRelationalOperators.  These will handle doing
+ * all of the necessary add, remove, connect, and disconnect calls in
+ * OperatorPlan.  They will not handle patching up individual relational
+ * operators.  That will be handled by the various Patchers.
+ *
+ */
+public class LogicalPlan extends BaseOperatorPlan {
+
+    /**
+     * Add a relational operation to the plan.
+     * @param before operator that will be before the new operator.  This
+     * operator should already be in the plan.  If before is null then
+     * the new operator gets no predecessor.
+     * @param newOper new operator to add.  This operator should not already
+     * be in the plan.
+     * @param after operator that will be after the new operator.  This
+     * operator should already be in the plan.  If after is null, then the
+     * new operator gets no successor.
+     * @throws IOException if newOper is already in the plan, or before or
+     * after are not in the plan.
+     */
+    public void add(LogicalRelationalOperator before,
+                    LogicalRelationalOperator newOper,
+                    LogicalRelationalOperator after) throws IOException {
+        doAdd(before, newOper, after);
+    }
+
+    /**
+     * Add a relational operation with multiple inputs to the plan.
+     * @param before operators that will be before the new operator.  These
+     * operators should already be in the plan.
+     * @param newOper new operator to add.  This operator should not already
+     * be in the plan.
+     * @param after operator that will be after the new operator.  This
+     * operator should already be in the plan.  If after is null, then the
+     * new operator gets no successor.
+     * @throws IOException if newOper is already in the plan, or any of
+     * before or after are not in the plan.  In that case the plan is left
+     * unmodified.
+     */
+    public void add(List<LogicalRelationalOperator> before,
+                    LogicalRelationalOperator newOper,
+                    LogicalRelationalOperator after) throws IOException {
+        // Validate every predecessor first so that a bad entry cannot leave
+        // newOper added to the plan but only partially connected.
+        for (LogicalRelationalOperator op : before) {
+            checkIn(op);
+        }
+
+        doAdd(null, newOper, after);
+
+        for (LogicalRelationalOperator op : before) {
+            connect(op, newOper);
+        }
+    }
+
+    /**
+     * Add a relational operation with multiple outputs to the plan.
+     * @param before operator that will be before the new operator.  This
+     * operator should already be in the plan.  If before is null then
+     * the new operator gets no predecessor.
+     * @param newOper new operator to add.  This operator should not already
+     * be in the plan.
+     * @param after operators that will be after the new operator.  These
+     * operators should already be in the plan.
+     * @throws IOException if newOper is already in the plan, or any of
+     * before or after are not in the plan.  In that case the plan is left
+     * unmodified.
+     */
+    public void add(LogicalRelationalOperator before,
+                    LogicalRelationalOperator newOper,
+                    List<LogicalRelationalOperator> after) throws IOException {
+        // Validate every successor first so that a bad entry cannot leave
+        // newOper added to the plan but only partially connected.
+        for (LogicalRelationalOperator op : after) {
+            checkIn(op);
+        }
+
+        doAdd(before, newOper, null);
+
+        for (LogicalRelationalOperator op : after) {
+            connect(newOper, op);
+        }
+    }
+
+    /**
+     * Add a relational operation to the plan when the caller wants to control
+     * how the nodes are connected in the graph.
+     * @param before operator that will be before the new operator.  This
+     * operator should already be in the plan.  If before is null no edge
+     * from a predecessor is created.
+     * @param beforeToPos Position in before's edges to connect newOper at.
+     * @param beforeFromPos Position in newOper's edges to connect before at.
+     * @param newOper new operator to add.  This operator should not already
+     * be in the plan.
+     * @param afterToPos Position in newOper's edges to connect after at.
+     * @param afterFromPos Position in after's edges to connect newOper at.
+     * @param after operator that will be after the new operator.  This
+     * operator should already be in the plan.  If after is null no edge to
+     * a successor is created.
+     * @throws IOException if newOper is already in the plan, or before or
+     * after are not in the plan.
+     */
+    public void add(LogicalRelationalOperator before,
+                    int beforeToPos,
+                    int beforeFromPos,
+                    LogicalRelationalOperator newOper,
+                    int afterToPos,
+                    int afterFromPos,
+                    LogicalRelationalOperator after) throws IOException {
+        if (before != null) checkIn(before);
+        if (after != null) checkIn(after);
+        checkNotIn(newOper);
+
+        add(newOper);
+        if (before != null) connect(before, beforeToPos, newOper, beforeFromPos);
+        if (after != null) connect(newOper, afterToPos, after, afterFromPos);
+    }
+
+    /**
+     * Remove an operator from the logical plan.  This call disconnects the
+     * operator from its predecessor(s) and successor(s), removes it, and
+     * then connects the former predecessor(s) directly to the former
+     * successor(s).  An operator without predecessors or without successors
+     * is simply disconnected and removed.
+     * @param op operator to be removed.
+     * @throws IOException If the operator is not in the plan, or if it has
+     * both multiple predecessors and multiple successors.
+     */
+    public void removeLogical(LogicalRelationalOperator op) throws IOException {
+
+        checkIn(op);
+        // Take private snapshots of the neighbours.  The lookups may hand
+        // back null for an operator without edges (getRoots/getLeaves rely
+        // on that), and the returned lists may be the ones the edge maps
+        // hold, which disconnect() modifies below.
+        List<Operator> pred = copyOrEmpty(getPredecessors(op));
+        List<Operator> succ = copyOrEmpty(getSuccessors(op));
+        int predSz = pred.size();
+        int succSz = succ.size();
+        if (predSz > 1 && succSz > 1) {
+            // Don't have a clue what to do here.  We shouldn't have any
+            // operators that have multiple inputs and multiple outputs.
+            throw new IOException("Attempt to remove a node with multiple "
+                + "inputs and outputs!");
+        }
+
+        // Disconnect and remove the given node.
+        for (Operator p : pred) {
+            disconnect(p, op);
+        }
+        for (Operator s : succ) {
+            disconnect(op, s);
+        }
+        remove(op);
+
+        // Now reconnect the before and after
+        if (predSz > 1 && succSz == 1) {
+            for (Operator p : pred) {
+                connect(p, succ.get(0));
+            }
+        } else if (predSz == 1 && succSz >= 1) {
+            for (Operator s : succ) {
+                connect(pred.get(0), s);
+            }
+        }
+
+    }
+
+    /**
+     * Copy an edge list so it can be iterated while the plan is modified.
+     * @param edges list to copy, may be null
+     * @return a new list with the same elements, empty if edges is null
+     */
+    private static List<Operator> copyOrEmpty(List<Operator> edges) {
+        if (edges == null) {
+            return new ArrayList<Operator>();
+        }
+        return new ArrayList<Operator>(edges);
+    }
+
+    /**
+     * Validate the three operators, add newOper to the plan and connect it
+     * to whichever of before and after is not null.
+     */
+    private void doAdd(LogicalRelationalOperator before,
+                       LogicalRelationalOperator newOper,
+                       LogicalRelationalOperator after) throws IOException {
+        if (before != null) checkIn(before);
+        if (after != null) checkIn(after);
+        checkNotIn(newOper);
+
+        add(newOper);
+        if (before != null) connect(before, newOper);
+        if (after != null) connect(newOper, after);
+    }
+
+    /**
+     * @throws IOException if op is not one of this plan's operators
+     */
+    private void checkIn(LogicalRelationalOperator op) throws IOException {
+        if (!ops.contains(op)) {
+            throw new IOException("Attempt to use operator " + op.getName() +
+                " which is not in the plan.");
+        }
+    }
+
+    /**
+     * @throws IOException if op already is one of this plan's operators
+     */
+    private void checkNotIn(LogicalRelationalOperator op) throws IOException {
+        if (ops.contains(op)) {
+            throw new IOException("Attempt to add operator " + op.getName() +
+                " which is already in the plan.");
+        }
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.experimental.plan.PlanWalker;
+
+/**
+ * A visitor for logical plans.
+ */
+public abstract class LogicalPlanVisitor extends PlanVisitor {
+
+    protected LogicalPlanVisitor(OperatorPlan plan, PlanWalker walker) {
+        super(plan, walker);
+        
+        if (!(plan instanceof LogicalPlan)) {
+            throw new RuntimeException(
+                "LogicalPlanVisitor expects to visit logical plans");
+        }
+    }
+    
+    protected void visitLOLoad(LOLoad load) {
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+
+/**
+ * Logical representation of relational operators.  Relational operators have
+ * a schema and a requested parallelism.
+ */
+abstract public class LogicalRelationalOperator extends Operator {
+
+    /** Output schema; subclasses provide it through getSchema(). */
+    protected LogicalSchema schema;
+
+    /** Requested parallelism; -1 when the caller did not supply one. */
+    int requestedParallelism;
+
+    /**
+     * Create an operator with a requested parallelism of -1.
+     * @param name of this operator
+     * @param plan this operator is in
+     */
+    public LogicalRelationalOperator(String name, OperatorPlan plan) {
+        this(name, plan, -1);
+    }
+
+    /**
+     *
+     * @param name of this operator
+     * @param plan this operator is in
+     * @param rp requested parallelism
+     */
+    public LogicalRelationalOperator(String name,
+                                     OperatorPlan plan,
+                                     int rp) {
+        super(name, plan);
+        requestedParallelism = rp;
+    }
+
+    /**
+     * Get the parallelism requested for this operator.
+     * @return the value given at construction, or -1 if none was given
+     */
+    public int getRequestedParallelism() {
+        return requestedParallelism;
+    }
+
+    /**
+     * Misspelled predecessor of {@link #getRequestedParallelism()}, kept so
+     * that existing callers continue to compile.
+     * @return the requested parallelism
+     * @deprecated use {@link #getRequestedParallelism()} instead.
+     */
+    @Deprecated
+    public int getRequestedParallelisam() {
+        return getRequestedParallelism();
+    }
+
+    /**
+     * Get the schema for the output of this relational operator.
+     * @return the schema
+     */
+    abstract public LogicalSchema getSchema();
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.data.DataType;
+
+/**
+ * Schema, from a logical perspective.
+ */
+public class LogicalSchema {
+
+    public static class LogicalFieldSchema {
+        public String alias;
+        public DataType type;
+        public LogicalSchema schema;
+    }
+    
+    private List<LogicalFieldSchema> fields;
+    private Map<String, Integer> aliases;
+    
+    public LogicalSchema() {
+        fields = new ArrayList<LogicalFieldSchema>();
+        aliases = new HashMap<String, Integer>();
+    }
+    
+    public void addField(LogicalFieldSchema field) {
+        fields.add(field);
+        if (field.alias != null && field.alias.equals("")) {
+            aliases.put(field.alias, fields.size() - 1);
+        }
+    }
+    
+    public LogicalFieldSchema getField(String alias) {
+        return null;
+    }
+
+    public LogicalFieldSchema getField(int fieldNum) {
+        return fields.get(fieldNum);
+    }
+
+    public Integer size() {
+       return null;
+    }
+    
+    public static LogicalSchema merge(LogicalSchema s1, LogicalSchema s2) {
+        // TODO
+        return null;
+    }
+    
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Basic implementation of OperatorPlan.  Operators are kept in a set and the
+ * edges between them in two PlanEdge maps, one keyed by the operator an edge
+ * leaves and one keyed by the operator an edge arrives at.  Roots and leaves
+ * are computed on demand and cached until the plan next changes.
+ */
+public abstract class BaseOperatorPlan implements OperatorPlan {
+
+    protected Set<Operator> ops;
+    // Edges keyed by the operator they leave.
+    protected PlanEdge fromEdges;
+    // Edges keyed by the operator they arrive at.
+    protected PlanEdge toEdges;
+
+    // Cached results of getRoots/getLeaves; emptied by markDirty().
+    private List<Operator> roots;
+    private List<Operator> leaves;
+    protected static final Log log =
+        LogFactory.getLog(BaseOperatorPlan.class);
+
+    /**
+     * Create an empty plan.
+     */
+    public BaseOperatorPlan() {
+        ops = new HashSet<Operator>();
+        roots = new ArrayList<Operator>();
+        leaves = new ArrayList<Operator>();
+        fromEdges = new PlanEdge();
+        toEdges = new PlanEdge();
+    }
+
+    /**
+     * Get number of nodes in the plan.
+     */
+    public int size() {
+        return ops.size();
+    }
+
+    /**
+     * Get all operators in the plan that have no predecessors.  The result
+     * is cached; the returned list is the cache itself and is emptied the
+     * next time the plan is modified.
+     * @return all operators in the plan that have no predecessors, or
+     * an empty list if the plan is empty.
+     */
+    public List<Operator> getRoots() {
+        if (roots.size() == 0 && ops.size() > 0) {
+            for (Operator op : ops) {
+                // No entry in toEdges means nothing points at op.
+                if (toEdges.get(op) == null) {
+                    roots.add(op);
+                }
+            }
+        }
+        return roots;
+    }
+
+    /**
+     * Get all operators in the plan that have no successors.  The result
+     * is cached; the returned list is the cache itself and is emptied the
+     * next time the plan is modified.
+     * @return all operators in the plan that have no successors, or
+     * an empty list if the plan is empty.
+     */
+    public List<Operator> getLeaves() {
+        if (leaves.size() == 0 && ops.size() > 0) {
+            for (Operator op : ops) {
+                // No entry in fromEdges means op points at nothing.
+                if (fromEdges.get(op) == null) {
+                    leaves.add(op);
+                }
+            }
+        }
+        return leaves;
+    }
+
+    /**
+     * For a given operator, get all operators immediately before it in the
+     * plan.
+     * @param op operator to fetch predecessors of
+     * @return whatever toEdges holds for op.  NOTE(review): getRoots()
+     * treats a null from this same lookup as "no predecessors", so callers
+     * should be prepared for null rather than an empty list.
+     * @throws IOException declared for the interface; this implementation
+     * does not itself check that op is in the plan.
+     */
+    public List<Operator> getPredecessors(Operator op) throws IOException {
+        return (List<Operator>)toEdges.get(op);
+    }
+
+    /**
+     * For a given operator, get all operators immediately after it.
+     * @param op operator to fetch successors of
+     * @return whatever fromEdges holds for op.  NOTE(review): getLeaves()
+     * treats a null from this same lookup as "no successors", so callers
+     * should be prepared for null rather than an empty list.
+     * @throws IOException declared for the interface; this implementation
+     * does not itself check that op is in the plan.
+     */
+    public List<Operator> getSuccessors(Operator op) throws IOException {
+        return (List<Operator>)fromEdges.get(op);
+    }
+
+    /**
+     * Add a new operator to the plan.  It will not be connected to any
+     * existing operators.
+     * @param op operator to add
+     */
+    public void add(Operator op) {
+        markDirty();
+        ops.add(op);
+    }
+
+    /**
+     * Remove an operator from the plan.
+     * @param op Operator to be removed
+     * @throws IOException if the remove operation attempts to
+     * remove an operator that is still connected to other operators.
+     */
+    public void remove(Operator op) throws IOException {
+
+        if (fromEdges.containsKey(op) || toEdges.containsKey(op)) {
+            throw new IOException("Attempt to remove operator " + op.getName()
+                    + " that is still connected in the plan");
+        }
+        markDirty();
+        ops.remove(op);
+    }
+
+    /**
+     * Connect two operators in the plan, controlling which position in the
+     * edge lists that the from and to edges are placed.
+     * @param from Operator edge will come from
+     * @param fromPos Position in the array for the from edge
+     * @param to Operator edge will go to
+     * @param toPos Position in the array for the to edge
+     */
+    public void connect(Operator from,
+                        int fromPos,
+                        Operator to,
+                        int toPos) {
+        markDirty();
+        fromEdges.put(from, to, fromPos);
+        toEdges.put(to, from, toPos);
+    }
+
+    /**
+     * Connect two operators in the plan.
+     * @param from Operator edge will come from
+     * @param to Operator edge will go to
+     */
+    public void connect(Operator from, Operator to) {
+        markDirty();
+        fromEdges.put(from, to);
+        toEdges.put(to, from);
+    }
+
+    /**
+     * Disconnect two operators in the plan.
+     * @param from Operator edge is coming from
+     * @param to Operator edge is going to
+     * @return pair of positions, indicating the position in the from and
+     * to arrays.
+     * @throws IOException if the two operators aren't connected, or if the
+     * edge is recorded in fromEdges but missing from toEdges.
+     */
+    public Pair<Integer, Integer> disconnect(Operator from,
+                                             Operator to) throws IOException {
+        Pair<Operator, Integer> f = fromEdges.removeWithPosition(from, to);
+        if (f == null) {
+            throw new IOException("Attempt to disconnect operators " +
+                from.getName() + " and " + to.getName() +
+                " which are not connected.");
+        }
+
+        // fromEdges has changed at this point, so the cached roots and
+        // leaves must be dropped even if the second removal fails below.
+        markDirty();
+
+        Pair<Operator, Integer> t = toEdges.removeWithPosition(to, from);
+        if (t == null) {
+            throw new IOException("Plan in inconssistent state " +
+                from.getName() + " and " + to.getName() +
+                " connected in fromEdges but not toEdges.");
+        }
+
+        return new Pair<Integer, Integer>(f.second, t.second);
+    }
+
+    /**
+     * Drop the cached roots and leaves so they are recomputed on next use.
+     */
+    private void markDirty() {
+        roots.clear();
+        leaves.clear();
+    }
+
+    /**
+     * @return an iterator over all operators in the plan
+     */
+    public Iterator<Operator> getOperators() {
+        return ops.iterator();
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.VisitorException;
+
+public class DependencyOrderWalker extends PlanWalker {
+
+    /**
+     * @param plan for this walker to traverse.
+     */
+    public DependencyOrderWalker(OperatorPlan plan) {
+        super(plan);
+    }
+
+    @Override
+    public PlanWalker spawnChildWalker(OperatorPlan plan) {
+        return new DependencyOrderWalker(plan);
+    }
+
+    /**
+     * Visit every operator reachable from the plan's leaves, each one only
+     * after all of its predecessors have been visited.
+     * @param visitor Visitor this walker is being used by.
+     * @throws VisitorException if an error is encountered while walking.
+     */
+    @Override
+    public void walk(PlanVisitor visitor) throws VisitorException {
+        // Not efficient, but plans are small.  The ordering is built by a
+        // recursive descent from each leaf through its predecessors.
+        List<Operator> leaves = plan.getLeaves();
+        if (leaves == null) {
+            return;
+        }
+
+        List<Operator> ordered = new ArrayList<Operator>();
+        Set<Operator> visited = new HashSet<Operator>();
+        for (Operator leaf : leaves) {
+            doAllPredecessors(leaf, visited, ordered);
+        }
+
+        for (Operator next : ordered) {
+            next.accept(visitor);
+        }
+    }
+
+    /**
+     * Append node to fifo once all of its predecessors have been appended;
+     * nodes already in seen are skipped.
+     * @throws VisitorException if the plan fails to return predecessors
+     */
+    protected void doAllPredecessors(Operator node,
+                                   Set<Operator> seen,
+                                   Collection<Operator> fifo) throws VisitorException {
+        if (seen.contains(node)) {
+            return;
+        }
+        try {
+            Collection<Operator> preds = plan.getPredecessors(node);
+            if (preds != null) {
+                for (Operator pred : preds) {
+                    doAllPredecessors(pred, seen, fifo);
+                }
+            }
+        } catch (IOException e) {
+            throw new VisitorException(e);
+        }
+        seen.add(node);
+        fifo.add(node);
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DepthFirstWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DepthFirstWalker.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DepthFirstWalker.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/DepthFirstWalker.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.VisitorException;
+
+public class DepthFirstWalker extends PlanWalker {
+
+    /**
+     * @param plan Plan for this walker to traverse.
+     */
+    public DepthFirstWalker(OperatorPlan plan) {
+        super(plan);
+    }
+
+    @Override
+    public PlanWalker spawnChildWalker(OperatorPlan plan) {
+        return new DepthFirstWalker(plan);
+    }
+
+    /**
+     * Begin traversing the graph, depth first, starting from the roots.
+     * @param visitor Visitor this walker is being used by.
+     * @throws VisitorException if an error is encountered while walking.
+     */
+    @Override
+    public void walk(PlanVisitor visitor) throws VisitorException {
+        List<Operator> roots = plan.getRoots();
+        depthFirst(roots, new HashSet<Operator>(), visitor);
+    }
+
+    // Visits each not-yet-seen operator in successors, then recurses into
+    // that operator's own successors before moving on to its siblings.
+    private void depthFirst(Collection<Operator> successors,
+                            Set<Operator> seen,
+                            PlanVisitor visitor) throws VisitorException {
+        if (successors == null) return;
+
+        for (Operator suc : successors) {
+            if (!seen.add(suc)) continue; // already visited via another path
+            suc.accept(visitor);
+            try {
+                depthFirst(plan.getSuccessors(suc), seen, visitor);
+            } catch (IOException e) {
+                throw new VisitorException(e);
+            }
+        }
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/Operator.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan;
+
+public abstract class Operator {
+
+    protected String name; // name reported by getName()
+    protected OperatorPlan plan; // plan that contains this operator
+
+    public Operator(String n, OperatorPlan p) {
+        this.name = n;
+        this.plan = p;
+    }
+
+    /**
+     * Get the plan associated with this operator.
+     * @return plan
+     */
+    public OperatorPlan getPlan() {
+        return this.plan;
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * Accept a visitor at this node in the graph.
+     * @param v Visitor to accept.
+     */
+    public abstract void accept(PlanVisitor v);
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/OperatorPlan.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.util.Pair;
+
+public interface OperatorPlan {
+
+    /**
+     * Get the number of nodes in the plan.
+     */
+    int size();
+
+    /**
+     * Get the root operators, i.e. those that have no predecessors.
+     * @return all operators in the plan that have no predecessors, or
+     * an empty list if the plan is empty.
+     */
+    List<Operator> getRoots();
+
+    /**
+     * Get the leaf operators, i.e. those that have no successors.
+     * @return all operators in the plan that have no successors, or
+     * an empty list if the plan is empty.
+     */
+    List<Operator> getLeaves();
+
+    /**
+     * Get the operators that come immediately before a given operator
+     * in the plan.
+     * @param op operator to fetch predecessors of
+     * @return list of all operators immediately before op, or an empty list
+     * if op is a root.
+     * @throws IOException if op is not in the plan.
+     */
+    List<Operator> getPredecessors(Operator op) throws IOException;
+
+    /**
+     * Get the operators that come immediately after a given operator.
+     * @param op operator to fetch successors of
+     * @return list of all operators immediately after op, or an empty list
+     * if op is a leaf.
+     * @throws IOException if op is not in the plan.
+     */
+    List<Operator> getSuccessors(Operator op) throws IOException;
+
+    /**
+     * Add a new operator to the plan.  The operator is not connected to
+     * any existing operators.
+     * @param op operator to add
+     */
+    void add(Operator op);
+
+    /**
+     * Remove an operator from the plan.
+     * @param op Operator to be removed
+     * @throws IOException if the operator being removed is still
+     * connected to other operators.
+     */
+    void remove(Operator op) throws IOException;
+
+    /**
+     * Connect two operators in the plan, choosing the positions in the
+     * edge lists at which the from and to edges are placed.
+     * @param from Operator edge will come from
+     * @param fromPos Position in the array for the from edge
+     * @param to Operator edge will go to
+     * @param toPos Position in the array for the to edge
+     */
+    void connect(Operator from, int fromPos, Operator to, int toPos);
+
+    /**
+     * Connect two operators in the plan.
+     * @param from Operator edge will come from
+     * @param to Operator edge will go to
+     */
+    void connect(Operator from, Operator to);
+
+    /**
+     * Disconnect two operators in the plan.
+     * @param from Operator edge is coming from
+     * @param to Operator edge is going to
+     * @return pair of positions, indicating the position in the from and
+     * to arrays.
+     * @throws IOException if the two operators aren't connected.
+     */
+    Pair<Integer, Integer> disconnect(Operator from, Operator to) throws IOException;
+
+
+    /**
+     * Get an iterator over all operators in this plan.
+     * @return an iterator of all operators in this plan
+     */
+    Iterator<Operator> getOperators();
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanEdge.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanEdge.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanEdge.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanEdge.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * A MultiMap from an operator to its ordered list of connected operators.
+ */
+public class PlanEdge extends MultiMap<Operator, Operator> {
+
+    private static final long serialVersionUID = 1L;
+
+    public PlanEdge() {
+        super();
+    }
+
+    /**
+     * @param size Initial size of the map
+     */
+    public PlanEdge(int size) {
+        super(size);
+    }
+
+    /**
+     * Add an element to the map.
+     * @param key The key to store the value under.  If the key already
+     * exists the value will be added to the collection for that key, it
+     * will not replace the existing value (as in a standard map).
+     * @param value value to store.
+     * @param pos position in the arraylist to store the new value at.
+     * Positions are zero based.
+     * @throws IndexOutOfBoundsException if pos is not a valid insertion
+     * index; for a key with no values yet the only valid position is 0.
+     */
+    public void put(Operator key, Operator value, int pos) {
+        ArrayList<Operator> list = mMap.get(key);
+        if (list != null) {
+            list.add(pos, value);
+            return;
+        }
+        // First value for this key: reject a bad position before touching the map.
+        if (pos != 0) {
+            throw new IndexOutOfBoundsException(
+                "First edge must be at position 0, got " + pos);
+        }
+        list = new ArrayList<Operator>();
+        list.add(value);
+        mMap.put(key, list);
+    }
+
+    /**
+     * Remove one value from an existing key and return which position in
+     * the arraylist the value was at.  If that is the last value
+     * for the key, then remove the key too.
+     * @param key Key to remove the value from.
+     * @param value Value to remove.
+     * @return A pair containing the value being removed and an integer
+     * indicating the position, or null if the key or value does
+     * not exist.  Positions are zero based.
+     */
+    public Pair<Operator, Integer> removeWithPosition(Operator key,
+                                                      Operator value) {
+        ArrayList<Operator> list = mMap.get(key);
+        if (list == null) return null;
+
+        for (int index = 0; index < list.size(); index++) {
+            Operator candidate = list.get(index);
+            if (!candidate.equals(value)) continue;
+
+            // Only the first occurrence is removed.
+            list.remove(index);
+            if (list.isEmpty()) {
+                // Last value for the key is gone, so drop the key as well.
+                mMap.remove(key);
+            }
+            return new Pair<Operator, Integer>(candidate, index);
+        }
+        return null;
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanVisitor.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanVisitor.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan;
+
+import java.util.Stack;
+
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Base class for visitors over a plan of Operators.  It owns the traversal
+ * machinery (the plan and the walker in use) but does not visit individual
+ * nodes itself; that is left to implementing classes (such as LOVisitor).
+ */
+public abstract class PlanVisitor {
+
+    protected OperatorPlan plan;
+
+    /**
+     * Guaranteed to always point to the walker currently being used.
+     */
+    protected PlanWalker currentWalker;
+
+    // Walkers set aside by pushWalker, most recent on top.
+    private Stack<PlanWalker> walkers;
+
+    /**
+     * @param p OperatorPlan this visitor will visit.
+     * @param walker PlanWalker this visitor will use to traverse the plan.
+     */
+    protected PlanVisitor(OperatorPlan p, PlanWalker walker) {
+        this.walkers = new Stack<PlanWalker>();
+        this.currentWalker = walker;
+        this.plan = p;
+    }
+
+    /**
+     * Entry point for visiting the plan; delegates to the current walker.
+     * @throws VisitorException if an error is encountered while visiting.
+     */
+    public void visit() throws VisitorException {
+        this.currentWalker.walk(this);
+    }
+
+    public OperatorPlan getPlan() {
+        return this.plan;
+    }
+
+    /**
+     * Save the current walker on the stack and make the passed walker
+     * the current one.
+     * @param walker new walker to set as the current walker.
+     */
+    protected void pushWalker(PlanWalker walker) {
+        this.walkers.push(this.currentWalker);
+        this.currentWalker = walker;
+    }
+
+    /**
+     * Restore the most recently saved walker as the current walker,
+     * dropping the reference to the walker that was current.
+     * @throws VisitorException if there are no more walkers on the stack.  In
+     * this case the current walker is not reset.
+     */
+    protected void popWalker() throws VisitorException {
+        if (this.walkers.isEmpty()) {
+            throw new VisitorException("No more walkers to pop.");
+        }
+        this.currentWalker = this.walkers.pop();
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanWalker.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanWalker.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/PlanWalker.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan;
+
+import org.apache.pig.impl.plan.VisitorException;
+
+public abstract class PlanWalker {
+
+    protected OperatorPlan plan;
+
+    /**
+     * @param p Plan for this walker to traverse.
+     */
+    public PlanWalker(OperatorPlan p) {
+        this.plan = p;
+    }
+
+    public void setPlan(OperatorPlan p) {
+        this.plan = p;
+    }
+
+    public OperatorPlan getPlan() {
+        return this.plan;
+    }
+
+    /**
+     * Traverse the plan on behalf of the given visitor.
+     * @param visitor Visitor this walker is being used by.  It is passed
+     * here rather than to the constructor because the visitor constructs
+     * the walker and has no 'this' pointer to hand over at that point.
+     * @throws VisitorException if an error is encountered while walking.
+     */
+    public abstract void walk(PlanVisitor visitor) throws VisitorException;
+
+    /**
+     * Create a walker of the same concrete type for a subplan.
+     * Implementations must return the same type of walker with the
+     * provided plan set as its plan.  This can then be used to walk
+     * subplans, and lets abstract visitors clone walkers without
+     * knowing the type of walker their subclasses used.
+     * @param plan Plan for the new walker.
+     * @return Instance of the same type of walker with plan set to plan.
+     */
+    public abstract PlanWalker spawnChildWalker(OperatorPlan plan);
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/ReverseDependencyOrderWalker.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.VisitorException;
+
+public class ReverseDependencyOrderWalker extends PlanWalker {
+
+    public ReverseDependencyOrderWalker(OperatorPlan plan) {
+        super(plan);
+    }
+
+    @Override
+    public PlanWalker spawnChildWalker(OperatorPlan plan) {
+        return new ReverseDependencyOrderWalker(plan);
+    }
+
+    /**
+     * Visit every operator reachable from the plan's roots, each one only
+     * after all of its successors have been visited.
+     * @param visitor Visitor this walker is being used by.
+     * @throws VisitorException if an error is encountered while walking.
+     */
+    @Override
+    public void walk(PlanVisitor visitor) throws VisitorException {
+        // Not efficient, but plans are small.  The ordering is built by a
+        // recursive descent from each root through its successors.
+        List<Operator> roots = plan.getRoots();
+        if (roots == null) {
+            return;
+        }
+        List<Operator> ordered = new ArrayList<Operator>();
+        Set<Operator> visited = new HashSet<Operator>();
+        for (Operator root : roots) {
+            doAllSuccessors(root, visited, ordered);
+        }
+
+        for (Operator next : ordered) {
+            next.accept(visitor);
+        }
+    }
+
+    /**
+     * Append node to fifo once all of its successors have been appended;
+     * nodes already in seen are skipped.
+     * @throws VisitorException if the plan fails to return successors
+     */
+    protected void doAllSuccessors(Operator node,
+                                   Set<Operator> seen,
+                                   Collection<Operator> fifo) throws VisitorException {
+        if (seen.contains(node)) {
+            return;
+        }
+        try {
+            Collection<Operator> succs = plan.getSuccessors(node);
+            if (succs != null) {
+                for (Operator succ : succs) {
+                    doAllSuccessors(succ, seen, fifo);
+                }
+            }
+        } catch (IOException e) {
+            throw new VisitorException(e);
+        }
+        seen.add(node);
+        fifo.add(node);
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanOptimizer.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan.optimizer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.experimental.plan.OperatorPlan;
+
+/**
+ * The core class of the optimizer.  It is given a list of RuleSets, where a
+ * RuleSet holds the optimizer rules that can be run together.  The rules in
+ * a RuleSet are run repeatedly until either a full pass makes no
+ * transformation or maxIter passes (default 500) have been made over the
+ * RuleSet; the optimizer then moves on to the next RuleSet.  Once finished,
+ * a given RuleSet is never returned to.
+ *
+ * Each rule has two parts: a pattern and an associated transformer.  The
+ * pattern describes a pattern of node types that the optimizer will look
+ * to match.  Every match found in the plan is passed to the transformer's
+ * check(), which lets the rule look more in depth at the matched pattern
+ * and decide whether the rule should be run or not.  For example, one
+ * might design a rule to push filters above join that would look for the
+ * pattern filter(join) (meaning a filter followed by a join).  But only
+ * certain types of filters can be pushed, so check() would need to decide
+ * whether the filter that it found was pushable or not.  If check()
+ * returns true, the rule is said to have matched, and transform() is then
+ * called to make the changes in the plan.  Once transform() is complete,
+ * every registered PlanTransformListener is notified so that it can do any
+ * necessary cleanup in the plan.
+ */
+public abstract class PlanOptimizer {
+
+    protected List<Set<Rule>> ruleSets;
+    protected OperatorPlan plan;
+    protected List<PlanTransformListener> listeners;
+    protected int maxIter;
+
+    static final int defaultIterations = 500;
+
+    /**
+     * Create an optimizer over the given plan.
+     * @param p Plan to optimize
+     * @param rs List of RuleSets to use to optimize
+     * @param iterations maximum number of optimization iterations per
+     * RuleSet; any value below 1 (e.g. -1) selects the default of 500
+     */
+    protected PlanOptimizer(OperatorPlan p,
+                            List<Set<Rule>> rs,
+                            int iterations) {
+        this.plan = p;
+        this.ruleSets = rs;
+        this.listeners = new ArrayList<PlanTransformListener>();
+        this.maxIter = (iterations >= 1 ? iterations : defaultIterations);
+    }
+
+    public void addPlanTransformListener(PlanTransformListener listener) {
+        this.listeners.add(listener);
+    }
+
+    /**
+     * Run the optimizer.  For each RuleSet in turn, every Rule is matched
+     * against the plan.  Each match is handed to the check method of a new
+     * Transformer obtained from the Rule; when check returns true,
+     * Transformer.transform is called on the match and all registered
+     * listeners are notified.
+     * @throws IOException if an error is encountered while optimizing
+     */
+    public void optimize() throws IOException {
+        for (Set<Rule> ruleSet : ruleSets) {
+            int passes = 0;
+            boolean changed;
+            do {
+                changed = false;
+                for (Rule rule : ruleSet) {
+                    List<OperatorPlan> matches = rule.match(plan);
+                    if (matches == null) {
+                        continue;
+                    }
+                    Transformer transformer = rule.getNewTransformer();
+                    for (OperatorPlan match : matches) {
+                        if (!transformer.check(match)) {
+                            continue;
+                        }
+                        changed = true;
+                        transformer.transform(match);
+                        for (PlanTransformListener listener : listeners) {
+                            listener.transformed(plan, transformer);
+                        }
+                    }
+                }
+                // passes is only incremented after a pass that changed the plan
+            } while (changed && ++passes < maxIter);
+        }
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/PlanTransformListener.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan.optimizer;
+
+import org.apache.pig.experimental.plan.OperatorPlan;
+
+/**
+ * Callback interface for code that patches up a plan after a
+ * Transformer has changed it.
+ */
+public interface PlanTransformListener {
+
+    /**
+     * Notification that a transformation has just been applied to a plan.
+     *
+     * @param plan  the plan that was transformed
+     * @param transformer the transformer that performed the transformation
+     */
+    void transformed(OperatorPlan plan, Transformer transformer);
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Rule.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Rule.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Rule.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Rule.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan.optimizer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.experimental.plan.BaseOperatorPlan;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Rules describe a pattern of operators.  They also reference a Transformer.
+ * If the pattern of operators is found one or more times in the provided plan,
+ * then the optimizer will use the associated Transformer to transform the
+ * plan.
+ *
+ * The syntax for rules is  (x(y, z)) where x is a base node, and y and z are
+ * nodes that precede x.  So the graph for the Pig Latin script:
+ * A = load;
+ * B = load;
+ * C = join A, B;
+ * D = filter C;
+ * would be:  filter(join(load, load));
+ * 
+ * Rules with multiple end points (leaves) are expressed as (x(), y()) where
+ * both x and y are leaves.
+ * 
+ * It is expected that the name given to each node in the pattern exactly
+ * matches the name of the class of the node in the Plan to be matched.  So
+ * to build a rule that matched a join followed by a filter in the logical
+ * plan, the pattern would be LOFilter(LOJoin).
+ */
+public abstract class Rule {
+
+    // Name of this rule, as returned by getName().
+    protected String name;
+    // Pattern of operators this rule looks for.
+    protected OperatorPlan pattern;
+    // The plan handed to the most recent call to match(OperatorPlan).
+    protected transient OperatorPlan currentPlan;
+    protected static final Log log = LogFactory.getLog(Rule.class);
+    // Plan operators already claimed by a match during the current
+    // match(OperatorPlan) call; they are excluded from further matches.
+    private transient Set<Operator> matchedNodes = new HashSet<Operator>();
+    
+    /**
+     * Create a rule whose pattern is supplied by buildPattern().  Note that
+     * buildPattern() is invoked from within this constructor, i.e. before
+     * the body of any subclass constructor has run.
+     * @param n Name of this rule
+     */
+    public Rule(String n) {
+        this.name = n;
+        this.pattern = buildPattern();
+    }
+    
+    /**
+     * Create a rule that looks for an explicitly supplied pattern;
+     * buildPattern() is not consulted.
+     * @param n Name of this rule
+     * @param p Pattern to look for.
+     */
+    public Rule(String n, OperatorPlan p) {
+        this.name = n;
+        this.pattern = p;
+    }
+
+    /**
+     * Construct the operator pattern this rule searches for.  The
+     * single-argument constructor calls this to initialize the pattern.
+     * @return  the pattern to look for by this rule
+     */
+    protected abstract OperatorPlan buildPattern();
+    
+    /**
+     * Supply the transformer to apply to matches of this rule.  It is
+     * left abstract so that each rule decides how its transformer is
+     * instantiated.  Implementations must hand back a fresh transformer
+     * with no state on every call, never a cached one.
+     * @return Transformer to use with this rule
+     */
+    public abstract Transformer getNewTransformer();
+    
+    /**
+     * Accessor for the pattern this rule matches against.
+     * @return the pattern to be matched for this rule
+     */
+    public OperatorPlan getPattern() {
+        return this.pattern;
+    }
+    
+    /**
+     * Search for all the sub-plans that match the pattern
+     * defined by this rule.  Matched sub-plans never share operators:
+     * an operator that is part of an accepted match is not considered
+     * again for the remainder of this call.  If an IOException is raised
+     * while a candidate is being verified, it is logged and that
+     * candidate is skipped.
+     * @return A list of all matched sub-plans; empty (never null) if nothing
+     *        matches or the pattern has no leaves. The returned plans are
+     *        partial views of the original OperatorPlan. Each is a
+     *        sub-set of the original plan and represents the same
+     *        topology as the pattern, but operators in the returned plan
+     *        are the same objects as the original plan. Therefore,
+     *        a call getPlan() from any node in the return plan would
+     *        return the original plan.
+     *
+     * @param plan the OperatorPlan to look for matches to the pattern
+     * @throws RuntimeException wrapping any IOException raised by the plan
+     *        while predecessors or successors of an operator are looked up
+     */
+    public List<OperatorPlan> match(OperatorPlan plan) {
+        currentPlan = plan;
+        matchedNodes.clear();
+
+        List<OperatorPlan> matchedList = new ArrayList<OperatorPlan>();
+        List<Operator> leaves = pattern.getLeaves();
+        if (leaves == null || leaves.isEmpty()) {
+            // a pattern without leaves cannot match anything
+            return matchedList;
+        }
+
+        Iterator<Operator> iter = plan.getOperators();
+        while (iter.hasNext()) {
+            Operator op = iter.next();
+
+            // find a node that matches the first leaf of the pattern
+            if (!match(op, leaves.get(0))) {
+                continue;
+            }
+
+            // line up one plan operator with every leaf of the pattern;
+            // move on to the next operator if this one has no siblings
+            // that match the remaining leaves
+            List<Operator> planOps = findLeafOperators(plan, op, leaves);
+            if (planOps == null) {
+                continue;
+            }
+
+            PatternMatchOperatorPlan candidate = new PatternMatchOperatorPlan(plan);
+            try {
+                if (candidate.check(planOps)) {
+                    // we found a matched pattern: remember its operators so
+                    // that later matches cannot reuse them
+                    Iterator<Operator> matchedOps = candidate.getOperators();
+                    while (matchedOps.hasNext()) {
+                        matchedNodes.add(matchedOps.next());
+                    }
+                    matchedList.add(candidate);
+                }
+            } catch (IOException e) {
+                log.error("Failed to search for optmization pattern. ", e);
+            }
+        }
+
+        return matchedList;
+    }
+
+    /**
+     * Find, for an operator that matches the first leaf of the pattern, the
+     * plan operators that correspond to all leaves of the pattern.  The
+     * remaining leaves must match the siblings that directly follow op in
+     * the successor list of one of its predecessors (or in the list of plan
+     * roots when op has no predecessor).  The first predecessor whose
+     * children line up with the pattern leaves wins.
+     * @param plan the plan being searched
+     * @param op plan operator that matches the first pattern leaf
+     * @param leaves leaves of the pattern; must not be empty
+     * @return op followed by the siblings matching the remaining leaves,
+     *         or null if no such run of siblings exists
+     */
+    private List<Operator> findLeafOperators(OperatorPlan plan, Operator op, List<Operator> leaves) {
+        List<Operator> planOps = new ArrayList<Operator>();
+        planOps.add(op);
+        if (leaves.size() == 1) {
+            return planOps;
+        }
+
+        List<Operator> preds;
+        try {
+            preds = plan.getPredecessors(op);
+        } catch (IOException e) {
+            // not going to happen
+            throw new RuntimeException(e);
+        }
+
+        // if this node has no predecessor, it must be a root; the null
+        // entry stands for "siblings are the other roots"
+        if (preds == null || preds.isEmpty()) {
+            preds = new ArrayList<Operator>();
+            preds.add(null);
+        }
+
+        for (Operator pred : preds) {
+            List<Operator> siblings;
+            try {
+                siblings = (pred != null) ? plan.getSuccessors(pred) : plan.getRoots();
+            } catch (IOException e) {
+                // not going to happen
+                throw new RuntimeException(e);
+            }
+            if (siblings == null) {
+                continue;
+            }
+
+            int index = siblings.indexOf(op);
+            if (index < 0 || siblings.size() - index < leaves.size()) {
+                // op is not listed, or too few siblings follow it
+                continue;
+            }
+
+            boolean matched = true;
+            for (int j = 1; j < leaves.size(); j++) {
+                if (!match(siblings.get(index + j), leaves.get(j))) {
+                    matched = false;
+                    break;
+                }
+            }
+
+            if (matched) {
+                // take every sibling that lines up with a pattern leaf
+                for (int j = 1; j < leaves.size(); j++) {
+                    planOps.add(siblings.get(index + j));
+                }
+                return planOps;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * @return the name of this rule
+     */
+    public String getName() {
+        return this.name;
+    }
+    
+    /**
+     * Decide whether a plan operator can stand in for a pattern operator:
+     * both must be of exactly the same class, and the plan operator must
+     * not already belong to an earlier match, because we want to find
+     * matches that don't share nodes.
+     */
+    private boolean match(Operator planNode, Operator patternNode) {
+        if (planNode.getClass() != patternNode.getClass()) {
+            return false;
+        }
+        return !matchedNodes.contains(planNode);
+    }
+    
+ 
+    /**
+     * A read-only view of the part of a plan that matched the pattern of
+     * the enclosing rule.  The view is populated by check(List); the
+     * operators it contains are the very same objects as in the parent
+     * plan.  All methods that would modify the plan throw
+     * UnsupportedOperationException.
+     */
+    private class PatternMatchOperatorPlan implements OperatorPlan {
+        // the plan this view was carved out of
+        private final OperatorPlan parent;
+        // plan operators matched to pattern operators without predecessors
+        private final List<Operator> roots;
+        // plan operators matched to the leaves of the pattern
+        private final List<Operator> leaves;
+        // every plan operator that is part of the match
+        private final Set<Operator> operators;
+
+        public PatternMatchOperatorPlan(OperatorPlan parent) {
+            this.parent = parent;
+            roots = new ArrayList<Operator>();
+            leaves = new ArrayList<Operator>();
+            operators = new HashSet<Operator>();
+        }
+
+        /**
+         * Try to match the whole pattern, starting from the given plan
+         * operators, and record the matched operators in this view.
+         * @param planOps plan operators, one per pattern leaf and in the
+         *        same order as the pattern leaves
+         * @return true if every leaf sub-tree matched and the number of
+         *         matched operators equals the size of the pattern
+         * @throws IOException if the parent plan or the pattern fails to
+         *         return the predecessors of an operator
+         */
+        protected boolean check(List<Operator> planOps) throws IOException {
+            List<Operator> patternOps = pattern.getLeaves();
+            if (planOps.size() != patternOps.size()) {
+                return false;
+            }
+
+            for (int i = 0; i < planOps.size(); i++) {
+                Stack<Operator> s = new Stack<Operator>();
+                if (!check(planOps.get(i), patternOps.get(i), s)) {
+                    return false;
+                }
+                operators.addAll(s);
+            }
+
+            return operators.size() == pattern.size();
+        }
+
+        /**
+         * Check if the plan operator and its sub-tree has a match to the pattern
+         * operator and its sub-tree. This algorithm only search and return one match.
+         * It doesn't recursively search for all possible matches. For example,
+         * for a plan that looks like
+         *                   join
+         *                  /     \
+         *                 load   load
+         * if we are looking for join->load pattern, only one match will be returned instead
+         * of two, so that the matched subsets don't share nodes.
+         *
+         * The predecessors of the pattern operator have to match a run of
+         * consecutive predecessors of the plan operator.  Every possible
+         * starting position is tried in turn; whatever a failed attempt
+         * recorded in opers and roots is discarded before the next attempt.
+         *
+         * @param planOp operator of the parent plan
+         * @param patternOp operator of the pattern
+         * @param opers receives the matched plan operators; on success
+         *        planOp is pushed after the operators of its sub-tree
+         * @return true if planOp and its sub-tree match patternOp and its sub-tree
+         */
+        private boolean check(Operator planOp, Operator patternOp, Stack<Operator> opers) throws IOException {
+            if (!match(planOp, patternOp)) {
+                return false;
+            }
+
+            if (pattern.getLeaves().contains(patternOp) && !leaves.contains(planOp)) {
+                leaves.add(planOp);
+            }
+
+            // check if their predecessors match
+            List<Operator> preds1 = parent.getPredecessors(planOp);
+            List<Operator> preds2 = pattern.getPredecessors(patternOp);
+
+            // we've reached the root of the pattern, so a match is found;
+            // the pattern puts no constraint on the predecessors of planOp
+            if (preds2 == null || preds2.size() == 0) {
+                if (!roots.contains(planOp)) {
+                    roots.add(planOp);
+                }
+                opers.push(planOp);
+                return true;
+            }
+
+            if (preds1 == null || preds1.size() < preds2.size()) {
+                return false;
+            }
+
+            // look for a run of predecessors that matches the pattern
+            for (int start = 0; start + preds2.size() <= preds1.size(); start++) {
+                if (!match(preds1.get(start), preds2.get(0))) {
+                    continue;
+                }
+
+                // remember the current state so a failed attempt can be undone
+                int opersMark = opers.size();
+                int rootsMark = roots.size();
+
+                boolean allMatched = true;
+                for (int i = 0; i < preds2.size(); i++) {
+                    if (!check(preds1.get(start + i), preds2.get(i), opers)) {
+                        allMatched = false;
+                        break;
+                    }
+                }
+
+                if (allMatched) {
+                    opers.push(planOp);
+                    return true;
+                }
+
+                // a sub-tree may have pushed several operators, so cut back
+                // to the remembered sizes instead of popping a fixed count
+                truncate(opers, opersMark);
+                truncate(roots, rootsMark);
+            }
+
+            return false;
+        }
+
+        /**
+         * Drop all elements of the list from position size onwards.
+         */
+        private void truncate(List<Operator> list, int size) {
+            list.subList(size, list.size()).clear();
+        }
+
+        /**
+         * Copy those of the given operators that belong to this match.
+         * @param candidates operators taken from the parent plan, may be null
+         * @return a new list, empty if candidates is null
+         */
+        private List<Operator> retainMatched(List<Operator> candidates) {
+            List<Operator> list = new ArrayList<Operator>();
+            if (candidates != null) {
+                for (Operator oper : candidates) {
+                    if (operators.contains(oper)) {
+                        list.add(oper);
+                    }
+                }
+            }
+            return list;
+        }
+
+        public void add(Operator op) {
+            throw new UnsupportedOperationException("add() can not be called on PatternMatchOperatorPlan");
+        }
+
+        public void connect(Operator from, int fromPos, Operator to, int toPos) {
+            throw new UnsupportedOperationException("connect() can not be called on PatternMatchOperatorPlan");
+        }
+
+        public void connect(Operator from, Operator to) {
+            throw new UnsupportedOperationException("connect() can not be called on PatternMatchOperatorPlan");
+        }
+
+        public Pair<Integer, Integer> disconnect(Operator from, Operator to) throws IOException {
+            throw new UnsupportedOperationException("disconnect() can not be called on PatternMatchOperatorPlan");
+        }
+
+        public List<Operator> getLeaves() {
+            return leaves;
+        }
+
+        public Iterator<Operator> getOperators() {
+            return operators.iterator();
+        }
+
+        /**
+         * @return the predecessors of op in the parent plan that are part of
+         *         this match; an empty list if there are none
+         */
+        public List<Operator> getPredecessors(Operator op) throws IOException {
+            return retainMatched(parent.getPredecessors(op));
+        }
+
+        public List<Operator> getRoots() {
+            return roots;
+        }
+
+        /**
+         * @return the successors of op in the parent plan that are part of
+         *         this match; an empty list if there are none
+         */
+        public List<Operator> getSuccessors(Operator op) throws IOException {
+            return retainMatched(parent.getSuccessors(op));
+        }
+
+        public void remove(Operator op) throws IOException {
+            throw new UnsupportedOperationException("remove() can not be called on PatternMatchOperatorPlan");
+        }
+
+        public int size() {
+            return operators.size();
+        }
+
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Transformer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Transformer.java?rev=898497&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Transformer.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/optimizer/Transformer.java Tue Jan 12 20:32:29 2010
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan.optimizer;
+
+import java.io.IOException;
+import org.apache.pig.experimental.plan.OperatorPlan;
+
+/**
+ * Base class for the plan transformations applied by the optimizer.
+ * For a sub-plan that matched a pattern, check decides whether the
+ * transformation should be applied and transform carries it out.
+ */
+public abstract class Transformer {
+
+    /**
+     * Check if the transform should be done.  If this is being called then
+     * the pattern matches, but there may be other criteria that must be met
+     * as well.
+     * @param matched the sub-set of the plan that matches the pattern. This 
+     *        subset has the same graph as the pattern, but the operators
+     *        point to the same objects as the plan to be matched.
+     * @return true if the transform should be done.
+     * @throws IOException
+     */
+    public abstract boolean check(OperatorPlan matched) throws IOException;
+
+    /**
+     * Transform the tree.
+     * @param matched the sub-set of the plan that matches the pattern. This 
+     *        subset has the same graph as the pattern, but the operators
+     *        point to the same objects as the plan to be matched.
+     * @throws IOException
+     */
+    public abstract void transform(OperatorPlan matched) throws IOException;
+
+}



Mime
View raw message