pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r672016 [1/4] - in /incubator/pig/branches/types: ./ src/org/apache/pig/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/parser/ src/org/apache/pig/impl/logicalLayer/validators/ src/org/apache/pig/impl/mapReduceLa...
Date Thu, 26 Jun 2008 20:05:26 GMT
Author: gates
Date: Thu Jun 26 13:05:23 2008
New Revision: 672016

URL: http://svn.apache.org/viewvc?rev=672016&view=rev
Log:
PIG-161 Shravan's foreach_gen_comb_patch.  Fixes foreach to remove generate and split out inner plans.


Added:
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalkerWOSeenChk.java
Removed:
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinderForExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POGenerate.java
Modified:
    incubator/pig/branches/types/build.xml
    incubator/pig/branches/types/src/org/apache/pig/PigServer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFilter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigCombiner.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapBase.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinder.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Add.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/ConstantExpression.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Divide.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/ExpressionOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Mod.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Multiply.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POBinCond.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POIsNull.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POMapLookUp.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONegative.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONot.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POOr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PORegexp.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Subtract.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/UnaryExpressionOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PlanPrinter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PODistinct.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POFilter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POGlobalRearrange.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLoad.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLocalRearrange.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POPackage.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PORead.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSplit.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POStore.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POUnion.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestEqualTo.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestNotEqualTo.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestNull.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOMapLookUp.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestUnion.java
    incubator/pig/branches/types/test/org/apache/pig/test/TypeGraphPrinter.java
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Arithmetic.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/BinCond.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Cogroup.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Comparison.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/ComplexForeach.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Distinct.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Generate.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/IsNull1.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/IsNull2.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Sort.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Split1.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Split2.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Union.gld
    incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/POCastDummy.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/DotGraphVisitor.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java

Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Thu Jun 26 13:05:23 2008
@@ -278,12 +278,11 @@
                 <fileset dir="test">
                     <include name="**/*Test*.java" />
                     <!-- Excluced because they are end-to-end, don't work yet.  -->
-                    <exclude name="**/TestAlgebraicEval.java" />
-                    <exclude name="**/TestCompressedFiles.java" />
-                    <exclude name="**/TestEvalPipeline.java" />
+                    <!--
                     <exclude name="**/TestFilterOpNumeric.java" />
                     <exclude name="**/TestPigFile.java" />
                     <exclude name="**/TestStoreOld.java" />
+                    -->
                     <!-- Excluced because we don't want to run them -->
                     <exclude name="**/TypeCheckingTestUtil.java" />
                     <exclude name="**/TypeGraphPrinter.java" />

Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Thu Jun 26 13:05:23 2008
@@ -462,8 +462,9 @@
     private ExecJob execute(
             LogicalPlan lp) throws FrontendException, ExecException {
         ExecJob job = null;
-
+//        lp.explain(System.out, System.err);
         LogicalPlan typeCheckedLp = compileLp(lp, "execute");
+//        typeCheckedLp.explain(System.out, System.err);
         PhysicalPlan pp = compilePp(typeCheckedLp);
         // execute using appropriate engine
         return pigContext.getExecutionEngine().execute(pp, "execute");

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java Thu Jun 26 13:05:23 2008
@@ -93,27 +93,8 @@
             // It's fine, it just means we don't have a schema yet.
         }
 		log.debug("After getFieldSchema()");
-        if (null == mFieldSchema) {
-            log.debug("Operator schema is null; Setting it to new schema");
-            mFieldSchema = fs;
-        } else {
-            log.debug("Reconciling schema");
-			log.debug("mFieldSchema: " + mFieldSchema + " fs: " + fs);
-            //log.debug("mSchema: " + mSchema + " schema: " + schema);
-			try {
-				if(null != mFieldSchema.schema) {
-            		mFieldSchema.schema.reconcile(fs.schema);
-				} else {
-					mFieldSchema.schema = fs.schema;
-				}
-				mFieldSchema.type = fs.type;
-				mFieldSchema.alias = fs.alias;
-				//the operator name is the name of the column being projected
-				setAlias(fs.alias);
-			} catch (ParseException pe) {
-				throw new FrontendException(pe.getMessage());
-			}
-        }
+        mFieldSchema = fs;
+        setAlias(fs.alias);
         mIsFieldSchemaComputed = true;
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Thu Jun 26 13:05:23 2008
@@ -91,11 +91,6 @@
     }
 
     @Override
