pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r669666 [1/3] - in /incubator/pig/branches/types: src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/local/executionengine/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ src/org/apache/pig/impl/logicalLayer...
Date Thu, 19 Jun 2008 19:56:03 GMT
Author: gates
Date: Thu Jun 19 12:56:00 2008
New Revision: 669666

URL: http://svn.apache.org/viewvc?rev=669666&view=rev
Log:
PIG-162 Shravan's changes to add combiner and switch to binary comparator in map reduce sorting, plus misc other bug fixes (newChanges2.patch).


Added:
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigCombiner.java
Modified:
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/IsEmpty.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/TOKENIZE.java
    incubator/pig/branches/types/src/org/apache/pig/data/AmendableTuple.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java
    incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java
    incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java
    incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java
    incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java
    incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java
    incubator/pig/branches/types/src/org/apache/pig/data/TargetedTuple.java
    incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java
    incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/Result.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Add.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/BinaryExpressionOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Divide.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/EqualToExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GTOrEqualToExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/GreaterThanExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LTOrEqualToExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/LessThanExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Mod.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Multiply.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/NotEqualToExpr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POAnd.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POBinCond.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POCast.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POIsNull.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POMapLookUp.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONegative.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/PONot.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POOr.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POProject.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserComparisonFunc.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/POUserFunc.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/expressionOperators/Subtract.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/PODistinct.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POFilter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POGlobalRearrange.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POLocalRearrange.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POPackage.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POUnion.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestNull.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Arithmetic.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/BinCond.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Cogroup.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Comparison.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/ComplexForeach.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Distinct.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Generate.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/IsNull1.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/IsNull2.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC1.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC10.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC11.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC12.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC13.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC14.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC2.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC3.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC4.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC5.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC6.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC7.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC8.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC9.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Sort.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Split1.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Split2.gld
    incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/Union.gld

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Jun 19 12:56:00 2008
@@ -252,8 +252,11 @@
             }
 
             MapReduceLauncher launcher = new MapReduceLauncher();
-            launcher.launchPig(plan, jobName, pigContext);
-            return new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec);
+            boolean success = launcher.launchPig(plan, jobName, pigContext);
+            if(success)
+                return new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec);
+            else
+                return new HJob(ExecJob.JOB_STATUS.FAILED, pigContext, null);
 
         } catch (Exception e) {
             // There are a lot of exceptions thrown by the launcher.  If this

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Thu Jun 19 12:56:00 2008
@@ -137,14 +137,18 @@
                     pigContext).toString(),
                     BinStorage.class.getName());
                 str.setSFile(spec);
+                plan.addAsLeaf(str);
             }
             else{
                 spec = ((POStore)leaf).getSFile();
             }
 
             LocalLauncher launcher = new LocalLauncher();
-            launcher.launchPig(plan, jobName, pigContext);
-            return new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec);
+            boolean success = launcher.launchPig(plan, jobName, pigContext);
+            if(success)
+                return new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec);
+            else
+                return new HJob(ExecJob.JOB_STATUS.FAILED, pigContext, null);
         } catch (Exception e) {
             // There are a lot of exceptions thrown by the launcher.  If this
             // is an ExecException, just let it through.  Else wrap it.

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/AVG.java Thu Jun 19 12:56:00 2008
@@ -51,7 +51,7 @@
             return new Double(avg);
         } catch (ExecException ee) {
             IOException oughtToBeEE = new IOException();
-            ee.initCause(ee);
+            oughtToBeEE.initCause(ee);
             throw oughtToBeEE;
         }
     }
@@ -80,7 +80,7 @@
                 throw new RuntimeException(t.getMessage() + ": " + input);
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
-                ee.initCause(ee);
+                oughtToBeEE.initCause(ee);
                 throw oughtToBeEE;
             }
                 
@@ -95,7 +95,7 @@
                 return combine(b);
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
-                ee.initCause(ee);
+                oughtToBeEE.initCause(ee);
                 throw oughtToBeEE;
             }
         }
@@ -118,7 +118,7 @@
                 return new Double(avg);
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
-                ee.initCause(ee);
+                oughtToBeEE.initCause(ee);
                 throw oughtToBeEE;
             }
         }

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java Thu Jun 19 12:56:00 2008
@@ -82,7 +82,7 @@
             return (Tuple)DataReaderWriter.readDatum(inData);
         } catch (ExecException ee) {
             IOException oughtToBeEE = new IOException();
-            ee.initCause(ee);
+            oughtToBeEE.initCause(ee);
             throw oughtToBeEE;
         }
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java Thu Jun 19 12:56:00 2008
@@ -42,7 +42,7 @@
             return count(input);
         } catch (ExecException ee) {
             IOException oughtToBeEE = new IOException();
-            ee.initCause(ee);
+            oughtToBeEE.initCause(ee);
             throw oughtToBeEE;
         }
     }
@@ -68,7 +68,7 @@
                 return tfact.newTuple(count(input));
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
-                ee.initCause(ee);
+                oughtToBeEE.initCause(ee);
                 throw oughtToBeEE;
             }
         }
@@ -83,7 +83,7 @@
                 return tfact.newTuple(count(input));
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
-                ee.initCause(ee);
+                oughtToBeEE.initCause(ee);
                 throw oughtToBeEE;
             }
         }
@@ -96,7 +96,7 @@
                 return sum(input);
             } catch (Exception ee) {
                 IOException oughtToBeEE = new IOException();
-                ee.initCause(ee);
+                oughtToBeEE.initCause(ee);
                 throw oughtToBeEE;
             }
         }

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java Thu Jun 19 12:56:00 2008
@@ -69,7 +69,7 @@
             return output;
         } catch (ExecException ee) {
             IOException oughtToBeEE = new IOException();
-            ee.initCause(ee);
+            oughtToBeEE.initCause(ee);
             throw oughtToBeEE;
         }
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/IsEmpty.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/IsEmpty.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/IsEmpty.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/IsEmpty.java Thu Jun 19 12:56:00 2008
@@ -42,7 +42,7 @@
                     DataType.findTypeName(values) + " for emptiness.");
         } catch (ExecException ee) {
             IOException oughtToBeEE = new IOException();
-            ee.initCause(ee);
+            oughtToBeEE.initCause(ee);
             throw oughtToBeEE;
         }
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java Thu Jun 19 12:56:00 2008
@@ -42,7 +42,7 @@
             return max(input);
         } catch (ExecException ee) {
             IOException oughtToBeEE = new IOException();
-            ee.initCause(ee);
+            oughtToBeEE.initCause(ee);
             throw oughtToBeEE;
         }
     }
@@ -68,7 +68,7 @@
                 return tfact.newTuple(max(input));
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
-                ee.initCause(ee);
+                oughtToBeEE.initCause(ee);
                 throw oughtToBeEE;
             }
         }
