pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r672801 [1/2] - in /incubator/pig/branches/types: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/optimizer/ src/org/apache/pig/impl/logicalLa...
Date Mon, 30 Jun 2008 15:56:42 GMT
Author: gates
Date: Mon Jun 30 08:56:41 2008
New Revision: 672801

URL: http://svn.apache.org/viewvc?rev=672801&view=rev
Log:
PIG-262: check in the optimizer patch, with quite a bit of work to actually make it work.


Added:
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/CanonicalNamer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaCalculator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/OptimizerException.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalOptimizer.java
    incubator/pig/branches/types/test/org/apache/pig/test/data/DotFiles/optplan1.dot
Modified:
    incubator/pig/branches/types/src/org/apache/pig/PigServer.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingValidator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/MultiMap.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorKey.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidationExecutor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpNumeric.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/LogicalPlanTester.java

Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Mon Jun 30 08:56:41 2008
@@ -50,7 +50,7 @@
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
 import org.apache.pig.impl.logicalLayer.LOPrinter;
-import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.parser.QueryParser;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -58,6 +58,7 @@
 import org.apache.pig.impl.physicalLayer.POPrinter;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.WrappedIOException;
 
@@ -481,10 +482,18 @@
         }
 
         // run through validator
-        LogicalPlanValidationExecutor validator = 
-            new LogicalPlanValidationExecutor(lp, pigContext);
         CompilationMessageCollector collector = new CompilationMessageCollector() ;
-        validator.validate(lp, collector);
+        FrontendException caught = null;
+        try {
+            LogicalPlanValidationExecutor validator = 
+                new LogicalPlanValidationExecutor(lp, pigContext);
+            validator.validate(lp, collector);
+        } catch (FrontendException fe) {
+            // Need to go through and see what the collector has in it.  But
+            // remember what we've caught so we can wrap it into what we
+            // throw.
+            caught = fe;
+        }
         // Check to see if we had any problems.
         StringBuilder sb = new StringBuilder();
         for (CompilationMessageCollector.Message msg : collector) {
@@ -510,11 +519,13 @@
             }
         }
 
-        if (sb.length() > 0) {
-            throw new ExecException(sb.toString());
+        if (sb.length() > 0 || caught != null) {
+            throw new ExecException(sb.toString(), caught);
         }
 
-        // TODO optimize
+        // optimize
+        LogicalOptimizer optimizer = new LogicalOptimizer(lp);
+        optimizer.optimize();
 
         return lp;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Jun 30 08:56:41 2008
@@ -57,6 +57,7 @@
 import org.apache.pig.impl.mapReduceLayer.MapReduceLauncher;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.plans.PlanPrinter;
 import org.apache.pig.impl.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.shock.SSHSocketImplFactory;
@@ -273,7 +274,13 @@
     }
 
+    /**
+     * Print the given physical plan using a {@link PlanPrinter}.
+     * NOTE(review): the {@code stream} argument is never used. The printer
+     * is built from the plan alone and the trailing newline is written to
+     * System.out, so output presumably lands on stdout rather than on the
+     * caller's stream. Confirm against PlanPrinter, and route the output to
+     * {@code stream} once PlanPrinter can accept one.
+     * @param plan physical plan to print
+     * @param stream intended destination for the output (currently unused)
+     * @throws RuntimeException wrapping any VisitorException from the printer
+     */
     public void explain(PhysicalPlan plan, PrintStream stream) {
-        // TODO FIX
+        try {
+            PlanPrinter printer = new PlanPrinter(plan);
+            printer.visit();
+            // NOTE(review): probably meant to be stream.println().
+            System.out.println();
+        } catch (VisitorException ve) {
+            // explain() declares no checked exceptions, so wrap unchecked.
+            throw new RuntimeException(ve);
+        }
     }
 
     public Collection<ExecJob> runningJobs(Properties properties) throws ExecException {

Added: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/CanonicalNamer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/CanonicalNamer.java?rev=672801&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/CanonicalNamer.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/CanonicalNamer.java Mon Jun 30 08:56:41 2008
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import org.apache.pig.impl.plan.NodeIdGenerator;
+
+/**
+ * Source of canonical names for fields in logical plan schemas.
+ *
+ * <p>This is not a plan visitor. It is a static helper that returns a new
+ * name on every call: the string form of the next id that the shared
+ * {@link NodeIdGenerator} issues for the canonical-name scope.
+ */
+public class CanonicalNamer {
+
+    /** Shared id generator; the reference is fixed at class initialization. */
+    private static final NodeIdGenerator mNid = NodeIdGenerator.getGenerator();
+
+    /** Scope key passed to the generator for every canonical name. */
+    private static final String SCOPE = "Canonical Names";
+
+    /**
+     * Return a new canonical name.
+     * @return the next id from the generator for {@link #SCOPE}, as a string
+     */
+    public static String getNewName() {
+        return String.valueOf(mNid.getNextNodeId(SCOPE));
+    }
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java Mon Jun 30 08:56:41 2008
@@ -98,6 +98,16 @@
         mIsFieldSchemaComputed = true;
     }
 
+    /**
+     * Discard the cached field schema so it is treated as never having been
+     * computed.  Code that restructures the plan calls this so the schema
+     * will be recalculated.
+     */
+    public void unsetFieldSchema() {
+        this.mFieldSchema = null;
+        this.mIsFieldSchemaComputed = false;
+    }
+
     void setFieldSchemaComputed(boolean b) {
         mIsFieldSchemaComputed = b;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCast.java Mon Jun 30 08:56:41 2008
@@ -64,10 +64,14 @@
     }
 
     @Override
-    public Schema.FieldSchema getFieldSchema() {
+    public Schema.FieldSchema getFieldSchema() throws FrontendException {
         if(!mIsFieldSchemaComputed && (null == mFieldSchema)) {
             if(DataType.isAtomic(mType)) {
                 mFieldSchema = new Schema.FieldSchema(null, mType);
+                if (mExpr.getFieldSchema() != null) {
+                    mFieldSchema.canonicalName =
+                        mExpr.getFieldSchema().canonicalName;
+                }
                 mIsFieldSchemaComputed = true;
             }
         }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java Mon Jun 30 08:56:41 2008
@@ -137,4 +137,5 @@
     public byte getType() {
         return DataType.BAG ;
     }
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java Mon Jun 30 08:56:41 2008
@@ -63,7 +63,7 @@
         try {
             mStream.write(depthFirstLP().getBytes());
         } catch (IOException e) {
-            throw new VisitorException(e.getMessage());
+            throw new VisitorException(e);
         }
     }
 
@@ -73,7 +73,7 @@
     }
 
 
-    protected String depthFirstLP() throws VisitorException {
+    protected String depthFirstLP() throws VisitorException, IOException {
         StringBuilder sb = new StringBuilder();
         List<LogicalOperator> leaves = mPlan.getLeaves();
         Collections.sort(leaves);
@@ -86,7 +86,7 @@
         return sb.toString();
     }
     
-    private String planString(LogicalPlan lp){
+    private String planString(LogicalPlan lp) throws VisitorException, IOException {
         StringBuilder sb = new StringBuilder();
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         if(lp!=null)
@@ -98,7 +98,8 @@
         return sb.toString();
     }
     
-    private String planString(List<LogicalPlan> logicalPlanList){
+    private String planString(
+            List<LogicalPlan> logicalPlanList) throws VisitorException, IOException {
         StringBuilder sb = new StringBuilder();
         if(logicalPlanList!=null)
             for (LogicalPlan lp : logicalPlanList) {
@@ -107,7 +108,7 @@
         return sb.toString();
     }
 
-    private String depthFirst(LogicalOperator node) throws VisitorException {
+    private String depthFirst(LogicalOperator node) throws VisitorException, IOException {
         StringBuilder sb = new StringBuilder(node.name());
         if(node instanceof ExpressionOperator) {
             sb.append(" FieldSchema: ");
@@ -151,6 +152,10 @@
         else if(node instanceof LOSplitOutput){
             sb.append(planString(((LOSplitOutput)node).getConditionPlan()));
         }
+        else if (node instanceof LOProject) {
+            sb.append("Connected to: ");
+            sb.append(((LOProject)node).getExpression().name());
+        }
         
         List<LogicalOperator> predecessors = mPlan.getPredecessors(node);
         

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java Mon Jun 30 08:56:41 2008
@@ -117,6 +117,10 @@
         return mExp;
     }
 
+    /**
+     * Attach this projection to a different operator.
+     * @param exp the operator that getExpression() will return from now on
+     */
+    public void setExpression(LogicalOperator exp) {
+        this.mExp = exp;
+    }
+
     public boolean isStar() { 
         return mIsStar;
     }
@@ -125,6 +129,10 @@
         return mProjection;
     }
 
+    /**
+     * Replace the column offsets this projection selects.  The list is
+     * stored as given, not copied.
+     * @param proj the offsets that getProjection() will return from now on
+     */
+    public void setProjection(List<Integer> proj) {
+        this.mProjection = proj;
+    }
+
     public int getCol() {
         if (mProjection.size() != 1)
             
@@ -196,6 +204,8 @@
                             //the type of the operator will be unknown. when type checking is in place
                             //add the type of the operator as a parameter to the fieldschema creation
                             mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), expressionOperator.getSchema(), DataType.TUPLE);
+                            mFieldSchema.canonicalName =
+                                CanonicalNamer.getNewName();
                             //mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), expressionOperator.getSchema());
                         }
                     } else {
@@ -228,9 +238,13 @@
                                         mFieldSchema = s.getField(mProjection.get(0));
                                     } else {
                                         mFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+                                        mFieldSchema.canonicalName =
+                                            CanonicalNamer.getNewName();
                                     }
                                 } else {
                                     mFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+                                    mFieldSchema.canonicalName =
+                                        CanonicalNamer.getNewName();
                                 }
                             } else {
                                 log.debug("Input is a logical operator");
@@ -242,6 +256,8 @@
                                     log.debug("mFieldSchema schema: " + mFieldSchema.schema);
                                 } else {
                                     mFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+                                    mFieldSchema.canonicalName =
+                                        CanonicalNamer.getNewName();
                                 }
                                 mType = mFieldSchema.type ;
                             }