-    public boolean supportsMultipleOutputs() {
-        return false;
-    }
-
-    @Override
     public Schema getSchema() throws FrontendException {
         // TODO create schema
         /*

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCross.java Thu Jun 26 13:05:23 2008
@@ -175,11 +175,6 @@
     }
 
     @Override
-    public boolean supportsMultipleOutputs() {
-        return false;
-    }
-
-    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LODistinct.java Thu Jun 26 13:05:23 2008
@@ -94,11 +94,6 @@
     }
 
     @Override
-    public boolean supportsMultipleOutputs() {
-        return false;
-    }
-
-    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFilter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFilter.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFilter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFilter.java Thu Jun 26 13:05:23 2008
@@ -94,11 +94,6 @@
     }
 
     @Override
-    public boolean supportsMultipleOutputs() {
-        return false;
-    }
-
-    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java Thu Jun 26 13:05:23 2008
@@ -19,6 +19,10 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Iterator;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -39,7 +43,8 @@
      * applied over the input.
      */
 
-    private LogicalPlan mForEachPlan;
+    private ArrayList<LogicalPlan> mForEachPlans;
+    private ArrayList<Boolean> mFlatten;
     private static Log log = LogFactory.getLog(LOForEach.class);
 
     /**
@@ -52,41 +57,19 @@
      */
 
     public LOForEach(LogicalPlan plan, OperatorKey k,
-            LogicalPlan foreachPlan) {
+            ArrayList<LogicalPlan> foreachPlans, ArrayList<Boolean> flattenList) {
 
         super(plan, k);
-        mForEachPlan = foreachPlan;
+        mForEachPlans = foreachPlans;
+        mFlatten = flattenList;
     }
 
-    public LogicalPlan getForEachPlan() {
-        return mForEachPlan;
+    public ArrayList<LogicalPlan> getForEachPlans() {
+        return mForEachPlans;
     }
 
-    @Override
-    public Schema getSchema() throws FrontendException {
-        if (!mIsSchemaComputed && (null == mSchema)) {
-            // Assuming that the last operator is the GENERATE
-            // foreach has to terminate with a GENERATE
-            LogicalOperator last = null;
-            for(LogicalOperator op: mForEachPlan.getLeaves()) {
-                if(op instanceof LOGenerate) {
-                    last = op;
-                    break;
-                }
-            }
-            if(null == last) throw new FrontendException("Did not find generate in the foreach logicalplan");
-
-            log.debug("Last Operator: " + last.getClass().getName());
-            try {
-                mSchema = last.getSchema();
-                mIsSchemaComputed = true;
-            } catch (FrontendException ioe) {
-                mSchema = null;
-                mIsSchemaComputed = false;
-                throw ioe;
-            }
-        }
-        return mSchema;
+    public List<Boolean> getFlatten() {
+        return mFlatten;
     }
 
     @Override
@@ -100,11 +83,6 @@
     }
 
     @Override
-    public boolean supportsMultipleOutputs() {
-        return false;
-    }
-
-    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }
@@ -112,4 +90,160 @@
     public byte getType() {
         return DataType.BAG ;
     }
+
+    @Override
+    public Schema getSchema() throws FrontendException {
+        log.debug("Entering getSchema");
+        if (!mIsSchemaComputed && (null == mSchema)) {
+            List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>(
+                    mForEachPlans.size());
+
+            for (LogicalPlan plan : mForEachPlans) {
+                log.debug("Number of leaves in " + plan + " = " + plan.getLeaves().size());
+                for(int i = 0; i < plan.getLeaves().size(); ++i) {
+                    log.debug("Leaf" + i + "= " + plan.getLeaves().get(i));
+                }
+                //LogicalOperator op = plan.getRoots().get(0);
+                LogicalOperator op = plan.getLeaves().get(0);
+                log.debug("op: " + op.getClass().getName() + " " + op);
+            }
+            log.debug("Printed the leaves of the generate plans");
+
+            Map<Schema.FieldSchema, String> flattenAlias = new HashMap<Schema.FieldSchema, String>();
+            Map<String, Boolean> inverseFlattenAlias = new HashMap<String, Boolean>();
+            Map<String, Integer> aliases = new HashMap<String, Integer>();
+
+            for (int planCtr = 0; planCtr < mForEachPlans.size(); ++planCtr) {
+                LogicalPlan plan = mForEachPlans.get(planCtr);
+                LogicalOperator op = plan.getLeaves().get(0);
+                log.debug("op: " + op.getClass().getName() + " " + op);
+                log.debug("Flatten: " + mFlatten.get(planCtr));
+                Schema.FieldSchema planFs;
+
+                try {
+	                planFs = ((ExpressionOperator)op).getFieldSchema();
+					if(null != planFs) {
+						String outerCanonicalAlias = op.getAlias();
+						if(null == outerCanonicalAlias) {
+							outerCanonicalAlias = planFs.alias;
+						}
+						log.debug("Outer canonical alias: " + outerCanonicalAlias);
+						if(mFlatten.get(planCtr)) {
+							//need to extract the children and create the aliases
+							//assumption here is that flatten is only for one column
+							//i.e., flatten(A), flatten(A.x) and NOT
+							//flatten(B.(x,y,z))
+							Schema s = planFs.schema;
+							if(null != s) {
+								for(Schema.FieldSchema fs: s.getFields()) {
+									log.debug("fs: " + fs);
+									log.debug("fs.alias: " + fs.alias);
+									String innerCanonicalAlias = fs.alias;
+									if((null != outerCanonicalAlias) && (null != innerCanonicalAlias)) {
+										String disambiguatorAlias = outerCanonicalAlias + "::" + innerCanonicalAlias;
+										Schema.FieldSchema newFs = new Schema.FieldSchema(disambiguatorAlias, fs.schema, fs.type);
+										fss.add(newFs);
+										Integer count;
+										count = aliases.get(innerCanonicalAlias);
+										if(null == count) {
+											aliases.put(innerCanonicalAlias, 1);
+										} else {
+											aliases.put(innerCanonicalAlias, ++count);
+										}
+										count = aliases.get(disambiguatorAlias);
+										if(null == count) {
+											aliases.put(disambiguatorAlias, 1);
+										} else {
+											aliases.put(disambiguatorAlias, ++count);
+										}
+										flattenAlias.put(newFs, innerCanonicalAlias);
+										inverseFlattenAlias.put(innerCanonicalAlias, true);
+										//it's fine if there are duplicates
+										//we just need to record if it's due to
+										//flattening
+									} else {
+										Schema.FieldSchema newFs = new Schema.FieldSchema(null, fs.schema, fs.type);
+										fss.add(newFs);
+									}
+								}
+							} else {
+								Schema.FieldSchema newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+								fss.add(newFs);
+							}
+						} else {
+							//just populate the schema with the field schema of the expression operator
+	                   		fss.add(planFs);
+							if(null != outerCanonicalAlias) {
+								Integer count = aliases.get(outerCanonicalAlias);
+								if(null == count) {
+									aliases.put(outerCanonicalAlias, 1);
+								} else {
+									aliases.put(outerCanonicalAlias, ++count);
+								}
+							}
+						}
+					} else {
+						//did not get a valid list of field schemas
+						fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
+					}
+                } catch (FrontendException fee) {
+                    mSchema = null;
+                    mIsSchemaComputed = false;
+                    throw fee;
+                }
+            }
+			//check for duplicate column names and throw an error if there are duplicates
+			//ensure that flatten gets rid of duplicate column names when the checks are
+			//being done
+			log.debug(" flattenAlias: " + flattenAlias);
+			log.debug(" inverseFlattenAlias: " + inverseFlattenAlias);
+			log.debug(" aliases: " + aliases);
+			log.debug(" fss.size: " + fss.size());
+			boolean duplicates = false;
+			Map<String, Integer> duplicateAliases = new HashMap<String, Integer>();
+			for(String alias: aliases.keySet()) {
+				Integer count = aliases.get(alias);
+				if(count > 1) {//not checking for null here as counts are initialized to 1
+					Boolean inFlatten = false;
+					log.debug("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias);
+					inFlatten = inverseFlattenAlias.get(alias);
+					log.debug("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias);
+					if((null == inFlatten) || (!inFlatten)) {
+						duplicates = true;
+						duplicateAliases.put(alias, count);
+					}
+				}
+			}
+			if(duplicates) {
+				String errMessage = "Found duplicates in schema! ";
+				if(duplicateAliases.size() > 0) {
+					Set<String> duplicateCols = duplicateAliases.keySet();
+					Iterator<String> iter = duplicateCols.iterator();
+					String col = iter.next();
+					errMessage += col + ": " + duplicateAliases.get(col) + " columns";
+					while(iter.hasNext()) {
+						col = iter.next();
+						errMessage += ", " + col + ": " + duplicateAliases.get(col) + " columns";
+					}
+				}
+				errMessage += ". Please alias the columns with unique names.";
+				log.debug(errMessage);
+				throw new FrontendException(errMessage);
+			}
+            mSchema = new Schema(fss);
+			//add the aliases that are unique after flattening
+			for(Schema.FieldSchema fs: mSchema.getFields()) {
+				String alias = flattenAlias.get(fs);
+				Integer count = aliases.get(alias);
+				if (null == count) count = 1;
+				log.debug("alias: " + alias);
+				if((null != alias) && (count == 1)) {
+					mSchema.addAlias(alias, fs);
+				}
+			}
+            mIsSchemaComputed = true;
+        }
+        log.debug("Exiting getSchema");
+        return mSchema;
+    }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java Thu Jun 26 13:05:23 2008
@@ -24,6 +24,7 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.Collection;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -104,162 +105,29 @@
     }
 
     @Override
-    public boolean supportsMultipleOutputs() {
-        return false;
-    }
-
-    @Override
     public Schema getSchema() throws FrontendException {
-        log.debug("Entering getSchema");
         if (!mIsSchemaComputed && (null == mSchema)) {
-            List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>(
-                    mGeneratePlans.size());
-
-            for (LogicalPlan plan : mGeneratePlans) {
-                log.debug("Number of leaves in " + plan + " = " + plan.getLeaves().size());
-                for(int i = 0; i < plan.getLeaves().size(); ++i) {
-                    log.debug("Leaf" + i + "= " + plan.getLeaves().get(i));
+            // Get the schema of the parent
+            Collection<LogicalOperator> s = mPlan.getPredecessors(this);
+            ArrayList<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>();
+            try {
+                LogicalOperator op = s.iterator().next();
+                if (null == op) {
+                    throw new FrontendException("Could not find operator in plan");
                 }
-                //LogicalOperator op = plan.getRoots().get(0);
-                LogicalOperator op = plan.getLeaves().get(0);
-                log.debug("op: " + op.getClass().getName() + " " + op);
-            }
-            log.debug("Printed the leaves of the generate plans");
-
-            Map<Schema.FieldSchema, String> flattenAlias = new HashMap<Schema.FieldSchema, String>();
-            Map<String, Boolean> inverseFlattenAlias = new HashMap<String, Boolean>();
-            Map<String, Integer> aliases = new HashMap<String, Integer>();
-
-            for (int planCtr = 0; planCtr < mGeneratePlans.size(); ++planCtr) {
-                LogicalPlan plan = mGeneratePlans.get(planCtr);
-                LogicalOperator op = plan.getLeaves().get(0);
-                log.debug("op: " + op.getClass().getName() + " " + op);
-                Schema.FieldSchema planFs;
-
-                try {
-	                planFs = ((ExpressionOperator)op).getFieldSchema();
-					if(null != planFs) {
-						String outerCanonicalAlias = op.getAlias();
-						if(null == outerCanonicalAlias) {
-							outerCanonicalAlias = planFs.alias;
-						}
-						log.debug("Outer canonical alias: " + outerCanonicalAlias);
-						if(mFlatten.get(planCtr)) {
-							//need to extract the children and create the aliases
-							//assumption here is that flatten is only for one column
-							//i.e., flatten(A), flatten(A.x) and NOT
-							//flatten(B.(x,y,z))
-							Schema s = planFs.schema;
-							if(null != s) {
-								for(Schema.FieldSchema fs: s.getFields()) {
-									log.debug("fs: " + fs);
-									log.debug("fs.alias: " + fs.alias);
-									String innerCanonicalAlias = fs.alias;
-									if((null != outerCanonicalAlias) && (null != innerCanonicalAlias)) {
-										String disambiguatorAlias = outerCanonicalAlias + "::" + innerCanonicalAlias;
-										Schema.FieldSchema newFs = new Schema.FieldSchema(disambiguatorAlias, fs.schema, fs.type);
-										fss.add(newFs);
-										Integer count;
-										count = aliases.get(innerCanonicalAlias);
-										if(null == count) {
-											aliases.put(innerCanonicalAlias, 1);
-										} else {
-											aliases.put(innerCanonicalAlias, ++count);
-										}
-										count = aliases.get(disambiguatorAlias);
-										if(null == count) {
-											aliases.put(disambiguatorAlias, 1);
-										} else {
-											aliases.put(disambiguatorAlias, ++count);
-										}
-										flattenAlias.put(newFs, innerCanonicalAlias);
-										inverseFlattenAlias.put(innerCanonicalAlias, true);
-										//it's fine if there are duplicates
-										//we just need to record if its due to
-										//flattening
-									} else {
-										Schema.FieldSchema newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
-										fss.add(newFs);
-									}
-								}
-							} else {
-								Schema.FieldSchema newFs = new Schema.FieldSchema(null, DataType.BYTEARRAY);
-								fss.add(newFs);
-							}
-						} else {
-							//just populate the schema with the field schema of the expression operator
-	                   		fss.add(planFs);
-							if(null != outerCanonicalAlias) {
-								Integer count = aliases.get(outerCanonicalAlias);
-								if(null == count) {
-									aliases.put(outerCanonicalAlias, 1);
-								} else {
-									aliases.put(outerCanonicalAlias, ++count);
-								}
-							}
-						}
-					} else {
-						//did not get a valid list of field schemas
-						fss.add(new Schema.FieldSchema(null, DataType.BYTEARRAY));
-					}
-                } catch (FrontendException fee) {
-                    mSchema = null;
-                    mIsSchemaComputed = false;
-                    throw fee;
+                if(op instanceof ExpressionOperator) {
+                    fss.add(((ExpressionOperator)op).getFieldSchema());
+                    mSchema = new Schema(fss);
+                } else {
+                    mSchema = op.getSchema();
                 }
+                mIsSchemaComputed = true;
+            } catch (FrontendException ioe) {
+                mSchema = null;
+                mIsSchemaComputed = false;
+                throw ioe;
             }
-			//check for duplicate column names and throw an error if there are duplicates
-			//ensure that flatten gets rid of duplicate column names when the checks are
-			//being done
-			log.debug(" flattenAlias: " + flattenAlias);
-			log.debug(" inverseFlattenAlias: " + inverseFlattenAlias);
-			log.debug(" aliases: " + aliases);
-			log.debug(" fss.size: " + fss.size());
-			boolean duplicates = false;
-			Map<String, Integer> duplicateAliases = new HashMap<String, Integer>();
-			for(String alias: aliases.keySet()) {
-				Integer count = aliases.get(alias);
-				if(count > 1) {//not checking for null here as counts are intitalized to 1
-					Boolean inFlatten = false;
-					log.debug("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias);
-					inFlatten = inverseFlattenAlias.get(alias);
-					log.debug("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias);
-					if((null == inFlatten) || (!inFlatten)) {
-						duplicates = true;
-						duplicateAliases.put(alias, count);
-					}
-				}
-			}
-			if(duplicates) {
-				String errMessage = "Found duplicates in schema! ";
-				if(duplicateAliases.size() > 0) {
-					Set<String> duplicateCols = duplicateAliases.keySet();
-					Iterator<String> iter = duplicateCols.iterator();
-					String col = iter.next();
-					errMessage += col + ": " + duplicateAliases.get(col) + " columns";
-					while(iter.hasNext()) {
-						col = iter.next();
-						errMessage += ", " + col + ": " + duplicateAliases.get(col) + " columns";
-					}
-				}
-				errMessage += ". Please alias the columns with unique names.";
-				log.debug(errMessage);
-				throw new FrontendException(errMessage);
-			}
-            mSchema = new Schema(fss);
-			//add the aliases that are unique after flattening
-			for(Schema.FieldSchema fs: mSchema.getFields()) {
-				String alias = flattenAlias.get(fs);
-				Integer count = aliases.get(alias);
-				if (null == count) count = 1;
-				log.debug("alias: " + alias);
-				if((null != alias) && (count == 1)) {
-					mSchema.addAlias(alias, fs);
-				}
-			}
-            mIsSchemaComputed = true;
         }
-        log.debug("Exiting getSchema");
         return mSchema;
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java Thu Jun 26 13:05:23 2008
@@ -117,11 +117,6 @@
         return false;
     }
 
-    @Override
-    public boolean supportsMultipleOutputs() {
-        return false;
-    }
-
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java Thu Jun 26 13:05:23 2008
@@ -130,7 +130,7 @@
             sb.append(planString(((LOFilter)node).getComparisonPlan()));
         }
         else if(node instanceof LOForEach){
-            sb.append(planString(((LOForEach)node).getForEachPlan()));        
+            sb.append(planString(((LOForEach)node).getForEachPlans()));        
         }
         else if(node instanceof LOGenerate){
             sb.append(planString(((LOGenerate)node).getGeneratePlans())); 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java Thu Jun 26 13:05:23 2008
@@ -64,6 +64,7 @@
     private boolean mIsStar = false;
     private static Log log = LogFactory.getLog(LOProject.class);
     private boolean mSentinel;
+    private boolean mOverloaded = false;
 
     /**
      * 
@@ -146,9 +147,17 @@
         mSentinel = b;
     }
 
+    public boolean getOverlaoded() {
+        return mOverloaded;
+    }
+
+    public void setOverloaded(boolean b) {
+        mOverloaded = b;
+    }
+
     @Override
     public String name() {
-        return "Project " + mKey.scope + "-" + mKey.id + " Projections: " + (mIsStar? " [*] ": mProjection);
+        return "Project " + mKey.scope + "-" + mKey.id + " Projections: " + (mIsStar? " [*] ": mProjection) + " Overloaded: " + mOverloaded;
     }
 
     @Override
@@ -186,7 +195,7 @@
                             //TODO
                             //the type of the operator will be unknown. when type checking is in place
                             //add the type of the operator as a parameter to the fieldschema creation
-                            mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), expressionOperator.getSchema(), expressionOperator.getType());
+                            mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), expressionOperator.getSchema(), DataType.TUPLE);
                             //mFieldSchema = new Schema.FieldSchema(expressionOperator.getAlias(), expressionOperator.getSchema());
                         }
                     } else {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java Thu Jun 26 13:05:23 2008
@@ -132,11 +132,6 @@
         return false;
     }
 
-    @Override
-    public boolean supportsMultipleOutputs() {
-        return false;
-    }
-
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java Thu Jun 26 13:05:23 2008
@@ -95,11 +95,6 @@
         return false;
     }
 
-    @Override
-    public boolean supportsMultipleOutputs() {
-        return false;
-    }
-
     public int getReadFrom() {
         return mIndex;
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java Thu Jun 26 13:05:23 2008
@@ -98,11 +98,6 @@
     }
 
     @Override
-    public boolean supportsMultipleOutputs() {
-        return false;
-    }
-
-    @Override
     public void visit(LOVisitor v) throws VisitorException {
         v.visit(this);
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java Thu Jun 26 13:05:23 2008
@@ -140,9 +140,9 @@
      *            the logical generate operator that has to be visited
      * @throws VisitorException
      */
-    protected void visit(LOGenerate g) throws VisitorException {
+    protected void visit(LOForEach forEach) throws VisitorException {
         // Visit each of generates projection elements.
-        for(LogicalPlan lp: g.getGeneratePlans()) {
+        for(LogicalPlan lp: forEach.getForEachPlans()) {
             PlanWalker w = new DependencyOrderWalker(lp);
             pushWalker(w);
             for(LogicalOperator logicalOp: lp.getRoots()) {
@@ -205,15 +205,16 @@
      *            the logical foreach operator that has to be visited
      * @throws VisitorException
      */
-    protected void visit(LOForEach forEach) throws VisitorException {
+    protected void visit(LOGenerate g) throws VisitorException {
         // Visit the operators that are part of the foreach plan
-        LogicalPlan plan = forEach.getForEachPlan();
-        PlanWalker w = new DependencyOrderWalker(plan);
-        pushWalker(w);
-        for(LogicalOperator logicalOp: plan.getRoots()) {
-            logicalOp.visit(this);
+        for(LogicalPlan lp: g.getGeneratePlans()) {
+            PlanWalker w = new DependencyOrderWalker(lp);
+            pushWalker(w);
+            for(LogicalOperator logicalOp: lp.getRoots()) {
+                logicalOp.visit(this);
+            }
+            popWalker();
         }
-        popWalker();
     }
 
     /**

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java Thu Jun 26 13:05:23 2008
@@ -32,7 +32,6 @@
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
-import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
 import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.physicalLayer.relationalOperators.*;
@@ -40,6 +39,7 @@
 import org.apache.pig.impl.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.impl.physicalLayer.expressionOperators.BinaryExpressionOperator;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.DependencyOrderWalkerWOSeenChk;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
@@ -52,7 +52,7 @@
 
     Random r = new Random();
 
-    Stack<PhysicalPlan<? extends PhysicalOperator>> currentPlans;
+    Stack<PhysicalPlan> currentPlans;
 
     PhysicalPlan currentPlan;
 
@@ -68,8 +68,8 @@
         super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(
                 plan));
 
-        currentPlans = new Stack<PhysicalPlan<? extends PhysicalOperator>>();
-        currentPlan = new PhysicalPlan<PhysicalOperator>();
+        currentPlans = new Stack<PhysicalPlan>();
+        currentPlan = new PhysicalPlan();
         LogToPhyMap = new HashMap<LogicalOperator, PhysicalOperator>();
     }
 
@@ -77,7 +77,7 @@
         this.pc = pc;
     }
 
-    public PhysicalPlan<PhysicalOperator> getPhysicalPlan() {
+    public PhysicalPlan getPhysicalPlan() {
 
         return currentPlan;
     }
@@ -507,15 +507,15 @@
             POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
                     scope, nodeGen.getNextNodeId(scope)), cg
                     .getRequestedParallelism());
-            List<ExprPlan> exprPlans = new ArrayList<ExprPlan>();
+            List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
             currentPlans.push(currentPlan);
             for (LogicalPlan lp : plans) {
-                currentPlan = new ExprPlan();
+                currentPlan = new PhysicalPlan();
                 PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
                         .spawnChildWalker(lp);
                 pushWalker(childWalker);
                 mCurrentWalker.walk(this);
-                exprPlans.add((ExprPlan) currentPlan);
+                exprPlans.add((PhysicalPlan) currentPlan);
                 popWalker();
 
             }
@@ -559,7 +559,7 @@
         LogToPhyMap.put(filter, poFilter);
         currentPlans.push(currentPlan);
 
-        currentPlan = new ExprPlan();
+        currentPlan = new PhysicalPlan();
 
         PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
                 .spawnChildWalker(filter.getComparisonPlan());
@@ -567,7 +567,7 @@
         mCurrentWalker.walk(this);
         popWalker();
 
-        poFilter.setPlan((ExprPlan) currentPlan);
+        poFilter.setPlan((PhysicalPlan) currentPlan);
         currentPlan = currentPlans.pop();
 
         List<LogicalOperator> op = filter.getPlan().getPredecessors(filter);
@@ -612,60 +612,66 @@
     }
 
     @Override
-    public void visit(LOForEach forEach) throws VisitorException {
-        String scope = forEach.getOperatorKey().scope;
-        // This needs to be handled specially.
-        // We need to be able to handle arbitrary levels of nesting
+    public void visit(LOForEach g) throws VisitorException {
+        boolean currentPhysicalPlan = false;
+        String scope = g.getOperatorKey().scope;
+        List<PhysicalPlan> innerPlans = new ArrayList<PhysicalPlan>();
+        List<LogicalPlan> plans = g.getForEachPlans();
 
-        // push the current physical plan in the stack.
         currentPlans.push(currentPlan);
+        for (LogicalPlan plan : plans) {
+            currentPlan = new PhysicalPlan();
+            PlanWalker<LogicalOperator, LogicalPlan> childWalker = new DependencyOrderWalkerWOSeenChk<LogicalOperator, LogicalPlan>(
+                    plan);
+            pushWalker(childWalker);
+            childWalker.walk(this);
+            innerPlans.add(currentPlan);
+            popWalker();
+        }
+        currentPlan = currentPlans.pop();
 
-        // create a new physical plan
-        currentPlan = new PhysicalPlan<PhysicalOperator>();
-        PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
-                .spawnChildWalker(forEach.getForEachPlan());
+        // PhysicalOperator poGen = new POGenerate(new OperatorKey("",
+        // r.nextLong()), inputs, toBeFlattened);
+        POForEach poFE = new POForEach(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), g.getRequestedParallelism(), innerPlans,
+                g.getFlatten());
+        poFE.setResultType(g.getType());
+        LogToPhyMap.put(g, poFE);
+        currentPlan.add(poFE);
 
-        // now populate the physical plan by walking
-        pushWalker(childWalker);
-        mCurrentWalker.walk(this);
-        popWalker();
+        // generate cannot have multiple inputs
+        List<LogicalOperator> op = g.getPlan().getPredecessors(g);
 
-        POForEach fe = new POForEach(new OperatorKey(scope, nodeGen
-                .getNextNodeId(scope)), forEach.getRequestedParallelism());
-        fe.setPlan(currentPlan);
-        fe.setResultType(DataType.TUPLE);
-        LogToPhyMap.put(forEach, fe);
+        // generate may not have any predecessors
+        if (op == null)
+            return;
 
-        // now connect foreach to its inputs
-        currentPlan = currentPlans.pop();
-        currentPlan.add(fe);
-        PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(mPlan
-                .getPredecessors(forEach).get(0));
+        PhysicalOperator from = LogToPhyMap.get(op.get(0));
         try {
-            currentPlan.connect(from, fe);
+            currentPlan.connect(from, poFE);
         } catch (PlanException e) {
             log.error("Invalid physical operators in the physical plan"
                     + e.getMessage());
-
         }
 
     }
 
+    /*
     @Override
     public void visit(LOGenerate g) throws VisitorException {
         boolean currentPhysicalPlan = false;
         String scope = g.getOperatorKey().scope;
-        List<ExprPlan> exprPlans = new ArrayList<ExprPlan>();
+        List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
         List<LogicalPlan> plans = g.getGeneratePlans();
 
         currentPlans.push(currentPlan);
         for (LogicalPlan plan : plans) {
-            currentPlan = new ExprPlan();
+            currentPlan = new PhysicalPlan();
             PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
                     .spawnChildWalker(plan);
             pushWalker(childWalker);
             childWalker.walk(this);
-            exprPlans.add((ExprPlan) currentPlan);
+            exprPlans.add((PhysicalPlan) currentPlan);
             popWalker();
         }
         currentPlan = currentPlans.pop();
@@ -695,22 +701,23 @@
         }
 
     }
+    */
 
     @Override
     public void visit(LOSort s) throws VisitorException {
         String scope = s.getOperatorKey().scope;
         List<LogicalPlan> logPlans = s.getSortColPlans();
-        List<ExprPlan> sortPlans = new ArrayList<ExprPlan>(logPlans.size());
+        List<PhysicalPlan> sortPlans = new ArrayList<PhysicalPlan>(logPlans.size());
 
         // convert all the logical expression plans to physical expression plans
         currentPlans.push(currentPlan);
         for (LogicalPlan plan : logPlans) {
-            currentPlan = new ExprPlan();
+            currentPlan = new PhysicalPlan();
             PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
                     .spawnChildWalker(plan);
             pushWalker(childWalker);
             childWalker.walk(this);
-            sortPlans.add((ExprPlan) currentPlan);
+            sortPlans.add((PhysicalPlan) currentPlan);
             popWalker();
         }
         currentPlan = currentPlans.pop();
@@ -732,7 +739,7 @@
         // sort.setRequestedParallelism(s.getType());
         LogToPhyMap.put(s, sort);
         currentPlan.add(sort);
-        PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(s.mPlan
+        PhysicalOperator from = LogToPhyMap.get(s.mPlan
                 .getPredecessors(s).get(0));
         try {
             currentPlan.connect(from, sort);
@@ -791,14 +798,14 @@
 
         currentPlan.add(physOp);
         currentPlans.push(currentPlan);
-        currentPlan = new ExprPlan();
+        currentPlan = new PhysicalPlan();
         PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
                 .spawnChildWalker(split.getConditionPlan());
         pushWalker(childWalker);
         mCurrentWalker.walk(this);
         popWalker();
 
-        ((POFilter) physOp).setPlan((ExprPlan) currentPlan);
+        ((POFilter) physOp).setPlan((PhysicalPlan) currentPlan);
         currentPlan = currentPlans.pop();
         currentPlan.add(physOp);
         PhysicalOperator from = LogToPhyMap.get(split.getPlan()
@@ -828,7 +835,7 @@
         currentPlan.add(p);
         List<LogicalOperator> fromList = func.getPlan().getPredecessors(func);
         for (LogicalOperator op : fromList) {
-            PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(op);
+            PhysicalOperator from = LogToPhyMap.get(op);
             try {
                 currentPlan.connect(from, p);
             } catch (PlanException e) {
@@ -863,7 +870,7 @@
         store.setSFile(loStore.getOutputFile());
         store.setPc(pc);
         currentPlan.add(store);
-        PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(loStore
+        PhysicalOperator from = LogToPhyMap.get(loStore
                 .getPlan().getPredecessors(loStore).get(0));
         try {
             currentPlan.connect(from, store);

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Thu Jun 26 13:05:23 2008
@@ -224,15 +224,6 @@
      */
     public abstract void visit(LOVisitor v) throws VisitorException;
 
-	/*
-    public boolean isFlatten() {
-        return mIsFlatten;
-    }
-
-    public void setFlatten(boolean b) {
-        mIsFlatten = b;
-    }
-	*/
     public LogicalPlan getPlan() {
         return mPlan ;
     }
@@ -245,4 +236,10 @@
     public void setSchemaComputed(boolean computed) {
        mIsSchemaComputed = computed ;   
     }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return true;
+    }
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Jun 26 13:05:23 2008
@@ -254,17 +254,18 @@
 		
 		//Construct the generate operator from the list of projection plans
 		//Add the generate operator to the foreach logical plan
+        /*
 		LogicalOperator generate = new LOGenerate(lp, new OperatorKey(scope, getNextId()), generatePlans, flattenList);
 		foreachPlan.add(generate);
 		log.debug("Added operator " + generate.getClass().getName() + " to the logical plan " + lp);
-
+        */
 		
 		/*
 		 * Construct the foreach operator from the foreach logical plan
 		 * Add the foreach operator to the top level logical plan
 		 */
 		 
-		LogicalOperator foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), foreachPlan);
+		LogicalOperator foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), generatePlans, flattenList);
 		lp.add(foreach);
 		log.debug("Added operator " + foreach.getClass().getName() + " to the logical plan");
 		lp.connect(cogroup, foreach);
@@ -353,6 +354,14 @@
 		resetGenerateInputs();
 	}
 
+    boolean checkGenerateInput(LogicalOperator in) {
+        if(null == generateInputs) return false;
+        for(LogicalOperator op: generateInputs) {
+            if(op == in) return true;
+        }
+        return false;
+    }
+
 	 //END
 	
 	private static Map<String, Byte> nameToTypeMap = DataType.genNameToTypeMap();
@@ -1248,12 +1257,12 @@
 	}
 }
 
-
 LogicalOperator ForEachClause(LogicalPlan lp) : 
 {
 	ArrayList<LogicalOperator> specList = new ArrayList<LogicalOperator>(); 
 	LogicalOperator input, foreach; 
 	LogicalPlan foreachPlan = new LogicalPlan();
+    ArrayList<LogicalPlan> foreachPlans = new ArrayList<LogicalPlan>();
 	log.trace("Entering ForEachClause");
 }
 {
@@ -1262,7 +1271,59 @@
 	specList = NestedBlock(input.getSchema(), specList, foreachPlan, input)
 	)
 	{
-		foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), foreachPlan);
+        LOGenerate generate = (LOGenerate)specList.get(specList.size() - 1);
+        List<LogicalPlan> generatePlans = generate.getGeneratePlans();
+        List<Boolean> flattenList = generate.getFlatten();
+        /*
+        Generate's nested plans will be translated to foreach's nested plan
+        If generate contains an expression that does not require generate's
+        inputs then it should be made part of foreach without the generate
+        For the remaining expressions, the entire DAG till generate has to be
+        duplicated and then the nested plan attached to a new generate
+        */
+
+        for (int planCtr = 0; planCtr < generatePlans.size(); ++planCtr) {
+            LogicalPlan generatePlan = generatePlans.get(planCtr);
+            List<LogicalOperator> planRoots = generatePlan.getRoots();
+            boolean needGenerateInput = false;
+            boolean needForEachInput = false;
+            MultiMap<LogicalOperator, LogicalOperator> mapProjectInputs = null;
+            for(LogicalOperator root: planRoots) {
+                if(root instanceof LOProject) {
+                    LOProject project = (LOProject)root;
+                    LogicalOperator projectInput = project.getExpression();
+                    if(checkGenerateInput(projectInput)) {
+                        needGenerateInput = true;
+                        if(null == mapProjectInputs) {
+                            mapProjectInputs = new MultiMap<LogicalOperator, LogicalOperator>();
+                        }
+                        mapProjectInputs.put(root, projectInput);
+                    } else {
+                        needForEachInput = true;
+                    }
+                }
+            }
+            if(needGenerateInput) {
+                /*
+                Duplicate the logical plan until the generate but excluding generate
+                Create a new generate operator with the plan being iterated on
+                Attach the generate as the leaf and add the duplicated plan to
+                the list of foreach plans
+                */
+
+                for(LogicalOperator project: mapProjectInputs.keySet()) {
+                    for(LogicalOperator projectInput: mapProjectInputs.get(project)) {
+                        generatePlan.add(projectInput);
+                        generatePlan.connect(projectInput, project);
+                        attachPlan(generatePlan, projectInput, foreachPlan);
+                    }
+                }
+            }
+            foreachPlans.add(generatePlan);
+        }
+
+		resetGenerateState();
+		foreach = new LOForEach(lp, new OperatorKey(scope, getNextId()), (ArrayList)foreachPlans, (ArrayList)flattenList);
 		try {
 			lp.add(foreach);
 			log.debug("Added operator " + foreach.getClass().getName() + " to the logical plan");
@@ -1471,7 +1532,6 @@
 			lp.connect(op, spec);
 			log.debug("Connected operator: " + op.getClass().getName() + " to " + op + " " + spec + " in logical plan " + lp);
 		}
-		resetGenerateState();
 		log.trace("Exiting GenerateStatement");
 		return spec;
 	}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Thu Jun 26 13:05:23 2008
@@ -144,6 +144,42 @@
 
     protected void visit(LOProject pj) throws VisitorException {
         resolveLOProjectType(pj) ;
+        // A project feeding a non-expression operator is marked overloaded and typed as TUPLE.
+        LogicalPlan currentPlan =  (LogicalPlan) mCurrentWalker.getPlan() ;
+        List<LogicalOperator> list = currentPlan.getSuccessors(pj) ;
+        if((null != list) && !list.isEmpty() && !(list.get(0) instanceof ExpressionOperator)) {
+            pj.setOverloaded(true);
+            pj.setType(DataType.TUPLE);
+            // Keep the alias and inner schema; only the field type changes to TUPLE.
+            try {
+                Schema.FieldSchema pjFs = pj.getFieldSchema();
+                pj.setFieldSchema(new Schema.FieldSchema(pjFs.alias, pjFs.schema, DataType.TUPLE));
+            } catch (FrontendException fe) {
+                String msg = "Error getting LOProject's input schema" ;
+                msgCollector.collect(msg, MessageType.Error);
+                VisitorException vse = new VisitorException(msg) ;
+                log.info("FrontendException: " + fe.getMessage());
+                vse.initCause(fe) ;
+                throw vse ; // rethrow the instance that carries the cause
+            }
+        } else {
+            LogicalOperator projectInput = pj.getExpression();
+            if((projectInput instanceof LOProject) && (projectInput.getType() == DataType.BAG)) {
+                pj.setType(DataType.BAG);
+                // Wrap this project's field schema in a new schema and retype the field as BAG.
+                try {
+                    Schema pjSchema = new Schema(pj.getFieldSchema());
+                    pj.setFieldSchema(new Schema.FieldSchema(pj.getAlias(), pjSchema, DataType.BAG));
+                } catch (FrontendException fe) {
+                    String msg = "Error getting LOProject's input schema" ;
+                    msgCollector.collect(msg, MessageType.Error);
+                    VisitorException vse = new VisitorException(msg) ;
+                    log.info("FrontendException: " + fe.getMessage());
+                    vse.initCause(fe) ;
+                    throw vse ; // rethrow the instance that carries the cause
+                }
+            }
+        }
     }
 
     private void resolveLOProjectType(LOProject pj) throws VisitorException {
@@ -1591,39 +1627,6 @@
     }
 
     /**
-     * This implementation assumes LOForeach only works
-     * in conjunction with the inner LOGenerate
-     *
-     * The output schema will be generated by the inner generate
-     */
-
-    protected void visit(LOForEach forEach) throws VisitorException {
-
-        // TODO: Shouldn't ForEach have getInput() ???
-        List<LogicalOperator> inputList = mPlan.getPredecessors(forEach) ;
-        
-        if (inputList.size() != 1) {
-            throw new AssertionError("LOForEach cannot have more than one input") ;
-        }
-
-        // Check the inner plan
-        LogicalPlan innerPlan = forEach.getForEachPlan() ;
-        checkInnerPlan(innerPlan) ;
-
-        try {
-            forEach.getSchema() ;
-        }
-        catch (FrontendException ioe) {
-            String msg = "Problem while reconciling output schema of LOForEach" ;
-            msgCollector.collect(msg, MessageType.Error);
-            VisitorException vse = new VisitorException(msg) ;
-            vse.initCause(ioe) ;
-            throw vse ;
-        }
-               
-    }
-
-    /**
      * COGroup
      * All group by cols from all inputs have to be of the
      * same type
@@ -1918,16 +1921,16 @@
     }
 
     /***
-     * Output schema of LOGenerate is a tuple schma
+     * Output schema of LOForEach is a tuple schema
      * which is the output of all inner plans
      *
      * Flatten also has to be taken care on in here
      *
      */
 
-    protected void visit(LOGenerate g) throws VisitorException {
+    protected void visit(LOForEach g) throws VisitorException {
 
-        List<LogicalPlan> plans = g.getGeneratePlans() ;
+        List<LogicalPlan> plans = g.getForEachPlans() ;
         List<Boolean> flattens = g.getFlatten() ;
 
         try {
@@ -1959,7 +1962,7 @@
                     }
                     else {
                         throw new AssertionError("Unsupported root type in "
-                            +"LOGenerate:" + innerRoot.getClass().getSimpleName()) ;
+                            +"LOForEach:" + innerRoot.getClass().getSimpleName()) ;
                     }
                 }
 
@@ -1969,7 +1972,7 @@
 
             Schema schema = g.getSchema() ;
 
-            // Propagate type information from inner plans back to LOGenerate 
+            // Propagate type information from inner plans back to LOForEach 
             for(int i=0;i < plans.size(); i++) {
 
                 LogicalPlan plan = plans.get(i) ;
@@ -2007,7 +2010,7 @@
                     }
                     else {
                         throw new AssertionError("Unsupported inner plan type: "
-                                                 + DataType.findTypeName(leaf.getType()) + " in LOGenerate" ) ;
+                                                 + DataType.findTypeName(leaf.getType()) + " in LOForEach" ) ;
                     }
                 }
 
@@ -2017,13 +2020,13 @@
 
         }
         catch (FrontendException pe) {
-            String msg = "Problem resolving LOGenerate schema" ;
+            String msg = "Problem resolving LOForEach schema" ;
             msgCollector.collect(msg, MessageType.Error) ;
             VisitorException vse = new VisitorException(msg) ;
             throw vse ;
         }
         catch (ParseException pe) {
-            String msg = "Problem resolving LOGenerate schema" ;
+            String msg = "Problem resolving LOForEach schema" ;
             msgCollector.collect(msg, MessageType.Error) ;
             VisitorException vse = new VisitorException(msg) ;
             vse.initCause(pe) ;
@@ -2071,13 +2074,13 @@
             if (op instanceof LOProject) {
                 resolveLOProjectType((LOProject) op);
             }
-            // TODO: I think we better need a sentinel connecting to LOGenerate
-            else if (op instanceof LOGenerate) {
+            // TODO: I think we better need a sentinel connecting to LOForEach
+            else if (op instanceof LOForEach) {
                 op.setType(DataType.TUPLE);
                 PlanWalker<LogicalOperator, LogicalPlan> walker
                         = new DependencyOrderWalker<LogicalOperator, LogicalPlan>(innerPlan) ;
                 pushWalker(walker) ;
-                this.visit((LOGenerate) op) ;
+                this.visit((LOForEach) op) ;
                 popWalker() ;
             }
             else if (op instanceof LOConst) {
@@ -2135,6 +2138,7 @@
                                                     throws VisitorException {
         LogicalPlan currentPlan =  mCurrentWalker.getPlan() ;
 
+        /*
         // Make sure that two operators are in the same plan
         if (fromOp.getPlan() != toOp.getPlan()) {
             throw new AssertionError("Two operators have toOp be in the same plan") ;
@@ -2144,6 +2148,7 @@
             throw new AssertionError("Cannot manipulate any other plan"
                                     +" than the current one") ;
         }
+        */
 
         // Make sure that they are adjacent and the direction
         // is from "fromOp" to "toOp"
@@ -2254,13 +2259,8 @@
                 flattenList.add(new Boolean(false)) ;
             }
 
-            LOGenerate generate = new LOGenerate(currentPlan, genNewOperatorKey(fromOp),
-                                                 generatePlans, flattenList) ;
-
-            foreachPlan.add(generate) ;
-
             // Create ForEach to be inserted
-            LOForEach foreach = new LOForEach(currentPlan, genNewOperatorKey(fromOp), foreachPlan) ;
+            LOForEach foreach = new LOForEach(currentPlan, genNewOperatorKey(fromOp), generatePlans, flattenList) ;
 
             // Manipulate the plan structure
             currentPlan.add(foreach);

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java Thu Jun 26 13:05:23 2008
@@ -274,7 +274,7 @@
         }
     }
     
-    private List<PhysicalOperator> getRoots(PhysicalPlan<PhysicalOperator> php){
+    private List<PhysicalOperator> getRoots(PhysicalPlan php){
         List<PhysicalOperator> ret = new ArrayList<PhysicalOperator>();
         for (PhysicalOperator operator : php.getRoots()) {
             ret.add(operator);

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java Thu Jun 26 13:05:23 2008
@@ -59,7 +59,7 @@
      * @throws ExecException
      * @throws JobCreationException
      */
-    public abstract boolean launchPig(PhysicalPlan<PhysicalOperator> php, String grpName, PigContext pc)
+    public abstract boolean launchPig(PhysicalPlan php, String grpName, PigContext pc)
             throws PlanException, VisitorException, IOException, ExecException,
             JobCreationException;
     

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java?rev=672016&r1=672015&r2=672016&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java Thu Jun 26 13:05:23 2008
@@ -13,7 +13,6 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
@@ -23,7 +22,7 @@
     private static final Log log = LogFactory.getLog(Launcher.class);
     
     @Override
-    public boolean launchPig(PhysicalPlan<PhysicalOperator> php,
+    public boolean launchPig(PhysicalPlan php,
                              String grpName,
                              PigContext pc) throws PlanException,
                                                    VisitorException,
@@ -82,7 +81,7 @@
     }
     
     //A purely testing method. Not to be used elsewhere
-    public boolean launchPigWithCombinePlan(PhysicalPlan<PhysicalOperator> php,
+    public boolean launchPigWithCombinePlan(PhysicalPlan php,
             String grpName, PigContext pc, PhysicalPlan combinePlan) throws PlanException,
             VisitorException, IOException, ExecException, JobCreationException {
         long sleepTime = 500;



Mime
View raw message