@@ -80,7 +80,7 @@
                 return max(input);
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
-                ee.initCause(ee);
+                oughtToBeEE.initCause(ee);
                 throw oughtToBeEE;
             }
         }

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java Thu Jun 19 12:56:00 2008
@@ -42,7 +42,7 @@
             return min(input);
         } catch (ExecException ee) {
             IOException oughtToBeEE = new IOException();
-            ee.initCause(ee);
+            oughtToBeEE.initCause(ee);
             throw oughtToBeEE;
         }
     }
@@ -68,7 +68,7 @@
                 return tfact.newTuple(min(input));
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
-                ee.initCause(ee);
+                oughtToBeEE.initCause(ee);
                 throw oughtToBeEE;
             }
         }
@@ -80,7 +80,7 @@
                 return min(input);
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
-                ee.initCause(ee);
+                oughtToBeEE.initCause(ee);
                 throw oughtToBeEE;
             }
         }

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/SUM.java Thu Jun 19 12:56:00 2008
@@ -41,7 +41,7 @@
             return sum(input);
         } catch (ExecException ee) {
             IOException oughtToBeEE = new IOException();
-            ee.initCause(ee);
+            oughtToBeEE.initCause(ee);
             throw oughtToBeEE;
         }
     }
@@ -67,7 +67,7 @@
                 return tfact.newTuple(sum(input));
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
-                ee.initCause(ee);
+                oughtToBeEE.initCause(ee);
                 throw oughtToBeEE;
             }
         }