@@ -288,6 +304,7 @@
                     throw new FrontendException(pe.getMessage());
                 }
                 mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), new Schema(fss));
+                mFieldSchema.canonicalName = CanonicalNamer.getNewName();
                 mIsFieldSchemaComputed = true;
                 log.debug("mIsStar is false, returning computed field schema of expressionOperator");
             }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Mon Jun 30 08:56:41 2008
@@ -147,6 +147,30 @@
         this.mSchema = schema;
     }
 
+    /**
+     * Drop the cached output schema and mark it as not computed.  Code that
+     * rearranges the plan calls this so the schema will be rebuilt.
+     */
+    public void unsetSchema() {
+        this.mSchema = null;
+        this.mIsSchemaComputed = false;
+    }
+    
+    /**
+     * Assign a fresh canonical name to every field in this operator's schema.
+     * This should only be used for loads or other operators that create all
+     * new fields.  If no schema is set (for example after
+     * {@link #unsetSchema()}), there are no fields to name and this returns
+     * without doing anything.
+     * @throws IllegalStateException if a field already has a canonical name
+     */
+    public void setCanonicalNames() {
+        // mSchema is null when no schema is known or it has been unset;
+        // iterating it would throw a NullPointerException.
+        if (mSchema == null) {
+            return;
+        }
+        for (Schema.FieldSchema fs : mSchema.getFields()) {
+            if (fs.canonicalName != null) {
+                throw new IllegalStateException("Attempt to rename field " +
+                    fs.alias + " in operator " + name() + " that " +
+                    "already has canonical name "  + fs.canonicalName);
+            }
+            fs.canonicalName = CanonicalNamer.getNewName();
+        }
+    }
+
 
     /**
      * Get a copy of the schema for the output of this operator.

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java Mon Jun 30 08:56:41 2008
@@ -48,17 +48,11 @@
         return getSingleLeafPlanOutputOp().getType() ;
     }
 
+    /**
+     * Write a textual dump of this plan using an {@link LOPrinter}.
+     * Exceptions raised by the printer propagate to the caller.
+     * @param out stream handed to {@code LOPrinter.print}
+     * @param ps stream handed to the {@code LOPrinter} constructor
+     * @throws VisitorException if the printer fails while walking the plan
+     * @throws IOException propagated from {@code LOPrinter.print}
+     */
-    public void explain(OutputStream out, PrintStream ps){
+    public void explain(
+            OutputStream out,
+            PrintStream ps) throws VisitorException, IOException {
         LOPrinter lpp = new LOPrinter(ps, this);
 
-        try {
-            lpp.print(out);
-        } catch (VisitorException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        } catch (IOException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
+        lpp.print(out);
     }
 }

Added: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=672801&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java Mon Jun 30 08:56:41 2008
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.optimizer.*;
+
+/**
+ * Rule-driven optimizer for logical plans.  The constructor registers the
+ * rules this optimizer applies.
+ */
+public class LogicalOptimizer extends PlanOptimizer<LogicalOperator, LogicalPlan> {
+
+    /**
+     * @param plan logical plan to optimize; it is also given to each rule's
+     * transformer
+     */
+    public LogicalOptimizer(LogicalPlan plan) {
+        super(plan);
+        addTypeCastRule(plan);
+    }
+
+    /**
+     * Register the rule that adds type casting to plans where the schema has
+     * been declared (by user, data, or data catalog).  The pattern is a
+     * single LOLoad node, flagged as required, with no edges.
+     */
+    private void addTypeCastRule(LogicalPlan plan) {
+        List<String> pattern = new ArrayList<String>(1);
+        pattern.add("org.apache.pig.impl.logicalLayer.LOLoad");
+        Map<Integer, Integer> noEdges = new HashMap<Integer, Integer>();
+        List<Boolean> mandatory = new ArrayList<Boolean>(1);
+        mandatory.add(true);
+        TypeCastInserter inserter = new TypeCastInserter(plan);
+        mRules.add(new Rule(pattern, noEdges, mandatory, inserter));
+    }
+}
+
+

Added: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java?rev=672801&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java Mon Jun 30 08:56:41 2008
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.Transformer;
+import org.apache.pig.impl.plan.optimizer.Transformer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOFilter;
+import org.apache.pig.impl.logicalLayer.LOForEach;
+import org.apache.pig.impl.logicalLayer.LOGenerate;
+import org.apache.pig.impl.logicalLayer.LOProject;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LOVisitor;
+
+/**
+ * Base class for transformers that rewrite a {@link LogicalPlan}.  It
+ * provides helpers to splice a new operator into the plan, reconnecting
+ * projections in the successor's inner plans to the new operator, and to
+ * rebuild schemas once a transformation is complete.
+ */
+public abstract class LogicalTransformer extends Transformer<LogicalOperator, LogicalPlan> {
+
+    protected LogicalTransformer(
+            LogicalPlan plan,
+            PlanWalker<LogicalOperator, LogicalPlan> walker) {
+        super(plan, walker);
+    }
+
+    /**
+     * Rebuild schemas after a rule has transformed the tree.  This first
+     * clears the existing schemas with a {@link SchemaRemover} and then
+     * recomputes them with a {@link SchemaCalculator}.
+     * @throws VisitorException if either visitor fails
+     * @throws FrontendException if schema calculation fails
+     */
+    protected void rebuildSchemas() throws VisitorException, FrontendException {
+        SchemaRemover sr = new SchemaRemover(mPlan);
+        sr.visit();
+        SchemaCalculator sc = new SchemaCalculator(mPlan);
+        sc.visit();
+    }
+
+    /**
+     * A visitor that finds the top level projects of an inner plan and
+     * reattaches them to a new node, optionally remapping their columns.
+     * All of the relational operator visit methods are overridden as no-ops
+     * because it must not descend into contained plans.  Declared static
+     * because it needs no state from the enclosing transformer.
+     */
+    private static class ProjectFixerUpper extends LOVisitor {
+
+        private final LogicalOperator mNewNode;
+        private final Map<Integer, Integer> mProjectionMapping;
+
+        ProjectFixerUpper(
+                LogicalPlan plan,
+                LogicalOperator newNode,
+                Map<Integer, Integer> projectionMapping) {
+            super(plan,
+                new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
+            mNewNode = newNode;
+            mProjectionMapping = projectionMapping;
+        }
+
+        protected void visit(LOCogroup cg) throws VisitorException {
+        }
+
+        protected void visit(LOSort s) throws VisitorException {
+        }
+
+        protected void visit(LOFilter f) throws VisitorException {
+        }
+
+        protected void visit(LOSplit s) throws VisitorException {
+        }
+
+        protected void visit(LOSplitOutput s) throws VisitorException {
+        }
+
+        protected void visit(LOForEach f) throws VisitorException {
+        }
+
+        protected void visit(LOProject p) throws VisitorException {
+            // Only switch the expression if this is a top level projection,
+            // that is, this project is pointing to a relational operator
+            // outside the plan.
+            List<LogicalOperator> preds = mPlan.getPredecessors(p);
+            if (preds == null || preds.size() == 0) {
+                // Change the expression
+                p.setExpression(mNewNode);
+
+                // Remap the projection column(s) if necessary
+                if (mProjectionMapping != null && !p.isStar()) {
+                    List<Integer> oldProjection = p.getProjection();
+                    List<Integer> newProjection =
+                        new ArrayList<Integer>(oldProjection.size());
+                    for (Integer i : oldProjection) {
+                        Integer n = mProjectionMapping.get(i);
+                        // Checked explicitly instead of with assert, which
+                        // is normally disabled at runtime and would let a
+                        // null offset into the projection.
+                        if (n == null) {
+                            throw new VisitorException("No projection " +
+                                "mapping for column " + i);
+                        }
+                        newProjection.add(n);
+                    }
+                    // Store the remapped offsets; without this the
+                    // remapping computed above has no effect.
+                    p.setProjection(newProjection);
+                }
+            } else {
+                p.getExpression().visit(this);
+            }
+        }
+    }
+
+    /**
+     * Insert a node in between two existing nodes.  This inserts the node
+     * into the correct place in the plan, then finds the top level projects
+     * in the inner plans of {@code before} and reconnects them to the new
+     * node.  Schemas are NOT rebuilt here; call {@link #rebuildSchemas()}
+     * once the transformation is complete.
+     * @param after Node to insert the new node after
+     * @param newNode New node to insert
+     * @param before Node to insert this node before
+     * @param projectionMapping A map that defines how projections in after
+     * relate to projections in newNode.  Keys are the projection offsets in
+     * after, values are the new offsets in newNode.  If this field is null,
+     * then it will be assumed that the mapping is 1-1.
+     * @throws VisitorException if reconnecting the projects fails, including
+     * when projectionMapping has no entry for a projected column
+     * @throws FrontendException presumably from the plan insertion calls --
+     * confirm against LogicalPlan
+     */
+    protected void insertBetween(
+            LogicalOperator after,
+            LogicalOperator newNode,
+            LogicalOperator before,
+            Map<Integer, Integer> projectionMapping)
+            throws VisitorException, FrontendException {
+        // Insert it into the plan.
+        mPlan.add(newNode);
+        mPlan.insertBetween(after, newNode, before);
+
+        // Visit all the inner plans of before and change their projects to
+        // connect to newNode instead of after.
+        // Find right inner plan(s) to visit
+        List<LogicalPlan> plans = new ArrayList<LogicalPlan>();
+        if (before instanceof LOCogroup) {
+            plans.addAll((((LOCogroup)before).getGroupByPlans()).values());
+        } else if (before instanceof LOSort) {
+            plans.addAll(((LOSort)before).getSortColPlans());
+        } else if (before instanceof LOFilter) {
+            plans.add(((LOFilter)before).getComparisonPlan());
+        } else if (before instanceof LOSplitOutput) {
+            plans.add(((LOSplitOutput)before).getConditionPlan());
+        } else if (before instanceof LOForEach) {
+            plans.addAll(((LOForEach)before).getForEachPlans());
+        }
+
+        for (LogicalPlan lp : plans) {
+            ProjectFixerUpper pfu =
+                new ProjectFixerUpper(lp, newNode, projectionMapping);
+            pfu.visit();
+        }
+    }
+
+    /**
+     * Insert a node after an existing node.  This behaves like
+     * {@link #insertBetween} with the single successor of {@code after} as
+     * the node to insert before.
+     * @param after Node to insert the new node after; it must have exactly
+     * one successor
+     * @param newNode New node to insert
+     * @param projectionMapping A map that defines how projections in after
+     * relate to projections in newNode.  Keys are the projection offsets in
+     * after, values are the new offsets in newNode.  If this field is null,
+     * then it will be assumed that the mapping is 1-1.
+     * @throws RuntimeException if after does not have exactly one successor
+     * @throws VisitorException if reconnecting the projects fails
+     * @throws FrontendException presumably from the plan insertion calls --
+     * confirm against LogicalPlan
+     */
+    protected void insertAfter(
+            LogicalOperator after,
+            LogicalOperator newNode,
+            Map<Integer, Integer> projectionMapping)
+            throws VisitorException, FrontendException {
+        List<LogicalOperator> successors = mPlan.getSuccessors(after);
+        // Treat a null successor list (no outputs) like any other count != 1
+        // instead of failing with a NullPointerException.
+        if (successors == null || successors.size() != 1) {
+            throw new RuntimeException("insertAfter only valid to insert " +
+                "after a node with single output.");
+        }
+        insertBetween(after, newNode, successors.get(0), projectionMapping);
+    }
+
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaCalculator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaCalculator.java?rev=672801&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaCalculator.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaCalculator.java Mon Jun 30 08:56:41 2008
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer.optimizer;
+
+import org.apache.pig.impl.logicalLayer.*;
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor to (re)calculate all the schemas in a logical plan.  For each
+ * relational operator it calls getSchema(), and for each expression operator
+ * getFieldSchema(); the returned value is discarded, the call is presumably
+ * made so that the operator computes and caches its schema.  Operators are
+ * visited with a DependencyOrderWalker, presumably so that an operator's
+ * inputs have their schemas before the operator itself is visited.  Any
+ * FrontendException raised while computing a schema is rethrown wrapped in
+ * a VisitorException.
+ */
+public class SchemaCalculator extends LOVisitor {
+
+    /**
+     * @param plan logical plan whose schemas will be calculated.
+     */
+    public SchemaCalculator(LogicalPlan plan) {
+        super(plan,
+            new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
+    }
+
+    /**
+     * Compute the field schema of a binary expression, then visit it.
+     * @param binOp
+     *            the logical binary expression operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(BinaryExpressionOperator binOp)
+            throws VisitorException {
+        try {
+            binOp.getFieldSchema();
+            super.visit(binOp);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+
+    /**
+     * Compute the field schema of a unary expression, then visit it.
+     * @param uniOp
+     *            the logical unary operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(UnaryExpressionOperator uniOp) throws VisitorException {
+        try {
+            uniOp.getFieldSchema();
+            super.visit(uniOp);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+
+    /**
+     * Compute the schema of a cogroup, then visit it.
+     * @param cg
+     *            the logical cogroup operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(LOCogroup cg) throws VisitorException {
+        try {
+            cg.getSchema();
+            super.visit(cg);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+
+    /**
+     * Compute the schema of a sort, then visit it.
+     * @param s
+     *            the logical sort operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(LOSort s) throws VisitorException {
+        try {
+            s.getSchema();
+            super.visit(s);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+
+    /**
+     * Compute the schema of a filter, then visit it.
+     * @param filter
+     *            the logical filter operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(LOFilter filter) throws VisitorException {
+        try {
+            filter.getSchema();
+            super.visit(filter);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+
+    /**
+     * Compute the schema of a split, then visit it.
+     * @param split
+     *            the logical split operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(LOSplit split) throws VisitorException {
+        try {
+            split.getSchema();
+            super.visit(split);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+
+    /**
+     * Visit a foreach and only afterwards compute its schema.  Unlike the
+     * other visits, the nested content is visited first (super.visit),
+     * presumably because the foreach's schema is derived from its nested
+     * plans, which must have their schemas before it can be built.
+     * @param forEach
+     *            the logical foreach operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(LOForEach forEach) throws VisitorException {
+        try {
+            super.visit(forEach);
+            forEach.getSchema();
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+
+    /**
+     * Compute the field schema of a user defined function call, then visit
+     * it (and thereby each expression in its argument list).
+     * @param func
+     *            the user defined function
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(LOUserFunc func) throws VisitorException {
+        try {
+            func.getFieldSchema();
+            super.visit(func);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+
+    /**
+     * Compute the field schema of a bincond, then visit it.
+     * @param binCond
+     *            the logical binCond operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(LOBinCond binCond) throws VisitorException {
+        try {
+            binCond.getFieldSchema();
+            super.visit(binCond);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+
+    /**
+     * Compute the field schema of a cast, then visit it.
+     * @param cast
+     *            the logical cast operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(LOCast cast) throws VisitorException {
+        try {
+            cast.getFieldSchema();
+            super.visit(cast);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+    
+    /**
+     * Compute the field schema of a regexp match, then visit it.
+     * @param regexp
+     *            the logical regexp operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(LORegexp regexp) throws VisitorException {
+        try {
+            regexp.getFieldSchema();
+            super.visit(regexp);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+
+    /**
+     * Compute (or fetch) the schema of a load, then visit it.
+     * @param load the logical load operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(LOLoad load) throws VisitorException{
+        try {
+            load.getSchema();
+            super.visit(load);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+    
+    /**
+     * Compute the schema of a store, then visit it.
+     * @param store the logical store operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(LOStore store) throws VisitorException{
+        try {
+            store.getSchema();
+            super.visit(store);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+    
+    /**
+     * Compute the field schema of a constant, then visit it.
+     * @param c the logical constant operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(LOConst c) throws VisitorException{
+        try {
+            c.getFieldSchema();
+            super.visit(c);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+
+    /**
+     * Compute the schema of a union, then visit it.
+     * @param u the logical union operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(LOUnion u) throws VisitorException {
+        try {
+            u.getSchema();
+            super.visit(u);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+
+    /**
+     * Compute the schema of one output of a split, then visit it.
+     * @param sop the logical split output operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(LOSplitOutput sop) throws VisitorException {
+        try {
+            sop.getSchema();
+            super.visit(sop);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+
+    /**
+     * Compute the schema of a distinct, then visit it.
+     * @param dt the logical distinct operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(LODistinct dt) throws VisitorException {
+        try {
+            dt.getSchema();
+            super.visit(dt);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+
+    /**
+     * Compute the schema of a cross, then visit it.
+     * @param cs the logical cross operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(LOCross cs) throws VisitorException {
+        try {
+            cs.getSchema();
+            super.visit(cs);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+
+    /**
+     * Compute the field schema of a projection, then visit it.
+     * @param project the logical project operator that has to be visited
+     * @throws VisitorException wrapping any FrontendException
+     */
+    protected void visit(LOProject project) throws VisitorException {
+        try {
+            project.getFieldSchema();
+            super.visit(project);
+        } catch (FrontendException fe) {
+            throw new VisitorException(fe);
+        }
+    }
+    
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java?rev=672801&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java Mon Jun 30 08:56:41 2008
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer.optimizer;
+
+import org.apache.pig.impl.logicalLayer.*;
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor to reset all the schemas in a logical plan.  Relational
+ * operators have their schema unset and expression operators their field
+ * schema unset, presumably so that they can be recomputed afterwards.  Load
+ * schemas are deliberately left in place (see visit(LOLoad)).  Operators are
+ * visited with a DependencyOrderWalker.
+ */
+public class SchemaRemover extends LOVisitor {
+
+    /**
+     * @param plan logical plan whose schemas will be unset.
+     */
+    public SchemaRemover(LogicalPlan plan) {
+        super(plan,
+            new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
+    }
+
+    /**
+     * Clear the field schema of a binary expression, then visit it.
+     * @param binOp
+     *            the logical binary expression operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(BinaryExpressionOperator binOp)
+            throws VisitorException {
+        binOp.unsetFieldSchema();
+        super.visit(binOp);
+    }
+
+    /**
+     * Clear the field schema of a unary expression, then visit it.
+     * @param uniOp
+     *            the logical unary operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(UnaryExpressionOperator uniOp) throws VisitorException {
+        uniOp.unsetFieldSchema();
+        super.visit(uniOp);
+    }
+
+    /**
+     * Clear the schema of a cogroup, then visit it.
+     * @param cg
+     *            the logical cogroup operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(LOCogroup cg) throws VisitorException {
+        cg.unsetSchema();
+        super.visit(cg);
+    }
+
+    /**
+     * Clear the schema of a sort, then visit it.
+     * @param s
+     *            the logical sort operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(LOSort s) throws VisitorException {
+        s.unsetSchema();
+        super.visit(s);
+    }
+
+    /**
+     * Clear the schema of a filter, then visit it.
+     * @param filter
+     *            the logical filter operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(LOFilter filter) throws VisitorException {
+        filter.unsetSchema();
+        super.visit(filter);
+    }
+
+    /**
+     * Clear the schema of a split, then visit it.
+     * @param split
+     *            the logical split operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(LOSplit split) throws VisitorException {
+        split.unsetSchema();
+        super.visit(split);
+    }
+
+    /**
+     * Clear the schema of a foreach, then visit it.
+     * @param forEach
+     *            the logical foreach operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(LOForEach forEach) throws VisitorException {
+        forEach.unsetSchema();
+        super.visit(forEach);
+    }
+
+    /**
+     * Clear the field schema of a user defined function call, then visit it
+     * (and thereby each expression in its argument list).
+     * @param func
+     *            the user defined function
+     * @throws VisitorException
+     */
+    protected void visit(LOUserFunc func) throws VisitorException {
+        func.unsetFieldSchema();
+        super.visit(func);
+    }
+
+    /**
+     * Clear the field schema of a bincond, then visit it.
+     * @param binCond
+     *            the logical binCond operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(LOBinCond binCond) throws VisitorException {
+        binCond.unsetFieldSchema();
+        super.visit(binCond);
+    }
+
+    /**
+     * Clear the field schema of a cast, then visit it.
+     * @param cast
+     *            the logical cast operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(LOCast cast) throws VisitorException {
+        cast.unsetFieldSchema();
+        super.visit(cast);
+    }
+    
+    /**
+     * Clear the field schema of a regexp match, then visit it.
+     * @param regexp
+     *            the logical regexp operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(LORegexp regexp) throws VisitorException {
+        regexp.unsetFieldSchema();
+        super.visit(regexp);
+    }
+
+    /**
+     * Visit a load without touching its schema.
+     * @param load the logical load operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(LOLoad load) throws VisitorException{
+        // Don't remove load's schema, it's not like it will change.  And we
+        // don't have a way to recover it.
+        super.visit(load);
+    }
+    
+    /**
+     * Clear the schema of a store, then visit it.
+     * @param store the logical store operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(LOStore store) throws VisitorException{
+        store.unsetSchema();
+        super.visit(store);
+    }
+    
+    /**
+     * Clear the cached schemas of a constant, then visit it.  A constant is
+     * described by its field schema (getFieldSchema() is what gets called
+     * on it when schemas are recalculated), so that is what must be unset,
+     * as for every other expression operator; the relation-level schema is
+     * still unset too, as it was before.
+     * @param c the logical constant operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(LOConst c) throws VisitorException{
+        c.unsetSchema();
+        c.unsetFieldSchema();
+        super.visit(c);
+    }
+
+    /**
+     * Clear the schema of a union, then visit it.
+     * @param u the logical union operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(LOUnion u) throws VisitorException {
+        u.unsetSchema();
+        super.visit(u);
+    }
+
+    /**
+     * Clear the schema of one output of a split, then visit it.
+     * @param sop the logical split output operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(LOSplitOutput sop) throws VisitorException {
+        sop.unsetSchema();
+        super.visit(sop);
+    }
+
+    /**
+     * Clear the schema of a distinct, then visit it.
+     * @param dt the logical distinct operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(LODistinct dt) throws VisitorException {
+        dt.unsetSchema();
+        super.visit(dt);
+    }
+
+    /**
+     * Clear the schema of a cross, then visit it.
+     * @param cs the logical cross operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(LOCross cs) throws VisitorException {
+        cs.unsetSchema();
+        super.visit(cs);
+    }
+
+    /**
+     * Clear the field schema of a projection, then visit it.
+     * @param project the logical project operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(LOProject project) throws VisitorException {
+        project.unsetFieldSchema();
+        super.visit(project);
+    }
+    
+}

Added: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java?rev=672801&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java Mon Jun 30 08:56:41 2008
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.logicalLayer.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOCast;
+import org.apache.pig.impl.logicalLayer.LOForEach;
+import org.apache.pig.impl.logicalLayer.LOGenerate;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOProject;
+import org.apache.pig.impl.logicalLayer.LOVisitor;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A transformer to discover if any schema has been specified for a file
+ * being loaded.  If so, a foreach is injected into the plan directly after
+ * the load to cast the data being loaded to the appropriate types.  The
+ * optimizer can then come along and move those casts as far down as
+ * possible, or in some cases remove them altogether.  This class does not
+ * handle finding the schemas for the file; that has already been done as
+ * part of parsing.
+ */
+public class TypeCastInserter extends LogicalTransformer {
+
+    private static final Log log = LogFactory.getLog(TypeCastInserter.class);
+
+    /**
+     * @param plan logical plan to insert casts into.
+     */
+    public TypeCastInserter(LogicalPlan plan) {
+        super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
+    }
+
+    /**
+     * Decide whether casts need to be inserted after the matched load.
+     * @param nodes matched operators; the first one must be an LOLoad.
+     * @return true if the load has a schema containing at least one field
+     * whose type is not bytearray; false if it has no schema or only
+     * bytearray fields.
+     * @throws OptimizerException if the load's schema cannot be obtained.
+     * @throws RuntimeException if there is no first node or it is not a load.
+     */
+    @Override
+    public boolean check(List<LogicalOperator> nodes) throws OptimizerException {
+        try {
+            LOLoad load = getLoad(nodes);
+            Schema s = load.getSchema();
+            if (s == null) return false;
+
+            // If all we find are byte arrays, we don't need a projection.
+            for (Schema.FieldSchema fs : s.getFields()) {
+                if (fs.type != DataType.BYTEARRAY) return true;
+            }
+            return false;
+        } catch (FrontendException fe) {
+            throw new OptimizerException("Caught exception while trying to " +
+                " check if type casts are needed", fe);
+        }
+    }
+
+    /**
+     * Insert a foreach after the matched load that projects every field of
+     * the load, wrapping each non-bytearray field in a cast to its declared
+     * type.  The load's own field types are then reset to bytearray and the
+     * plan's schemas are rebuilt.  Expected to be called only after check()
+     * returned true, so the load has a schema.
+     * @param nodes matched operators; the first one must be an LOLoad.
+     * @throws OptimizerException wrapping any failure while transforming.
+     */
+    @Override
+    public void transform(List<LogicalOperator> nodes) throws OptimizerException {
+        try {
+            LOLoad load = getLoad(nodes);
+    
+            Schema s = load.getSchema();
+            String scope = load.getOperatorKey().scope;
+            // For every field, build a logical plan.  If the field has a type
+            // other than byte array, then the plan will be cast(project).  Else
+            // it will just be project.
+            ArrayList<LogicalPlan> genPlans = new ArrayList<LogicalPlan>(s.size());
+            ArrayList<Boolean> flattens = new ArrayList<Boolean>(s.size());
+            for (int i = 0; i < s.size(); i++) {
+                LogicalPlan p = new LogicalPlan();
+                genPlans.add(p);
+                flattens.add(false);
+                List<Integer> toProject = new ArrayList<Integer>(1);
+                toProject.add(i);
+                LOProject proj = new LOProject(p, OperatorKey.genOpKey(scope),
+                    load, toProject);
+                p.add(proj);
+                Schema.FieldSchema fs = s.getField(i);
+                if (fs.type != DataType.BYTEARRAY) {
+                    LOCast cast = new LOCast(p, OperatorKey.genOpKey(scope),
+                        proj, fs.type);
+                    p.add(cast);
+                    p.connect(proj, cast);
+                    
+                    // The cast takes over the declared type, so give it a
+                    // copy of the declared field schema before resetting it.
+                    cast.setFieldSchema(fs.clone());
+                    // Reset the loads field schema to byte array so that it
+                    // will reflect reality.
+                    fs.type = DataType.BYTEARRAY;
+                }
+            }
+
+            // Build a foreach to insert after the load, giving it a cast for each
+            // position that has a type other than byte array.
+            LOForEach foreach = new LOForEach(mPlan,
+                OperatorKey.genOpKey(scope), genPlans, flattens);
+
+            // Insert the foreach into the plan and patch up the plan.
+            insertAfter(load, foreach, null);
+
+            rebuildSchemas();
+
+        } catch (Exception e) {
+            throw new OptimizerException(
+                "Unable to insert type casts into plan", e);
+        }
+    }
+
+    /**
+     * Return the first matched node as a load.  Shared by check() and
+     * transform() so that a missing or wrong node is reported the same way,
+     * without dereferencing a null node while building the message.
+     * @param nodes matched operators.
+     * @return the first node, cast to LOLoad.
+     * @throws RuntimeException if nodes is null or empty, or its first
+     * element is null or not an LOLoad.
+     */
+    private LOLoad getLoad(List<LogicalOperator> nodes) {
+        LogicalOperator lo =
+            (nodes == null || nodes.isEmpty()) ? null : nodes.get(0);
+        if (!(lo instanceof LOLoad)) {
+            throw new RuntimeException("Expected load, got " +
+                (lo == null ? "null" : lo.getClass().getName()));
+        }
+        return (LOLoad)lo;
+    }
+}
+
+ 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Jun 30 08:56:41 2008
@@ -649,7 +649,7 @@
 {
 	(
 	(
-	(<LOAD> op = LoadClause(lp) [<AS> (LOOKAHEAD(2) "(" schema = TupleSchema() ")" {op.setSchema(schema);log.debug("Load as schema()");schema.printAliases();} | fs = AtomSchema() {schema = new Schema(fs); op.setSchema(schema); log.debug("Load as atomschema()");schema.printAliases();}) ])
+	(<LOAD> op = LoadClause(lp) [<AS> (LOOKAHEAD(2) "(" schema = TupleSchema() ")" {op.setSchema(schema); op.setCanonicalNames(); log.debug("Load as schema()");schema.printAliases();} | fs = AtomSchema() {schema = new Schema(fs); op.setSchema(schema); log.debug("Load as atomschema()");schema.printAliases();}) ])
 |	((<GROUP> | <COGROUP>) op = CogroupClause(lp))
 |	(<FILTER> op = FilterClause(lp))
 |   (<ORDER> op = OrderClause(lp))

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Mon Jun 30 08:56:41 2008
@@ -30,6 +30,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.impl.plan.MultiMap;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 
 
@@ -51,6 +52,28 @@
          * must be null.
          */
         public Schema schema;
+
+        /**
+         * Canonical name.  This name uniquely identifies a field throughout
+         * the query.  Unlike an alias, it cannot be changed.  It will
+         * change when the field is transformed in some way (such as being
+         * used in an arithmetic expression or passed to a udf).  At that
+         * point a new canonical name will be generated for the field.
+         */
+        public String canonicalName = null;
+
+        /**
+         * Map of canonical names used for this field in other sections of the
+         * plan.  It can occur that a single field will have different
+         * canonical names in different branches of a plan.  For example, 
+         * C = cogroup A by x, B by y.  In subsequent statements, the grouping
+         * column will have canonical name, say, of 'r'.  But in branches
+         * above the cogroup it may have been known as 's' in the A branch and
+         * 't' in the B branch.  This map preserves that.  The key is a
+         * logical operator's key, and the value is the canonical name
+         * associated with the field for that operator.
+         */
+        public Map<OperatorKey, String> canonicalMap = null;
         
         private static Log log = LogFactory.getLog(Schema.FieldSchema.class);
 
@@ -257,9 +280,38 @@
                 sb.append(schema.toString());
                 sb.append(")");
             }
+
+            if (canonicalName != null) {
+                sb.append(" cn: ");
+                sb.append(canonicalName);
+            }
             return sb.toString();
         }
 
+        /**
+         * Create a deep copy of this FieldSchema.  The alias and canonical
+         * name are immutable Strings and are shared with the copy; the
+         * nested schema (if any) is cloned and the canonical map (if any) is
+         * copied into a fresh HashMap.
+         * @return an independent copy of this FieldSchema.
+         * @throws CloneNotSupportedException propagated from Schema.clone.
+         */
+        @Override
+        public FieldSchema clone() throws CloneNotSupportedException {
+            Schema innerCopy = null;
+            if (schema != null) {
+                innerCopy = schema.clone();
+            }
+            FieldSchema copy;
+            try {
+                copy = new FieldSchema(alias, innerCopy, type);
+            } catch (FrontendException fe) {
+                throw new RuntimeException(
+                    "Should never fail to clone a FieldSchema", fe);
+            }
+            copy.canonicalName = canonicalName;
+            if (canonicalMap != null) {
+                copy.canonicalMap = new HashMap<OperatorKey, String>(canonicalMap);
+            }
+            return copy;
+        }
+
     }
 
     private List<FieldSchema> mFields;
@@ -424,6 +476,45 @@
 
     }
 
+    /**
+     * Create a deep copy of this schema.  Each field schema is cloned, and
+     * the alias map and field-schema map of the copy are rebuilt so that
+     * they point at the cloned field schemas instead of this schema's.
+     * @return a new Schema built from copies of this schema's fields.
+     * @throws CloneNotSupportedException propagated from FieldSchema.clone.
+     */
+    @Override
+    public Schema clone() throws CloneNotSupportedException {
+        Schema copy = new Schema();
+
+        // Clone every field, remembering which clone belongs to which
+        // original so the lookup maps below can be re-pointed at the clones.
+        Map<FieldSchema, FieldSchema> oldToNew =
+            new HashMap<FieldSchema, FieldSchema>(size());
+        for (FieldSchema original : mFields) {
+            FieldSchema cloned = original.clone();
+            copy.mFields.add(cloned);
+            oldToNew.put(original, cloned);
+        }
+
+        // Alias -> field schema, re-pointed at the clones.
+        for (Map.Entry<String, FieldSchema> entry : mAliases.entrySet()) {
+            assert(entry.getValue() != null);
+            FieldSchema target = oldToNew.get(entry.getValue());
+            assert(target != null);
+            copy.mAliases.put(entry.getKey(), target);
+        }
+
+        // Field schema -> associated values, re-keyed on the clones.  The
+        // value collection is copied by MultiMap.put(K, Collection).
+        for (FieldSchema original : mFieldSchemas.keySet()) {
+            FieldSchema target = oldToNew.get(original);
+            assert(target != null);
+            copy.mFieldSchemas.put(target, mFieldSchemas.get(original));
+        }
+
+        return copy;
+    }
+
+
+
     static int[] primeList = { 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37,
                                41, 43, 47, 53, 59, 61, 67, 71, 73, 79,
                                83, 89, 97, 101, 103, 107, 109, 1133} ;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/LogicalPlanValidationExecutor.java Mon Jun 30 08:56:41 2008
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.pig.impl.logicalLayer.validators;
 
 import java.util.List; 
@@ -42,13 +60,15 @@
    
         // Default validations
         validatorList.add(new InputOutputFileValidator(pigContext)) ;
+        // This one has to be done before the type checker.
+        //validatorList.add(new TypeCastInserterValidator()) ;
         validatorList.add(new TypeCheckingValidator()) ;
         
        
     }    
 
     public void validate(LogicalPlan plan,
-            CompilationMessageCollector msgCollector) {
+            CompilationMessageCollector msgCollector) throws PlanValidationException {
         if (msgCollector == null) {
             throw new AssertionError(" messageCollector in " 
                          + "LogicalPlanValidationExecutor cannot be null") ;
@@ -60,9 +80,10 @@
             }
         } 
         catch(PlanValidationException pve) {
-            msgCollector.collect("Severe problem found during validation"
-                                 + pve.toString() ,
+            msgCollector.collect("Severe problem found during validation "
+                                 + pve.toString(),
                                  MessageType.Error) ;
+            throw pve;
         }     
     }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingValidator.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingValidator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingValidator.java Mon Jun 30 08:56:41 2008
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.pig.impl.logicalLayer.validators;
 
 import org.apache.pig.impl.logicalLayer.LogicalOperator;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/MultiMap.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/MultiMap.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/MultiMap.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/MultiMap.java Mon Jun 30 08:56:41 2008
@@ -59,6 +59,23 @@
     }
 
     /**
+     * Add a key to the map with a collection of elements.
+     * @param key The key to store the values under.  If the key already
+     * exists the values will be appended to the collection for that key;
+     * they will not replace the existing values (as in a standard map).
+     * @param values collection of values to store.
+     */
+    public void put(K key, Collection<V> values) {
+        ArrayList<V> list = mMap.get(key);
+        if (list == null) {
+            list = new ArrayList<V>(values);
+            mMap.put(key, list);
+        } else {
+            list.addAll(values);
+        }
+    }
+
+    /**
      * Get the collection of values associated with a given key.
      * @param key Key to fetch values for.
      * @return collection of values, or null if the key is not in the map.
@@ -104,6 +121,26 @@
         return mMap.keySet();
     }
 
+    /**
+     * Get a single collection of all the values in the map.  All of the
+     * values in the map will be conglomerated into one collection.  There
+     * will not be any duplicate removal.
+     * @return collection of values.
+     */
+    public Collection<V> values() {
+        Set<K> keys = mMap.keySet();
+        int size = 0;
+        for (K k : keys) {
+            size += mMap.get(k).size();
+        }
+        Collection<V> values = new ArrayList<V>(size);
+        for (K k : keys) {
+            values.addAll(mMap.get(k));
+        }
+        return values;
+    }
+
+
 
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorKey.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorKey.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorKey.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorKey.java Mon Jun 30 08:56:41 2008
@@ -78,4 +78,16 @@
                 return -1;
         }
     }
+
+    /**
+     * Utility function for creating operator keys.
+     * @param scope Scope to use in creating the key.
+     * @return new operator key.
+     */
+    public static OperatorKey genOpKey(String scope) {
+        return new OperatorKey(scope,
+            NodeIdGenerator.getGenerator().getNextNodeId(scope));
+    }
+
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java Mon Jun 30 08:56:41 2008
@@ -44,7 +44,7 @@
 
     private List<E> mRoots;
     private List<E> mLeaves;
-    private Log log = LogFactory.getLog(OperatorPlan.class);
+    protected static Log log = LogFactory.getLog(OperatorPlan.class);
     
     public OperatorPlan() {
         mRoots = new ArrayList<E>();
@@ -345,5 +345,25 @@
         return mKeys.size() ;
     }
 
+    /**
+     * Given two connected nodes add another node between them.
+     * @param after Node to insert this node after
+     * @param newNode new node to insert.  This node must have already been
+     * added to the plan.
+     * @param before Node to insert this node before
+     * @throws PlanException if after and before are not connected, or if
+     * connecting newNode to either of them fails.
+     */
+    public void insertBetween(
+            E after,
+            E newNode,
+            E before) throws PlanException {
+        if (!disconnect(after, before)) {
+            throw new PlanException("Attempt to insert between two nodes " +
+                "that were not connected.");
+        }
+        connect(after, newNode);
+        connect(newNode, before);
+    }
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidationExecutor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidationExecutor.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidationExecutor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanValidationExecutor.java Mon Jun 30 08:56:41 2008
@@ -22,5 +22,7 @@
  * 
  */
 public interface PlanValidationExecutor<P extends OperatorPlan> {
-   void validate(P plan, CompilationMessageCollector msgCollector) ;
+   void validate(
+        P plan,
+        CompilationMessageCollector msgCollector) throws PlanValidationException;
 }

Added: incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/OptimizerException.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/OptimizerException.java?rev=672801&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/OptimizerException.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/OptimizerException.java Mon Jun 30 08:56:41 2008
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan.optimizer;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+
+public class OptimizerException extends FrontendException {
+
+    public OptimizerException (String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public OptimizerException() {
+        this(null, null);
+    }
+    
+    public OptimizerException(String message) {
+        this(message, null);
+    }
+    
+    public OptimizerException(Throwable cause) {
+        this(null, cause);
+    }
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java Mon Jun 30 08:56:41 2008
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.impl.plan.optimizer;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.pig.impl.plan.Operator;
@@ -29,18 +30,16 @@
  *
  */
 
-public class PlanOptimizer<O extends Operator, P extends OperatorPlan<O>> {
+public abstract class PlanOptimizer<O extends Operator, P extends OperatorPlan<O>> {
     
-    private List<Rule> mRules;
-    private P mPlan;
+    protected List<Rule> mRules;
+    protected P mPlan;
 
     /**
      * @param plan Plan to optimize
-     * @param rules List of rules to attempt to apply.
      */
-    public PlanOptimizer(P plan,
-                         List<Rule> rules) {
-        mRules = rules;
+    protected PlanOptimizer(P plan) {
+        mRules = new ArrayList<Rule>();
         mPlan = plan;
     }
 
@@ -50,8 +49,9 @@
      * method of the associated Transformer to give the it a chance to
      * check whether it really wants to do the optimization.  If that
      * returns true as well, then Transformer.transform is called. 
+     * @throws OptimizerException if a rule's Transformer fails in check or transform.
      */
-    public void optimize() {
+    public final void optimize() throws OptimizerException {
         RuleMatcher matcher = new RuleMatcher();
         for (Rule rule : mRules) {
             if (matcher.match(rule)) {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java Mon Jun 30 08:56:41 2008
@@ -45,9 +45,10 @@
      * @param nodes - List of nodes declared in transform ($1 = nodes[0],
      * etc.)  Remember that somes entries in node[] may be NULL since they may
      * not be created until after the transform.
-     * @return - true if the transform should be done.
+     * @return true if the transform should be done.
+     * @throws OptimizerException if the check itself cannot be completed.
      */
-    public abstract boolean check(List<O> nodes);
+    public abstract boolean check(List<O> nodes) throws OptimizerException;
 
     /**
      * Transform the tree
@@ -56,8 +57,9 @@
      * of the transform and remove them from the nodes vector and construct
      * any that are being created as part of the transform and add them at the
      * appropriate point to the nodes vector.
+     * @throws OptimizerException if the transformation fails.
      */
-    public abstract void transform(List<O> nodes);
+    public abstract void transform(List<O> nodes) throws OptimizerException;
 
 }
 

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpNumeric.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpNumeric.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpNumeric.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpNumeric.java Mon Jun 30 08:56:41 2008
@@ -111,7 +111,9 @@
             }
         }
         ps.close();
-        pig.registerQuery("A=load 'file:" + tmpFile + "' using " + PigStorage.class.getName() + "(':');");
+        pig.registerQuery("A=load 'file:" + tmpFile + "' using " +
+                PigStorage.class.getName() +
+                "(':') as (f1: double, f2:double);");
         String query = "A = filter A by $0 > $1;";
 
         log.info(query);
@@ -165,24 +167,22 @@
         }
         ps.close();
         pig.registerQuery("A=load 'file:" + tmpFile + "';");
-        String query = "A = foreach A generate ($0 < '10'?($1 >= '5' ? '2': '1') : '0');";
+        String query = "B = foreach A generate ((int)$0 < 10?((int)$1 >= 5 ? 2: 1) : 0);";
         log.info(query);
         pig.registerQuery(query);
-        Iterator it = pig.openIterator("A");
+        Iterator it = pig.openIterator("B");
         tmpFile.delete();
         int count =0;
         while(it.hasNext()) {
             Tuple t = (Tuple)it.next();
-            Double first = Double.valueOf(t.get(0).toString());
+            Integer first = (Integer)t.get(0);
             count+=first;
-               assertTrue(first == 1 || first == 2 || first == 0);
+            assertTrue(first == 1 || first == 2 || first == 0);
             
         }
         assertEquals("expected count of 15", 15, count);
     }
     
-    
-    
     @Test 
     public void testNumericLt() throws Throwable {
         PigServer pig = new PigServer(initString);

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalOptimizer.java?rev=672801&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalOptimizer.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalOptimizer.java Mon Jun 30 08:56:41 2008
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import org.apache.pig.impl.logicalLayer.*;
+import org.apache.pig.impl.logicalLayer.optimizer.*;
+import org.apache.pig.test.utils.LogicalPlanTester;
+
+import org.junit.Test;
+import org.junit.Before;
+
+/**
+ * Test the logical optimizer.
+ */
+
+public class TestLogicalOptimizer extends junit.framework.TestCase {
+
+    final String FILE_BASE_LOCATION = "test/org/apache/pig/test/data/DotFiles/" ;
+
+    LogicalPlanTester planTester = new LogicalPlanTester() ;
+
+    @Before
+    public void setUp() {
+        planTester.reset();
+    }
+
+    @Test
+    public void testTypeCastInsertion() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (p:int, q:long, r:float, "
+            + "s:double, t:map [], u:tuple (x:int, y:int), " + 
+            "v:bag {x:tuple(z:int)});");
+        LogicalPlan plan = planTester.buildPlan("B = order A by p;");
+        planTester.typeCheckAgainstDotFile(plan, FILE_BASE_LOCATION +
+            "optplan1.dot", true);
+    }
+}
+

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java?rev=672801&r1=672800&r2=672801&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java Mon Jun 30 08:56:41 2008
@@ -191,6 +191,18 @@
         }
     }
 
+    static class TOptimizer extends PlanOptimizer<TOperator, TPlan> {
+
+        public TOptimizer(TPlan plan) {
+            super(plan);
+        }
+
+        public void addRule(Rule rule) {
+            mRules.add(rule);
+        }
+    }
+
+
     @Test
     public void testAddRemove() throws Exception {
         // Test that we can add and remove nodes from the plan.  Also test
@@ -271,6 +283,88 @@
     }
 
     @Test
+    public void testInsertBetween() throws Exception {
+        // Test that insertBetween works.
+
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[3];
+        for (int i = 0; i < 3; i++) {
+            ops[i] = new SingleOperator(Integer.toString(i));
+            plan.add(ops[i]);
+        }
+
+        // Connect 0 to 2
+        plan.connect(ops[0], ops[2]);
+
+        Collection p = plan.getPredecessors(ops[0]);
+        assertNull(p);
+        p = plan.getSuccessors(ops[0]);
+        assertEquals(1, p.size());
+        Iterator i = p.iterator();
+        assertEquals(ops[2], i.next());
+
+        p = plan.getPredecessors(ops[1]);
+        assertNull(p);
+        p = plan.getSuccessors(ops[1]);
+        assertNull(p);
+
+        p = plan.getPredecessors(ops[2]);
+        assertEquals(1, p.size());
+        i = p.iterator();
+        assertEquals(ops[0], i.next());
+        p = plan.getSuccessors(ops[2]);
+        assertNull(p);
+
+        // Insert 1 in between 0 and 2
+        plan.insertBetween(ops[0], ops[1], ops[2]);
+
+        p = plan.getPredecessors(ops[0]);
+        assertNull(p);
+        p = plan.getSuccessors(ops[0]);
+        assertEquals(1, p.size());
+        i = p.iterator();
+        assertEquals(ops[1], i.next());
+
+        p = plan.getPredecessors(ops[1]);
+        assertEquals(1, p.size());
+        i = p.iterator();
+        assertEquals(ops[0], i.next());
+        p = plan.getSuccessors(ops[1]);
+        assertEquals(1, p.size());
+        i = p.iterator();
+        assertEquals(ops[2], i.next());
+
+        p = plan.getPredecessors(ops[2]);
+        assertEquals(1, p.size());
+        i = p.iterator();
+        assertEquals(ops[1], i.next());
+        p = plan.getSuccessors(ops[2]);
+        assertNull(p);
+    }
+
+    @Test
+    public void testInsertBetweenNegative() throws Exception {
+        // Test that insertBetween throws errors when it should.
+
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[4];
+        for (int i = 0; i < 4; i++) {
+            ops[i] = new MultiOperator(Integer.toString(i));
+            plan.add(ops[i]);
+        }
+
+        plan.connect(ops[0], ops[1]);
+
+        boolean caughtIt = false;
+        try {
+            plan.insertBetween(ops[0], ops[3], ops[2]);
+        } catch (PlanException pe) {
+            caughtIt = true;
+        }
+        assertTrue(caughtIt);
+    }
+
+    @Test
     public void testLinearGraph() throws Exception {
         TPlan plan = new TPlan();
         TOperator[] ops = new TOperator[5];
@@ -483,11 +577,8 @@
         Rule<TOperator, TPlan> r =
             new Rule<TOperator, TPlan>(nodes, edges, required, transformer);
 
-        ArrayList<Rule> rules = new ArrayList<Rule>(1);
-        rules.add(r);
-
-        PlanOptimizer<TOperator, TPlan> optimizer =
-            new PlanOptimizer<TOperator, TPlan>(plan, rules);
+        TOptimizer optimizer = new TOptimizer(plan);
+        optimizer.addRule(r);
 
         optimizer.optimize();
         assertFalse(transformer.mTransformed);
@@ -524,12 +615,8 @@
         Rule<TOperator, TPlan> r =
             new Rule<TOperator, TPlan>(nodes, edges, required, transformer);
 
-        ArrayList<Rule> rules = new ArrayList<Rule>(1);
-        rules.add(r);
-
-        PlanOptimizer<TOperator, TPlan> optimizer =
-            new PlanOptimizer<TOperator, TPlan>(plan, rules);
-
+        TOptimizer optimizer = new TOptimizer(plan);
+        optimizer.addRule(r);
         optimizer.optimize();
         assertFalse(transformer.mTransformed);
     }
@@ -566,11 +653,8 @@
         Rule<TOperator, TPlan> r =
             new Rule<TOperator, TPlan>(nodes, edges, required, transformer);
 
-        ArrayList<Rule> rules = new ArrayList<Rule>(1);
-        rules.add(r);
-
-        PlanOptimizer<TOperator, TPlan> optimizer =
-            new PlanOptimizer<TOperator, TPlan>(plan, rules);
+        TOptimizer optimizer = new TOptimizer(plan);
+        optimizer.addRule(r);
 
         optimizer.optimize();
         assertTrue(transformer.mTransformed);
@@ -611,11 +695,8 @@
         Rule<TOperator, TPlan> r =
             new Rule<TOperator, TPlan>(nodes, edges, required, transformer);
 
-        ArrayList<Rule> rules = new ArrayList<Rule>(1);
-        rules.add(r);
-
-        PlanOptimizer<TOperator, TPlan> optimizer =
-            new PlanOptimizer<TOperator, TPlan>(plan, rules);
+        TOptimizer optimizer = new TOptimizer(plan);
+        optimizer.addRule(r);
 
         optimizer.optimize();
         assertTrue(transformer.mTransformed);
@@ -654,11 +735,8 @@
         Rule<TOperator, TPlan> r =
             new Rule<TOperator, TPlan>(nodes, edges, required, transformer);
 
-        ArrayList<Rule> rules = new ArrayList<Rule>(1);
-        rules.add(r);
-
-        PlanOptimizer<TOperator, TPlan> optimizer =
-            new PlanOptimizer<TOperator, TPlan>(plan, rules);
+        TOptimizer optimizer = new TOptimizer(plan);
+        optimizer.addRule(r);
 
         optimizer.optimize();
         assertTrue(transformer.mTransformed);
@@ -694,11 +772,8 @@
         Rule<TOperator, TPlan> r =
             new Rule<TOperator, TPlan>(nodes, edges, required, transformer);
 
-        ArrayList<Rule> rules = new ArrayList<Rule>(1);
-        rules.add(r);
-
-        PlanOptimizer<TOperator, TPlan> optimizer =
-            new PlanOptimizer<TOperator, TPlan>(plan, rules);
+        TOptimizer optimizer = new TOptimizer(plan);
+        optimizer.addRule(r);
 
         optimizer.optimize();
         assertTrue(transformer.mTransformed);
@@ -752,11 +827,8 @@
         Rule<TOperator, TPlan> r =
             new Rule<TOperator, TPlan>(nodes, edges, required, transformer);
 
-        ArrayList<Rule> rules = new ArrayList<Rule>(1);
-        rules.add(r);
-
-        PlanOptimizer<TOperator, TPlan> optimizer =
-            new PlanOptimizer<TOperator, TPlan>(plan, rules);
+        TOptimizer optimizer = new TOptimizer(plan);
+        optimizer.addRule(r);
 
         optimizer.optimize();
         assertFalse(transformer.mTransformed);

Added: incubator/pig/branches/types/test/org/apache/pig/test/data/DotFiles/optplan1.dot
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/data/DotFiles/optplan1.dot?rev=672801&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/data/DotFiles/optplan1.dot (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/data/DotFiles/optplan1.dot Mon Jun 30 08:56:41 2008
@@ -0,0 +1,9 @@
+digraph graph1 {
+    graph [pigScript=" A = load 'myfile' as (p:int, q:long, r:float, s:double, t:map[], u:tuple (x:int, y:int), v:bag {tuple(int:z)}); B = group A by p; C = foreach B generate $0,  COUNT($1);" ] ;
+
+    load [key="0", type="LOLoad" , schema= "p: bytearray,q: bytearray,r: bytearray,s: bytearray,t: bytearray ,u: bytearray,v: bytearray"] ;
+    typecast [ key="23", type="LOForEach", schema= "p: int,q: long,r: float,s: double,t: map[] ,u: tuple(x: int,y: int),v: bag {x: tuple (z: int)}"] ;
+    order [key="2", type="LOSort", schema= "p: int,q: long,r: float,s: double,t: map[] ,u: tuple(x: int,y: int),v: bag {x: tuple (z: int)}"] ;
+
+    load -> typecast -> order ;
+}



Mime
View raw message