@@ -79,7 +79,7 @@
                 return sum(input);
             } catch (ExecException ee) {
                 IOException oughtToBeEE = new IOException();
-                ee.initCause(ee);
+                oughtToBeEE.initCause(ee);
                 throw oughtToBeEE;
             }
         }

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/TOKENIZE.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/TOKENIZE.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/TOKENIZE.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/TOKENIZE.java Thu Jun 19 12:56:00 2008
@@ -46,7 +46,7 @@
             return output;
         } catch (ExecException ee) {
             IOException oughtToBeEE = new IOException();
-            ee.initCause(ee);
+            oughtToBeEE.initCause(ee);
             throw oughtToBeEE;
         }
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/data/AmendableTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/AmendableTuple.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/AmendableTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/AmendableTuple.java Thu Jun 19 12:56:00 2008
@@ -18,6 +18,10 @@
 package org.apache.pig.data;
 
 public class AmendableTuple extends DefaultTuple {
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 2L;
     Object amendKey;       // the identifier of the group to which this tuple belongs.
 
     public AmendableTuple(int numFields, Object amendKey) {

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java Thu Jun 19 12:56:00 2008
@@ -24,6 +24,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.ArrayList;
@@ -66,7 +67,7 @@
  * DataBag come in several types, default, sorted, and distinct.  The type
  * must be chosen up front, there is no way to convert a bag on the fly.
  */
-public interface DataBag extends Spillable, WritableComparable, Iterable<Tuple> {
+public interface DataBag extends Spillable, WritableComparable, Iterable<Tuple>, Serializable {
     /**
      * Get the number of elements in the bag, both in memory and on disk.
      */

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java Thu Jun 19 12:56:00 2008
@@ -38,6 +38,11 @@
  */
 public class DefaultDataBag extends DefaultAbstractBag {
 
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 2L;
+
     private static TupleFactory gTupleFactory = TupleFactory.getInstance();
 
     private final Log log = LogFactory.getLog(getClass());

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultTuple.java Thu Jun 19 12:56:00 2008
@@ -36,6 +36,10 @@
  * DefaultTupleFactory.
  */
 public class DefaultTuple implements Tuple {
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 2L;
     protected List<Object> mFields;
 
     /**

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java Thu Jun 19 12:56:00 2008
@@ -51,6 +51,11 @@
  */
 public class DistinctDataBag extends DefaultAbstractBag {
 
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 2L;
+
     private final Log log = LogFactory.getLog(getClass());
 
     private static TupleFactory gTupleFactory = TupleFactory.getInstance();

Modified: incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java Thu Jun 19 12:56:00 2008
@@ -26,6 +26,11 @@
  */
 public class IndexedTuple extends DefaultTuple {
 
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 2L;
+    
     public int index = -1;
     
     public IndexedTuple() {

Modified: incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java Thu Jun 19 12:56:00 2008
@@ -46,7 +46,13 @@
  * We allow a user defined comparator, but provide a default comparator in
  * cases where the user doesn't specify one.
  */
-public class SortedDataBag extends DefaultAbstractBag {
+public class SortedDataBag extends DefaultAbstractBag{
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 2L;
+
     private static TupleFactory gTupleFactory = TupleFactory.getInstance();
 
     private final Log log = LogFactory.getLog(getClass());

Modified: incubator/pig/branches/types/src/org/apache/pig/data/TargetedTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/TargetedTuple.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/TargetedTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/TargetedTuple.java Thu Jun 19 12:56:00 2008
@@ -32,6 +32,11 @@
  *
  */
 public class TargetedTuple implements Tuple {
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 2L;
+    
     private Tuple t;
     // The list of operators to which this tuple
     // has to be attached as input.

Modified: incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java Thu Jun 19 12:56:00 2008
@@ -26,6 +26,10 @@
 
 public class TimestampedTuple extends DefaultTuple {
 
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 2L;
     private final Log log = LogFactory.getLog(getClass());
     static String defaultDelimiter = "[,\t]";
 

Modified: incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java Thu Jun 19 12:56:00 2008
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.data;
 
+import java.io.Serializable;
 import java.util.List;
 
 import org.apache.hadoop.io.WritableComparable;
@@ -35,7 +36,7 @@
  *
  * Fields are numbered from 0.
  */
-public interface Tuple extends WritableComparable {
+public interface Tuple extends WritableComparable, Serializable {
     /**
      * Make this tuple reference the contents of another.  This method does not copy
      * the underlying data.   It maintains references to the data from the original

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java Thu Jun 19 12:56:00 2008
@@ -28,6 +28,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.EvalFunc;
+import org.apache.pig.LoadFunc;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
@@ -37,6 +38,7 @@
 import org.apache.pig.impl.physicalLayer.relationalOperators.*;
 import org.apache.pig.impl.physicalLayer.expressionOperators.*;
 import org.apache.pig.impl.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.physicalLayer.expressionOperators.BinaryExpressionOperator;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -45,168 +47,208 @@
 import org.apache.pig.impl.plan.VisitorException;
 
 public class LogToPhyTranslationVisitor extends LOVisitor {
-    
- 
+
     Map<LogicalOperator, PhysicalOperator> LogToPhyMap;
 
     Random r = new Random();
 
     Stack<PhysicalPlan<? extends PhysicalOperator>> currentPlans;
+
     PhysicalPlan currentPlan;
 
     NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();
-    
+
     private Log log = LogFactory.getLog(getClass());
+
     PigContext pc;
-    
+
+    LoadFunc load;
+
     public LogToPhyTranslationVisitor(LogicalPlan plan) {
-        super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
+        super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(
+                plan));
 
         currentPlans = new Stack<PhysicalPlan<? extends PhysicalOperator>>();
         currentPlan = new PhysicalPlan<PhysicalOperator>();
         LogToPhyMap = new HashMap<LogicalOperator, PhysicalOperator>();
     }
-    
+
     public void setPigContext(PigContext pc) {
         this.pc = pc;
     }
-    
+
     public PhysicalPlan<PhysicalOperator> getPhysicalPlan() {
 
         return currentPlan;
     }
-    
+
     @Override
     public void visit(LOGreaterThan op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        ExpressionOperator exprOp = new GreaterThanExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-        exprOp.setResultType(op.getLhsOperand().getType());
+        BinaryComparisonOperator exprOp = new GreaterThanExpr(new OperatorKey(
+                scope, nodeGen.getNextNodeId(scope)), op
+                .getRequestedParallelism());
+        exprOp.setOperandType(op.getLhsOperand().getType());
+        exprOp.setLhs((ExpressionOperator) LogToPhyMap.get(op.getLhsOperand()));
+        exprOp.setRhs((ExpressionOperator) LogToPhyMap.get(op.getRhsOperand()));
         LogicalPlan lp = op.mPlan;
-        
+
         currentPlan.add(exprOp);
         LogToPhyMap.put(op, exprOp);
-        
+
         List<LogicalOperator> predecessors = lp.getPredecessors(op);
-        
-        if(predecessors == null) return;
-        for(LogicalOperator lo : predecessors) {
+
+        if (predecessors == null)
+            return;
+        for (LogicalOperator lo : predecessors) {
             PhysicalOperator from = LogToPhyMap.get(lo);
             try {
-                //currentExprPlan.connect(from, exprOp);
+                // currentExprPlan.connect(from, exprOp);
                 currentPlan.connect(from, exprOp);
             } catch (PlanException e) {
-                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
             }
         }
     }
-    
+
     @Override
     public void visit(LOLesserThan op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        ExpressionOperator exprOp = new LessThanExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-        exprOp.setResultType(op.getLhsOperand().getType());
+        BinaryComparisonOperator exprOp = new LessThanExpr(new OperatorKey(
+                scope, nodeGen.getNextNodeId(scope)), op
+                .getRequestedParallelism());
+        exprOp.setOperandType(op.getLhsOperand().getType());
+        exprOp.setLhs((ExpressionOperator) LogToPhyMap.get(op.getLhsOperand()));
+        exprOp.setRhs((ExpressionOperator) LogToPhyMap.get(op.getRhsOperand()));
         LogicalPlan lp = op.mPlan;
-        
+
         currentPlan.add(exprOp);
         LogToPhyMap.put(op, exprOp);
-        
+
         List<LogicalOperator> predecessors = lp.getPredecessors(op);
-        
-        if(predecessors == null) return;
-        for(LogicalOperator lo : predecessors) {
+
+        if (predecessors == null)
+            return;
+        for (LogicalOperator lo : predecessors) {
             PhysicalOperator from = LogToPhyMap.get(lo);
             try {
                 currentPlan.connect(from, exprOp);
             } catch (PlanException e) {
-                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
             }
         }
     }
-    
+
     @Override
     public void visit(LOGreaterThanEqual op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        ExpressionOperator exprOp = new GTOrEqualToExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-        exprOp.setResultType(op.getLhsOperand().getType());
+        BinaryComparisonOperator exprOp = new GTOrEqualToExpr(new OperatorKey(
+                scope, nodeGen.getNextNodeId(scope)), op
+                .getRequestedParallelism());
+        exprOp.setOperandType(op.getLhsOperand().getType());
+        exprOp.setLhs((ExpressionOperator) LogToPhyMap.get(op.getLhsOperand()));
+        exprOp.setRhs((ExpressionOperator) LogToPhyMap.get(op.getRhsOperand()));
         LogicalPlan lp = op.mPlan;
-        
+
         currentPlan.add(exprOp);
         LogToPhyMap.put(op, exprOp);
-        
+
         List<LogicalOperator> predecessors = lp.getPredecessors(op);
-        if(predecessors == null) return;
-        for(LogicalOperator lo : predecessors) {
+        if (predecessors == null)
+            return;
+        for (LogicalOperator lo : predecessors) {
             PhysicalOperator from = LogToPhyMap.get(lo);
             try {
                 currentPlan.connect(from, exprOp);
             } catch (PlanException e) {
-                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
             }
         }
     }
-    
+
     @Override
     public void visit(LOLesserThanEqual op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        ExpressionOperator exprOp = new LTOrEqualToExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-        exprOp.setResultType(op.getLhsOperand().getType());
+        BinaryComparisonOperator exprOp = new LTOrEqualToExpr(new OperatorKey(
+                scope, nodeGen.getNextNodeId(scope)), op
+                .getRequestedParallelism());
+        exprOp.setOperandType(op.getLhsOperand().getType());
+        exprOp.setLhs((ExpressionOperator) LogToPhyMap.get(op.getLhsOperand()));
+        exprOp.setRhs((ExpressionOperator) LogToPhyMap.get(op.getRhsOperand()));
         LogicalPlan lp = op.mPlan;
-        
+
         currentPlan.add(exprOp);
         LogToPhyMap.put(op, exprOp);
-        
+
         List<LogicalOperator> predecessors = lp.getPredecessors(op);
-        if(predecessors == null) return;
-        for(LogicalOperator lo : predecessors) {
+        if (predecessors == null)
+            return;
+        for (LogicalOperator lo : predecessors) {
             PhysicalOperator from = LogToPhyMap.get(lo);
             try {
                 currentPlan.connect(from, exprOp);
             } catch (PlanException e) {
-                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
             }
         }
     }
-    
+
     @Override
     public void visit(LOEqual op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        ExpressionOperator exprOp = new EqualToExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-        exprOp.setResultType(op.getLhsOperand().getType());
+        BinaryComparisonOperator exprOp = new EqualToExpr(new OperatorKey(
+                scope, nodeGen.getNextNodeId(scope)), op
+                .getRequestedParallelism());
+        exprOp.setOperandType(op.getLhsOperand().getType());
+        exprOp.setLhs((ExpressionOperator) LogToPhyMap.get(op.getLhsOperand()));
+        exprOp.setRhs((ExpressionOperator) LogToPhyMap.get(op.getRhsOperand()));
         LogicalPlan lp = op.mPlan;
-        
+
         currentPlan.add(exprOp);
         LogToPhyMap.put(op, exprOp);
-        
+
         List<LogicalOperator> predecessors = lp.getPredecessors(op);
-        if(predecessors == null) return;
-        for(LogicalOperator lo : predecessors) {
+        if (predecessors == null)
+            return;
+        for (LogicalOperator lo : predecessors) {
             PhysicalOperator from = LogToPhyMap.get(lo);
             try {
                 currentPlan.connect(from, exprOp);
             } catch (PlanException e) {
-                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
             }
         }
     }
-    
+
     @Override
     public void visit(LONotEqual op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        ExpressionOperator exprOp = new NotEqualToExpr(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
-        exprOp.setResultType(op.getLhsOperand().getType());
+        BinaryComparisonOperator exprOp = new NotEqualToExpr(new OperatorKey(
+                scope, nodeGen.getNextNodeId(scope)), op
+                .getRequestedParallelism());
+        exprOp.setOperandType(op.getLhsOperand().getType());
+        exprOp.setLhs((ExpressionOperator) LogToPhyMap.get(op.getLhsOperand()));
+        exprOp.setRhs((ExpressionOperator) LogToPhyMap.get(op.getRhsOperand()));
         LogicalPlan lp = op.mPlan;
-        
+
         currentPlan.add(exprOp);
         LogToPhyMap.put(op, exprOp);
-        
+
         List<LogicalOperator> predecessors = lp.getPredecessors(op);
-        if(predecessors == null) return;
-        for(LogicalOperator lo : predecessors) {
+        if (predecessors == null)
+            return;
+        for (LogicalOperator lo : predecessors) {
             PhysicalOperator from = LogToPhyMap.get(lo);
             try {
                 currentPlan.connect(from, exprOp);
             } catch (PlanException e) {
-                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
             }
         }
     }
@@ -214,135 +256,163 @@
     @Override
     public void visit(LORegexp op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        ExpressionOperator exprOp = new PORegexp(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        ExpressionOperator exprOp = new PORegexp(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), op.getRequestedParallelism());
         exprOp.setResultType(op.getLhsOperand().getType());
         LogicalPlan lp = op.mPlan;
-        
+
         currentPlan.add(exprOp);
         LogToPhyMap.put(op, exprOp);
-        
+
         List<LogicalOperator> predecessors = lp.getPredecessors(op);
-        if(predecessors == null) return;
-        for(LogicalOperator lo : predecessors) {
+        if (predecessors == null)
+            return;
+        for (LogicalOperator lo : predecessors) {
             PhysicalOperator from = LogToPhyMap.get(lo);
             try {
                 currentPlan.connect(from, exprOp);
             } catch (PlanException e) {
-                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
             }
         }
     }
-    
+
     @Override
     public void visit(LOAdd op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        ExpressionOperator exprOp = new Add(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        BinaryExpressionOperator exprOp = new Add(new OperatorKey(scope,
+                nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
         exprOp.setResultType(op.getType());
+        exprOp.setLhs((ExpressionOperator) LogToPhyMap.get(op.getLhsOperand()));
+        exprOp.setRhs((ExpressionOperator) LogToPhyMap.get(op.getRhsOperand()));
         LogicalPlan lp = op.mPlan;
-        
+
         currentPlan.add(exprOp);
         LogToPhyMap.put(op, exprOp);
-        
+
         List<LogicalOperator> predecessors = lp.getPredecessors(op);
-        if(predecessors == null) return;
-        for(LogicalOperator lo : predecessors) {
+        if (predecessors == null)
+            return;
+        for (LogicalOperator lo : predecessors) {
             PhysicalOperator from = LogToPhyMap.get(lo);
             try {
                 currentPlan.connect(from, exprOp);
             } catch (PlanException e) {
-                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
             }
         }
     }
-    
+
     @Override
     public void visit(LOSubtract op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        ExpressionOperator exprOp = new Subtract(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        BinaryExpressionOperator exprOp = new Subtract(new OperatorKey(scope,
+                nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
         exprOp.setResultType(op.getType());
+        exprOp.setLhs((ExpressionOperator) LogToPhyMap.get(op.getLhsOperand()));
+        exprOp.setRhs((ExpressionOperator) LogToPhyMap.get(op.getRhsOperand()));
         LogicalPlan lp = op.mPlan;
-        
+
         currentPlan.add(exprOp);
         LogToPhyMap.put(op, exprOp);
-        
+
         List<LogicalOperator> predecessors = lp.getPredecessors(op);
-        if(predecessors == null) return;
-        for(LogicalOperator lo : predecessors) {
+        if (predecessors == null)
+            return;
+        for (LogicalOperator lo : predecessors) {
             PhysicalOperator from = LogToPhyMap.get(lo);
             try {
                 currentPlan.connect(from, exprOp);
             } catch (PlanException e) {
-                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
             }
         }
     }
-    
+
     @Override
     public void visit(LOMultiply op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        ExpressionOperator exprOp = new Multiply(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        BinaryExpressionOperator exprOp = new Multiply(new OperatorKey(scope,
+                nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
         exprOp.setResultType(op.getType());
+        exprOp.setLhs((ExpressionOperator) LogToPhyMap.get(op.getLhsOperand()));
+        exprOp.setRhs((ExpressionOperator) LogToPhyMap.get(op.getRhsOperand()));
         LogicalPlan lp = op.mPlan;
-        
+
         currentPlan.add(exprOp);
         LogToPhyMap.put(op, exprOp);
-        
+
         List<LogicalOperator> predecessors = lp.getPredecessors(op);
-        if(predecessors == null) return;
-        for(LogicalOperator lo : predecessors) {
+        if (predecessors == null)
+            return;
+        for (LogicalOperator lo : predecessors) {
             PhysicalOperator from = LogToPhyMap.get(lo);
             try {
                 currentPlan.connect(from, exprOp);
             } catch (PlanException e) {
-                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
             }
         }
     }
-    
+
     @Override
     public void visit(LODivide op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        ExpressionOperator exprOp = new Divide(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        BinaryExpressionOperator exprOp = new Divide(new OperatorKey(scope,
+                nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
         exprOp.setResultType(op.getType());
+        exprOp.setLhs((ExpressionOperator) LogToPhyMap.get(op.getLhsOperand()));
+        exprOp.setRhs((ExpressionOperator) LogToPhyMap.get(op.getRhsOperand()));
         LogicalPlan lp = op.mPlan;
-        
+
         currentPlan.add(exprOp);
         LogToPhyMap.put(op, exprOp);
-        
+
         List<LogicalOperator> predecessors = lp.getPredecessors(op);
-        if(predecessors == null) return;
-        for(LogicalOperator lo : predecessors) {
+        if (predecessors == null)
+            return;
+        for (LogicalOperator lo : predecessors) {
             PhysicalOperator from = LogToPhyMap.get(lo);
             try {
                 currentPlan.connect(from, exprOp);
             } catch (PlanException e) {
-                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
             }
         }
     }
-    
+
     @Override
     public void visit(LOMod op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        ExpressionOperator exprOp = new Mod(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        BinaryExpressionOperator exprOp = new Mod(new OperatorKey(scope,
+                nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
         exprOp.setResultType(op.getType());
+        exprOp.setLhs((ExpressionOperator) LogToPhyMap.get(op.getLhsOperand()));
+        exprOp.setRhs((ExpressionOperator) LogToPhyMap.get(op.getRhsOperand()));
         LogicalPlan lp = op.mPlan;
-        
+
         currentPlan.add(exprOp);
         LogToPhyMap.put(op, exprOp);
-        
+
         List<LogicalOperator> predecessors = lp.getPredecessors(op);
-        if(predecessors == null) return;
-        for(LogicalOperator lo : predecessors) {
+        if (predecessors == null)
+            return;
+        for (LogicalOperator lo : predecessors) {
             PhysicalOperator from = LogToPhyMap.get(lo);
             try {
                 currentPlan.connect(from, exprOp);
             } catch (PlanException e) {
-                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
             }
         }
     }
-
+    
     @Override
     public void visit(LOAnd op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
@@ -406,45 +476,53 @@
             log.error("Invalid physical operators in the physical plan" + e.getMessage());
         }
     }
-    
+
     @Override
     public void visit(LOCogroup cg) throws VisitorException {
         boolean currentPhysicalPlan = false;
         String scope = cg.getOperatorKey().scope;
         List<LogicalOperator> inputs = cg.getInputs();
-        
-        POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism());
-        POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism());
-        
+
+        POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
+                scope, nodeGen.getNextNodeId(scope)), cg
+                .getRequestedParallelism());
+        POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), cg.getRequestedParallelism());
+
         currentPlan.add(poGlobal);
         currentPlan.add(poPackage);
-        
+
         try {
             currentPlan.connect(poGlobal, poPackage);
         } catch (PlanException e1) {
-            log.error("Invalid physical operators in the physical plan" + e1.getMessage());
+            log.error("Invalid physical operators in the physical plan"
+                    + e1.getMessage());
         }
-        
+
         int count = 0;
         Byte type = null;
-        for(LogicalOperator op : inputs) {
-            List<LogicalPlan> plans = (List<LogicalPlan>) cg.getGroupByPlans().get(op);
-            POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism());
+        for (LogicalOperator op : inputs) {
+            List<LogicalPlan> plans = (List<LogicalPlan>) cg.getGroupByPlans()
+                    .get(op);
+            POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
+                    scope, nodeGen.getNextNodeId(scope)), cg
+                    .getRequestedParallelism());
             List<ExprPlan> exprPlans = new ArrayList<ExprPlan>();
             currentPlans.push(currentPlan);
-            for(LogicalPlan lp : plans) {
+            for (LogicalPlan lp : plans) {
                 currentPlan = new ExprPlan();
-                PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(lp);
+                PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+                        .spawnChildWalker(lp);
                 pushWalker(childWalker);
                 mCurrentWalker.walk(this);
                 exprPlans.add((ExprPlan) currentPlan);
                 popWalker();
-                
+
             }
             currentPlan = currentPlans.pop();
             physOp.setPlans(exprPlans);
             physOp.setIndex(count++);
-            if(plans.size() > 1) {
+            if (plans.size() > 1) {
                 type = DataType.TUPLE;
                 physOp.setKeyType(type);
             } else {
@@ -452,16 +530,17 @@
                 physOp.setKeyType(type);
             }
             physOp.setResultType(DataType.TUPLE);
-            
+
             currentPlan.add(physOp);
-            
+
             try {
                 currentPlan.connect(LogToPhyMap.get(op), physOp);
                 currentPlan.connect(physOp, poGlobal);
             } catch (PlanException e) {
-                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
             }
-            
+
         }
         poPackage.setKeyType(type);
         poPackage.setResultType(DataType.TUPLE);
@@ -473,55 +552,61 @@
     @Override
     public void visit(LOFilter filter) throws VisitorException {
         String scope = filter.getOperatorKey().scope;
-        POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), filter.getRequestedParallelism());
+        POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), filter.getRequestedParallelism());
         poFilter.setResultType(filter.getType());
         currentPlan.add(poFilter);
         LogToPhyMap.put(filter, poFilter);
         currentPlans.push(currentPlan);
-        
+
         currentPlan = new ExprPlan();
-        
-        PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(filter.getComparisonPlan());
+
+        PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+                .spawnChildWalker(filter.getComparisonPlan());
         pushWalker(childWalker);
         mCurrentWalker.walk(this);
         popWalker();
-        
+
         poFilter.setPlan((ExprPlan) currentPlan);
         currentPlan = currentPlans.pop();
-        
+
         List<LogicalOperator> op = filter.getPlan().getPredecessors(filter);
-        
+
         PhysicalOperator from = LogToPhyMap.get(op.get(0));
         try {
             currentPlan.connect(from, poFilter);
         } catch (PlanException e) {
-            log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            log.error("Invalid physical operators in the physical plan"
+                    + e.getMessage());
         }
     }
-    
+
     @Override
     public void visit(LOProject op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        POProject exprOp = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        POProject exprOp = new POProject(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), op.getRequestedParallelism());
         exprOp.setResultType(op.getType());
         exprOp.setColumn(op.getCol());
         exprOp.setStar(op.isStar());
         LogicalPlan lp = op.mPlan;
         LogToPhyMap.put(op, exprOp);
         currentPlan.add(exprOp);
-        
+
         List<LogicalOperator> predecessors = lp.getPredecessors(op);
-        
-        //Project might not have any predecessors
-        if(predecessors == null) return;
-        
-        for(LogicalOperator lo : predecessors) {
+
+        // Project might not have any predecessors
+        if (predecessors == null)
+            return;
+
+        for (LogicalOperator lo : predecessors) {
             PhysicalOperator from = LogToPhyMap.get(lo);
             try {
                 currentPlan.connect(from, exprOp);
             } catch (PlanException e) {
-                
-                log.error("Invalid physical operators in the physical plan" + e.getMessage());
+
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
             }
         }
     }
@@ -529,37 +614,41 @@
     @Override
     public void visit(LOForEach forEach) throws VisitorException {
         String scope = forEach.getOperatorKey().scope;
-        //This needs to be handled specially.
-        //We need to be able to handle arbitrary levels of nesting
-        
-        //push the current physical plan in the stack.
+        // This needs to be handled specially.
+        // We need to be able to handle arbitrary levels of nesting
+
+        // push the current physical plan in the stack.
         currentPlans.push(currentPlan);
-        
-        //create a new physical plan
+
+        // create a new physical plan
         currentPlan = new PhysicalPlan<PhysicalOperator>();
-        PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(forEach.getForEachPlan());
-        
-        //now populate the physical plan by walking
+        PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+                .spawnChildWalker(forEach.getForEachPlan());
+
+        // now populate the physical plan by walking
         pushWalker(childWalker);
         mCurrentWalker.walk(this);
         popWalker();
-        
-        POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), forEach.getRequestedParallelism());
+
+        POForEach fe = new POForEach(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), forEach.getRequestedParallelism());
         fe.setPlan(currentPlan);
         fe.setResultType(DataType.TUPLE);
         LogToPhyMap.put(forEach, fe);
-        
-        //now connect foreach to its inputs
+
+        // now connect foreach to its inputs
         currentPlan = currentPlans.pop();
         currentPlan.add(fe);
-        PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(mPlan.getPredecessors(forEach).get(0));
+        PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(mPlan
+                .getPredecessors(forEach).get(0));
         try {
             currentPlan.connect(from, fe);
         } catch (PlanException e) {
-            log.error("Invalid physical operators in the physical plan" + e.getMessage());
-        
+            log.error("Invalid physical operators in the physical plan"
+                    + e.getMessage());
+
         }
-        
+
     }
 
     @Override
@@ -568,38 +657,43 @@
         String scope = g.getOperatorKey().scope;
         List<ExprPlan> exprPlans = new ArrayList<ExprPlan>();
         List<LogicalPlan> plans = g.getGeneratePlans();
-        
+
         currentPlans.push(currentPlan);
-        for(LogicalPlan plan : plans) {
+        for (LogicalPlan plan : plans) {
             currentPlan = new ExprPlan();
-            PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(plan);
+            PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+                    .spawnChildWalker(plan);
             pushWalker(childWalker);
             childWalker.walk(this);
             exprPlans.add((ExprPlan) currentPlan);
             popWalker();
         }
         currentPlan = currentPlans.pop();
-        
-        //PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
-        PhysicalOperator poGen = new POGenerate(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), g.getRequestedParallelism(), exprPlans, g.getFlatten());
+
+        // PhysicalOperator poGen = new POGenerate(new OperatorKey("",
+        // r.nextLong()), inputs, toBeFlattened);
+        PhysicalOperator poGen = new POGenerate(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), g.getRequestedParallelism(), exprPlans,
+                g.getFlatten());
         poGen.setResultType(DataType.TUPLE);
         LogToPhyMap.put(g, poGen);
         currentPlan.add(poGen);
-        
-        //generate cannot have multiple inputs
+
+        // generate cannot have multiple inputs
         List<LogicalOperator> op = g.getPlan().getPredecessors(g);
-        
-        //generate may not have any predecessors
-        if(op == null)
+
+        // generate may not have any predecessors
+        if (op == null)
             return;
-        
+
         PhysicalOperator from = LogToPhyMap.get(op.get(0));
         try {
             currentPlan.connect(from, poGen);
         } catch (PlanException e) {
-            log.error("Invalid physical operators in the physical plan" + e.getMessage());
+            log.error("Invalid physical operators in the physical plan"
+                    + e.getMessage());
         }
-        
+
     }
 
     @Override
@@ -607,130 +701,108 @@
         String scope = s.getOperatorKey().scope;
         List<LogicalPlan> logPlans = s.getSortColPlans();
         List<ExprPlan> sortPlans = new ArrayList<ExprPlan>(logPlans.size());
-        
-        //convert all the logical expression plans to physical expression plans
+
+        // convert all the logical expression plans to physical expression plans
         currentPlans.push(currentPlan);
-        for(LogicalPlan plan : logPlans) {
+        for (LogicalPlan plan : logPlans) {
             currentPlan = new ExprPlan();
-            PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(plan);
+            PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+                    .spawnChildWalker(plan);
             pushWalker(childWalker);
             childWalker.walk(this);
             sortPlans.add((ExprPlan) currentPlan);
             popWalker();
         }
         currentPlan = currentPlans.pop();
-        
-        //get the physical operator for sort
+
+        // get the physical operator for sort
         POSort sort;
-        if(s.getUserFunc() == null) { 
-            sort = new POSort(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), s.getRequestedParallelism(), null, sortPlans, s.getAscendingCols(), null);
+        if (s.getUserFunc() == null) {
+            sort = new POSort(new OperatorKey(scope, nodeGen
+                    .getNextNodeId(scope)), s.getRequestedParallelism(), null,
+                    sortPlans, s.getAscendingCols(), null);
         } else {
-            POUserFunc comparator = new POUserComparisonFunc(
-                    new OperatorKey(scope, nodeGen.getNextNodeId(scope)), s.getRequestedParallelism(), null, s.getUserFunc());
-            sort = new POSort(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), s.getRequestedParallelism(), null,
+            POUserFunc comparator = new POUserComparisonFunc(new OperatorKey(
+                    scope, nodeGen.getNextNodeId(scope)), s
+                    .getRequestedParallelism(), null, s.getUserFunc());
+            sort = new POSort(new OperatorKey(scope, nodeGen
+                    .getNextNodeId(scope)), s.getRequestedParallelism(), null,
                     sortPlans, s.getAscendingCols(), comparator);
         }
-        sort.setRequestedParallelism(s.getType());
+        // sort.setRequestedParallelism(s.getType());
         LogToPhyMap.put(s, sort);
         currentPlan.add(sort);
-        PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(s.mPlan.getPredecessors(s).get(0));
+        PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(s.mPlan
+                .getPredecessors(s).get(0));
         try {
             currentPlan.connect(from, sort);
         } catch (PlanException e) {
             log.error("Invalid physical operator in the plan" + e.getMessage());
         }
-        
+
         sort.setResultType(s.getType());
-        
+
     }
-    
+
     @Override
     public void visit(LODistinct op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        //This is simpler. No plans associated with this. Just create the physical operator,
-        //push it in the current plan and make the connections
-        PhysicalOperator physOp = new PODistinct(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        // This is simpler. No plans associated with this. Just create the
+        // physical operator,
+        // push it in the current plan and make the connections
+        PhysicalOperator physOp = new PODistinct(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), op.getRequestedParallelism());
         physOp.setResultType(op.getType());
         LogToPhyMap.put(op, physOp);
         currentPlan.add(physOp);
-        //Distinct will only have a single input
-        PhysicalOperator from = LogToPhyMap.get(op.mPlan.getPredecessors(op).get(0));
+        // Distinct will only have a single input
+        PhysicalOperator from = LogToPhyMap.get(op.mPlan.getPredecessors(op)
+                .get(0));
         try {
             currentPlan.connect(from, physOp);
         } catch (PlanException e) {
             log.error("Invalid physical operator in the plan" + e.getMessage());
         }
     }
-    
-/*    public void visit(LOSplit split) throws VisitorException {
-        String scope = split.getKey().scope;
-        PhysicalOperator physOp = new POSplit(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), split.getRequestedParallelism());
-        LogToPhyMap.put(split, physOp);
-        
-        currentPlan.add(physOp);
-        PhysicalOperator from = LogToPhyMap.get(split.getPlan().getPredecessors(split).get(0));
-        try {
-            currentPlan.connect(from, physOp);
-        } catch (PlanException e) {
-            log.error("Invalid physical operator in the plan" + e.getMessage());
-        }
-        
-        Collection<LogicalPlan> plans = split.getConditionPlans();
-        
-        for(LogicalPlan plan : plans) {
-            currentPlans.push(currentPlan);
-            currentPlan = new ExprPlan();
-            PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(plan);
-            pushWalker(childWalker);
-            mCurrentWalker.walk(this);
-            popWalker();
-            PhysicalOperator filter = new POFilter(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), split.getRequestedParallelism());
-            ((POFilter) filter).setPlan((ExprPlan) currentPlan);
-            currentPlan = currentPlans.pop();
-            currentPlan.add(filter);
-            try {
-                currentPlan.connect(physOp, filter);
-            } catch (PlanException e) {
-                log.error("Invalid physical operator in the plan" + e.getMessage());
-            }
-        }
-        
-        
-    }*/
-    
+
     @Override
     public void visit(LOSplit split) throws VisitorException {
         String scope = split.getOperatorKey().scope;
-        PhysicalOperator physOp = new POSplit(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), split.getRequestedParallelism());
+        PhysicalOperator physOp = new POSplit(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), split.getRequestedParallelism());
         LogToPhyMap.put(split, physOp);
-        
+
         currentPlan.add(physOp);
-        PhysicalOperator from = LogToPhyMap.get(split.getPlan().getPredecessors(split).get(0));
+        PhysicalOperator from = LogToPhyMap.get(split.getPlan()
+                .getPredecessors(split).get(0));
         try {
             currentPlan.connect(from, physOp);
         } catch (PlanException e) {
             log.error("Invalid physical operator in the plan" + e.getMessage());
         }
     }
-    
+
     @Override
     public void visit(LOSplitOutput split) throws VisitorException {
         String scope = split.getOperatorKey().scope;
-        PhysicalOperator physOp = new POFilter(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), split.getRequestedParallelism());
+        PhysicalOperator physOp = new POFilter(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), split.getRequestedParallelism());
         LogToPhyMap.put(split, physOp);
-        
+
         currentPlan.add(physOp);
         currentPlans.push(currentPlan);
         currentPlan = new ExprPlan();
-        PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker.spawnChildWalker(split.getConditionPlan());
+        PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+                .spawnChildWalker(split.getConditionPlan());
         pushWalker(childWalker);
         mCurrentWalker.walk(this);
         popWalker();
-        
+
         ((POFilter) physOp).setPlan((ExprPlan) currentPlan);
         currentPlan = currentPlans.pop();
         currentPlan.add(physOp);
-        PhysicalOperator from = LogToPhyMap.get(split.getPlan().getPredecessors(split).get(0));
+        PhysicalOperator from = LogToPhyMap.get(split.getPlan()
+                .getPredecessors(split).get(0));
         try {
             currentPlan.connect(from, physOp);
         } catch (PlanException e) {
@@ -743,50 +815,56 @@
         String scope = func.getOperatorKey().scope;
         Object f = PigContext.instantiateFuncFromSpec(func.getFuncSpec());
         PhysicalOperator p;
-        if(f instanceof EvalFunc) { 
-            p = new POUserFunc(new OperatorKey(scope,
-                nodeGen.getNextNodeId(scope)), func.getRequestedParallelism(),
-                null, func.getFuncSpec(), (EvalFunc)f);
+        if (f instanceof EvalFunc) {
+            p = new POUserFunc(new OperatorKey(scope, nodeGen
+                    .getNextNodeId(scope)), func.getRequestedParallelism(),
+                    null, func.getFuncSpec(), (EvalFunc) f);
         } else {
-            p = new POUserComparisonFunc(new OperatorKey(scope,
-                nodeGen.getNextNodeId(scope)), func.getRequestedParallelism(),
-                null, func.getFuncSpec(), (ComparisonFunc)f);
+            p = new POUserComparisonFunc(new OperatorKey(scope, nodeGen
+                    .getNextNodeId(scope)), func.getRequestedParallelism(),
+                    null, func.getFuncSpec(), (ComparisonFunc) f);
         }
         p.setResultType(func.getType());
         currentPlan.add(p);
         List<LogicalOperator> fromList = func.getPlan().getPredecessors(func);
-        for(LogicalOperator op : fromList) {
+        for (LogicalOperator op : fromList) {
             PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(op);
             try {
                 currentPlan.connect(from, p);
             } catch (PlanException e) {
-                log.error("Invalid physical operator in the plan" + e.getMessage());
-            }    
+                log.error("Invalid physical operator in the plan"
+                        + e.getMessage());
+            }
         }
         LogToPhyMap.put(func, p);
-        
+
     }
-    
+
     @Override
     public void visit(LOLoad loLoad) throws VisitorException {
         String scope = loLoad.getOperatorKey().scope;
-        //This would be a root operator. We don't need to worry about finding its predecessors
-        POLoad load = new POLoad(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+        // This would be a root operator. We don't need to worry about finding
+        // its predecessors
+        POLoad load = new POLoad(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)));
         load.setLFile(loLoad.getInputFile());
         load.setPc(pc);
         load.setResultType(loLoad.getType());
         currentPlan.add(load);
         LogToPhyMap.put(loLoad, load);
+        this.load = loLoad.getLoadFunc();
     }
-    
+
     @Override
     public void visit(LOStore loStore) throws VisitorException {
         String scope = loStore.getOperatorKey().scope;
-        POStore store = new POStore(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+        POStore store = new POStore(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)));
         store.setSFile(loStore.getOutputFile());
         store.setPc(pc);
         currentPlan.add(store);
-        PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(loStore.getPlan().getPredecessors(loStore).get(0));
+        PhysicalOperator<PhyPlanVisitor> from = LogToPhyMap.get(loStore
+                .getPlan().getPredecessors(loStore).get(0));
         try {
             currentPlan.connect(from, store);
         } catch (PlanException e) {
@@ -794,134 +872,151 @@
         }
         LogToPhyMap.put(loStore, store);
     }
-    
+
     @Override
     public void visit(LOConst op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        ConstantExpression ce = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+        ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,
+                nodeGen.getNextNodeId(scope)));
         ce.setValue(op.getValue());
         ce.setResultType(op.getType());
         //this operator doesn't have any predecessors
         currentPlan.add(ce);
         LogToPhyMap.put(op, ce);
     }
-    
+
     @Override
     public void visit(LOBinCond op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        ExpressionOperator physOp = new POBinCond(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        ExpressionOperator physOp = new POBinCond(new OperatorKey(scope,
+                nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
         LogToPhyMap.put(op, physOp);
-        POBinCond phy = (POBinCond)physOp;
-        ExpressionOperator cond = (ExpressionOperator) LogToPhyMap.get(op.getCond());
+        POBinCond phy = (POBinCond) physOp;
+        ExpressionOperator cond = (ExpressionOperator) LogToPhyMap.get(op
+                .getCond());
         phy.setCond(cond);
-        ExpressionOperator lhs = (ExpressionOperator) LogToPhyMap.get(op.getLhsOp());
+        ExpressionOperator lhs = (ExpressionOperator) LogToPhyMap.get(op
+                .getLhsOp());
         phy.setLhs(lhs);
-        ExpressionOperator rhs = (ExpressionOperator) LogToPhyMap.get(op.getRhsOp());
+        ExpressionOperator rhs = (ExpressionOperator) LogToPhyMap.get(op
+                .getRhsOp());
         phy.setRhs(rhs);
-        
+        phy.setResultType(op.getType());
         currentPlan.add(physOp);
-        
+
         List<LogicalOperator> ops = op.getPlan().getPredecessors(op);
-        
-        for(LogicalOperator l : ops) {
+
+        for (LogicalOperator l : ops) {
             ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(l);
             try {
                 currentPlan.connect(from, physOp);
             } catch (PlanException e) {
-                log.error("Invalid physical operator in the plan" + e.getMessage());
+                log.error("Invalid physical operator in the plan"
+                        + e.getMessage());
             }
         }
-        
+
     }
-    
+
     @Override
     public void visit(LONegative op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        ExpressionOperator physOp = new PONegative(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism(), null);
+        ExpressionOperator physOp = new PONegative(new OperatorKey(scope,
+                nodeGen.getNextNodeId(scope)), op.getRequestedParallelism(),
+                null);
         currentPlan.add(physOp);
-        
+
         LogToPhyMap.put(op, physOp);
-        ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op.getPlan().getPredecessors(op).get(0));
-        ((PONegative)physOp).setInput(from);
+        ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op
+                .getPlan().getPredecessors(op).get(0));
+        ((PONegative) physOp).setExpr(from);
         try {
             currentPlan.connect(from, physOp);
         } catch (PlanException e) {
             log.error("Invalid physical operator in the plan" + e.getMessage());
         }
-        
+
     }
-    
+
     @Override
     public void visit(LOIsNull op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        ExpressionOperator physOp = new POIsNull(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism(), null);
+        ExpressionOperator physOp = new POIsNull(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), op.getRequestedParallelism(), null);
         currentPlan.add(physOp);
-        
+
         LogToPhyMap.put(op, physOp);
-        ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op.getPlan().getPredecessors(op).get(0));
-        ((POIsNull)physOp).setInput(from);
+        ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op
+                .getPlan().getPredecessors(op).get(0));
+        ((POIsNull) physOp).setExpr(from);
         try {
             currentPlan.connect(from, physOp);
         } catch (PlanException e) {
             log.error("Invalid physical operator in the plan" + e.getMessage());
         }
-        
+
     }
-    
+
     @Override
     public void visit(LOMapLookup op) throws VisitorException {
-        String scope = ((OperatorKey)op.getOperatorKey()).scope;
-        ExpressionOperator physOp = new POMapLookUp(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism(), op.getLookUpKey());
+        String scope = ((OperatorKey) op.getOperatorKey()).scope;
+        ExpressionOperator physOp = new POMapLookUp(new OperatorKey(scope,
+                nodeGen.getNextNodeId(scope)), op.getRequestedParallelism(), op
+                .getLookUpKey());
         physOp.setResultType(op.getType());
         currentPlan.add(physOp);
-        
+
         LogToPhyMap.put(op, physOp);
-        
-        ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op.getMap());
+
+        ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op
+                .getMap());
         try {
             currentPlan.connect(from, physOp);
         } catch (PlanException e) {
             log.error("Invalid physical operator in the plan" + e.getMessage());
         }
-        
+
     }
-    
+
     @Override
     public void visit(LOCast op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        ExpressionOperator physOp = new POCast(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        ExpressionOperator physOp = new POCast(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), op.getRequestedParallelism());
         currentPlan.add(physOp);
-        
+
         LogToPhyMap.put(op, physOp);
-        ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op.getExpression());
+        ExpressionOperator from = (ExpressionOperator) LogToPhyMap.get(op
+                .getExpression());
         physOp.setResultType(op.getType());
-        
+        ((POCast) physOp).setLoadFSpec(load.getClass().getName());
         try {
             currentPlan.connect(from, physOp);
         } catch (PlanException e) {
             log.error("Invalid physical operator in the plan" + e.getMessage());
         }
-        
+
     }
-    
+
     @Override
     public void visit(LOUnion op) throws VisitorException {
         String scope = op.getOperatorKey().scope;
-        POUnion physOp = new POUnion(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), op.getRequestedParallelism());
+        POUnion physOp = new POUnion(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), op.getRequestedParallelism());
         currentPlan.add(physOp);
         physOp.setResultType(op.getType());
         LogToPhyMap.put(op, physOp);
         List<LogicalOperator> ops = op.getInputs();
-        
-        for(LogicalOperator l : ops) {
+
+        for (LogicalOperator l : ops) {
             PhysicalOperator from = LogToPhyMap.get(l);
             try {
                 currentPlan.connect(from, physOp);
             } catch (PlanException e) {
-                log.error("Invalid physical operator in the plan" + e.getMessage());
+                log.error("Invalid physical operator in the plan"
+                        + e.getMessage());
             }
         }
     }
-    
-    
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Jun 19 12:56:00 2008
@@ -1666,7 +1666,6 @@
 	LOOKAHEAD(BaseEvalSpec(over,specs,lp,input)) expr = BaseEvalSpec(over,specs,lp,input)
 |	( "(" expr = InfixExpr(over,specs,lp,input) ")" )
 |	expr = NegativeExpr(over,specs,lp,input)
-
 	)
 	{log.trace("Exiting UnaryExpr");return expr;}
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Thu Jun 19 12:56:00 2008
@@ -958,7 +958,7 @@
         }
 
     }
-
+    
     @Override
     public void visit(LONot uniOp) throws VisitorException {
         byte type = uniOp.getOperand().getType() ;

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java?rev=669666&r1=669665&r2=669666&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java Thu Jun 19 12:56:00 2008
@@ -27,13 +27,14 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -238,6 +239,13 @@
             else{
                 //Map Reduce Job
                 //Process the POPackage operator and remove it from the reduce plan
+                if(!mro.combinePlan.isEmpty()){
+                    POPackage combPack = (POPackage)mro.combinePlan.getRoots().get(0);
+                    mro.combinePlan.remove(combPack);
+                    jobConf.setCombinerClass(PigCombiner.Combine.class);
+                    jobConf.set("pig.combinePlan", ObjectSerializer.serialize(mro.combinePlan));
+                    jobConf.set("pig.combine.package", ObjectSerializer.serialize(combPack));
+                }
                 POPackage pack = (POPackage)mro.reducePlan.getRoots().get(0);
                 mro.reducePlan.remove(pack);
                 jobConf.setMapperClass(PigMapReduce.Map.class);
@@ -246,7 +254,11 @@
                 jobConf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
                 jobConf.set("pig.reducePlan", ObjectSerializer.serialize(mro.reducePlan));
                 jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
-                jobConf.setOutputKeyClass(DataType.getWritableComparableTypes(pack.getKeyType()).getClass());
+                Class<? extends WritableComparable> keyClass = DataType.getWritableComparableTypes(pack.getKeyType()).getClass();
+                jobConf.setOutputKeyClass(keyClass);
+                if(keyClass.equals(TupleFactory.getInstance().tupleClass())){
+                    jobConf.setOutputKeyComparatorClass(PigWritableComparator.class);
+                }
                 jobConf.setOutputValueClass(IndexedTuple.class);
             }
             
@@ -269,4 +281,14 @@
         }
         return ret;
     }
+    
+    public static class PigWritableComparator extends WritableComparator {
+        public PigWritableComparator() {
+            super(TupleFactory.getInstance().tupleClass());
+        }
+
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
+            return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+        }
+    }
 }



Mime
View raw message