pig-commits mailing list archives

From pradeep...@apache.org
Subject svn commit: r897283 [4/5] - in /hadoop/pig/branches/load-store-redesign: ./ contrib/piggybank/java/ contrib/zebra/ contrib/zebra/src/java/org/apache/hadoop/zebra/pig/ contrib/zebra/src/java/org/apache/hadoop/zebra/types/ contrib/zebra/src/test/e2e/merg...
Date Fri, 08 Jan 2010 18:17:12 GMT
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultAbstractBag.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultAbstractBag.java Fri Jan  8 18:17:07 2010
@@ -28,9 +28,11 @@
 import java.util.Iterator;
 import java.util.ArrayList;
 
+import org.apache.pig.PigCounters;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
 import org.apache.pig.impl.util.BagFormat;
@@ -371,7 +373,14 @@
     		log.warn(msg, e);
     	}    	
     }
-
+    
+    protected void incSpillCount(Enum counter) {
+        // Increment the spill count.
+        // "warn" is a misnomer here: the call updates the counter, and only
+        // logs a warning if the update fails.
+        PigHadoopLogger.getInstance().warn(this, "Spill counter incremented", counter);
+    }
+    
     public static abstract class BagDelimiterTuple extends DefaultTuple{}
     public static class StartBag extends BagDelimiterTuple{
         private static final long serialVersionUID = 1L;}

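The new incSpillCount helper routes counter updates through PigHadoopLogger. As a minimal sketch of that pattern (hypothetical class and field names, not the Pig source): a counter-backed logger treats warn() as "increment the counter for this enum", and only emits a real warning when no counter sink is attached.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    // Rough sketch of a counter-backed logger; all names here are hypothetical.
    public class CounterBackedLogger {
        private final ConcurrentHashMap<Enum<?>, AtomicLong> counters = new ConcurrentHashMap<>();
        private boolean counterSinkAttached = true; // stand-in for a live Hadoop reporter

        public void warn(Object source, String msg, Enum<?> counter) {
            if (counterSinkAttached) {
                // normal path: the "warning" is really a counter increment
                counters.computeIfAbsent(counter, c -> new AtomicLong()).incrementAndGet();
            } else {
                // fallback: no counter sink, so emit a plain warning instead
                System.err.println(source.getClass().getSimpleName() + ": " + msg);
            }
        }

        public long value(Enum<?> counter) {
            AtomicLong v = counters.get(counter);
            return v == null ? 0 : v.get();
        }
    }
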
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultDataBag.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultDataBag.java Fri Jan  8 18:17:07 2010
@@ -30,6 +30,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
 import org.apache.pig.PigWarning;
 
 
@@ -122,6 +123,8 @@
             }
             mContents.clear();
         }
+        // Increment the spill count
+        incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
         return spilled;
     }
 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DistinctDataBag.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DistinctDataBag.java Fri Jan  8 18:17:07 2010
@@ -36,7 +36,9 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
 import org.apache.pig.PigWarning;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 
 
 
@@ -182,6 +184,8 @@
             }
             mContents.clear();
         }
+        // Increment the spill count
+        incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
         return spilled;
     }
 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalCachedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalCachedBag.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalCachedBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalCachedBag.java Fri Jan  8 18:17:07 2010
@@ -22,6 +22,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 
 
@@ -99,8 +100,14 @@
                 		log.debug("Memory can hold "+ mContents.size() + " records, put the rest in spill file.");
                 	}
                     out = getSpillFile();
+
                 }
                 t.write(out);
+                
+                if (cacheLimit != 0 && mContents.size() % cacheLimit == 0) {
+                    /* Increment the spill count */
+                    incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT);                    
+                }
             }
             catch(IOException e) {
                 throw new RuntimeException(e);

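One detail worth noting in the check above: the cacheLimit != 0 operand must come first, because if cacheLimit could ever be zero (e.g. no in-memory cap configured) the modulo alone would throw ArithmeticException. A minimal self-contained illustration of the idiom, with made-up numbers:

    public class ModuloGuardDemo {
        public static void main(String[] args) {
            int cacheLimit = 0; // hypothetical: no in-memory cap configured
            int size = 42;
            // Safe: the left operand short-circuits before the division happens
            if (cacheLimit != 0 && size % cacheLimit == 0) {
                System.out.println("would increment PROACTIVE_SPILL_COUNT");
            } else {
                System.out.println("no counter update");
            }
        }
    }
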
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalDistinctBag.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalDistinctBag.java Fri Jan  8 18:17:07 2010
@@ -37,6 +37,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 
@@ -228,7 +229,9 @@
         mContents.clear();
         mMemSizeChanged = true;
         memUsage = 0;
-                
+
+        // Increment the spill count
+        incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT);
         return spilled;
     }
 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java Fri Jan  8 18:17:07 2010
@@ -38,6 +38,7 @@
   
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -238,6 +239,8 @@
         mMemSizeChanged = true;
         memUsage = 0;
         
+        // Increment the spill count
+        incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT);
         return spilled;
     }
 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SortedDataBag.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SortedDataBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SortedDataBag.java Fri Jan  8 18:17:07 2010
@@ -35,6 +35,7 @@
   
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigCounters;
 import org.apache.pig.PigWarning;
 
 
@@ -155,6 +156,8 @@
             }
             mContents.clear();
         }
+        // Increment the spill count
+        incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
         return spilled;
     }
 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/JsonMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/JsonMetadata.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/JsonMetadata.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/JsonMetadata.java Fri Jan  8 18:17:07 2010
@@ -34,9 +34,6 @@
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.backend.hadoop.datastorage.HDirectory;
 import org.apache.pig.backend.hadoop.datastorage.HFile;
-import org.apache.pig.backend.local.datastorage.LocalDataStorage;
-import org.apache.pig.backend.local.datastorage.LocalDir;
-import org.apache.pig.backend.local.datastorage.LocalFile;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.plan.Operator;
@@ -115,11 +112,6 @@
                     Path parent = descriptorPath.getParent();
                     parentName = parent.toString();
                     parentContainer = new HDirectory((HDataStorage)storage,parent);
-                } else if (descriptor instanceof LocalFile) {
-                    File descriptorPath = ((LocalFile) descriptor).getPath();
-                    fileName = descriptorPath.getName();
-                    parentName = descriptorPath.getParent();
-                    parentContainer = new LocalDir((LocalDataStorage)storage,parentName);
                 }
                 ElementDescriptor metaFilePath = storage.asElement(parentName, prefix+"."+fileName);
 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/PigContext.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/PigContext.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/PigContext.java Fri Jan  8 18:17:07 2010
@@ -50,6 +50,7 @@
 import org.apache.pig.backend.executionengine.ExecutionEngine;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 import org.apache.pig.backend.hadoop.streaming.HadoopExecutableManager;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
 import org.apache.pig.impl.streaming.ExecutableManager;

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/ColumnPruner.java Fri Jan  8 18:17:07 2010
@@ -23,13 +23,10 @@
 import java.util.Map;
 
 import org.apache.pig.PigException;
-import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.ProjectionMap.Column;
-import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.logicalLayer.RelationalOperator;
 
@@ -37,14 +34,20 @@
     private Map<LogicalOperator, List<Pair<Integer,Integer>>> prunedColumnsMap;
     LogicalPlan plan;
     
-    public ColumnPruner(LogicalPlan plan, LogicalOperator op, List<Pair<Integer, Integer>> prunedColumns, 
-            PlanWalker<LogicalOperator, LogicalPlan> walker) {
-        super(plan, walker);
+    public ColumnPruner(LogicalPlan plan) {
+        super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
         prunedColumnsMap = new HashMap<LogicalOperator, List<Pair<Integer,Integer>>>();
-        prunedColumnsMap.put(op, prunedColumns);
         this.plan = plan;
     }
 
+    public void addPruneMap(LogicalOperator op, List<Pair<Integer,Integer>> prunedColumns) {
+        prunedColumnsMap.put(op, prunedColumns);
+    }
+    
+    public boolean isEmpty() {
+        return prunedColumnsMap.isEmpty();
+    }
+    
     protected void prune(RelationalOperator lOp) throws VisitorException {
         List<LogicalOperator> predecessors = plan.getPredecessors(lOp);
         if (predecessors==null)
@@ -79,7 +82,7 @@
             }
             
             // For every input column, check if it is pruned
-            nextOutput:for (int i=0;i<lOp.getSchema().size();i++)
+            for (int i=0;i<lOp.getSchema().size();i++)
             {
                 List<RequiredFields> relevantFieldsList = lOp.getRelevantInputs(0, i);
                 
@@ -101,125 +104,73 @@
                 if (needNoInputs)
                     continue;
                 
-                boolean allPruned = true;
+                boolean columnPruned = false;
                 
-                // For LOUnion, we treat it differently. LOUnion is the only operator that cannot be pruned independently.
-                // For every pruned input column, we will prune. LOUnion (Contrary to other operators, unless all relevant
-                // fields are pruned, we then prune the output field. Inside LOUnion, we have a counter, the output columns 
-                // is actually pruned only after all corresponding input columns have been pruned
-                if (lOp instanceof LOUnion)
-                {
-                    allPruned = false;
-                    checkAllPrunedUnion: for (RequiredFields relevantFields: relevantFieldsList)
-                    {
-                        for (Pair<Integer, Integer> relevantField: relevantFields.getFields())
-                        {
-                            if (columnsPruned.contains(relevantField))
-                            {
-                                allPruned = true;
-                                break checkAllPrunedUnion;
-                            }
-                        }
-                    }
-                }
                 // For LOCogroup, one output can be pruned if all its relevant input are pruned except for "key" fields 
-                else if (lOp instanceof LOCogroup)
+                if (lOp instanceof LOCogroup)
                 {
                     List<RequiredFields> requiredFieldsList = lOp.getRequiredFields();
-                    boolean sawInputPruned = false;
                     for (Pair<Integer, Integer> column : columnsPruned)
                     {
                         if (column.first == i-1)  // Saw at least one input pruned
                         {
-                            sawInputPruned = true;
-                            // Further check if requiredFields of the LOCogroup contains these fields.
-                            // If not, we can safely prune this output column
                             if (requiredFieldsList.get(i-1).getFields().contains(column))
                             {
-                                allPruned = false;
+                                columnPruned = true;
                                 break;
                             }
                         }
                     }
-                    if (!sawInputPruned)
-                        allPruned = false;
                 }
                 else
                 {
-                    nextRelevantFields:for (RequiredFields relevantFields: relevantFieldsList)
+                    // If we see any of the relevant fields of this column get pruned,
+                    // then we prune this column for this operator
+                    for (RequiredFields relevantFields: relevantFieldsList)
                     {
-                        if (relevantFields==null)
+                        if (relevantFields == null)
                             continue;
-                        
-                        if (relevantFields.needAllFields())
-                        {
-                            allPruned = false;
+                        if (relevantFields.getNeedAllFields())
                             break;
-                        }
-                        if (relevantFields.needNoFields())
-                            continue;
                         for (Pair<Integer, Integer> relevantField: relevantFields.getFields())
                         {
-                            if (relevantField==null)
-                                continue;
-                            
-                            if (lOp instanceof LOUnion)
+                            if (columnsPruned.contains(relevantField))
                             {
-                                if (columnsPruned.contains(relevantField))
-                                    break nextRelevantFields;
+                                columnPruned = true;
                             }
-                            else if (!columnsPruned.contains(relevantField))
-                            {
-                                allPruned = false;
-                                break nextRelevantFields;
+                            else {
+                                // For union, inconsistent pruning is possible (see PIG-1146).
+                                // We allow it for union; the pruneColumns method in LOUnion
+                                // handles the inconsistency
+                                if (!(lOp instanceof LOUnion) && columnPruned==true) {
+                                    int errCode = 2185;
+                                    String msg = "Column $"+i+" of "+lOp+" inconsistent pruning";
+                                    throw new OptimizerException(msg, errCode, PigException.BUG);
+                                }
                             }
                         }
                     }
                 }
-                if (allPruned)
+                if (columnPruned)
                     columnsToPrune.add(new Pair<Integer, Integer>(0, i));
             }
     
-            if (columnsPruned.size()!=0)
+            LogicalOperator currentOp = lOp;
+            
+            // If it is LOCogroup, insert a foreach to mimic pruning, because we cannot
+            // prune the LOCogroup output solely by pruning its inputs
+            if (columnsPruned.size()!=0 && lOp instanceof LOCogroup)
             {
-                MultiMap<Integer, Column> mappedFields = new MultiMap<Integer, Column>();
-                List<Column> columns = new ArrayList<Column>();
-                columns.add(new Column(new Pair<Integer, Integer>(0, 0)));
-                mappedFields.put(0, columns);
-                LogicalOperator nextOp = lOp;
-                if (lOp instanceof LOCogroup)
-                {
-                    ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
-                    ArrayList<LogicalPlan> generatingPlans = new ArrayList<LogicalPlan>();
-                    String scope = lOp.getOperatorKey().scope;
-                    for (int i=0;i<=predecessors.size();i++) {
-                        if (!columnsToPrune.contains(new Pair<Integer, Integer>(0, i)))
-                        {
-                            LogicalPlan projectPlan = new LogicalPlan();
-                            LogicalOperator projectInput = lOp;
-                            ExpressionOperator column = new LOProject(projectPlan, new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), projectInput, i);
-                            flattenList.add(false);
-                            projectPlan.add(column);
-                            generatingPlans.add(projectPlan);
-                        }
-                        columns = new ArrayList<Column>();
-                        columns.add(new Column(new Pair<Integer, Integer>(0, i+1)));
-                        mappedFields.put(i+1, columns);
-                    }
-                    LOForEach forEach = new LOForEach(mPlan, new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), generatingPlans, flattenList);
-                    LogicalOperator succ = mPlan.getSuccessors(lOp).get(0);
-                    mPlan.add(forEach);
-                    // Since the successor has not been pruned yet, so we cannot rewire directly because
-                    // rewire has the assumption that predecessor and successor is in consistent
-                    // state. The way we do the rewire is kind of hacky. We give a fake projection map in the 
-                    // new node to fool rewire
-                    mPlan.doInsertBetween(lOp, forEach, succ, false);
-                    forEach.getProjectionMap().setMappedFields(mappedFields);
-                    succ.rewire(lOp, 0, forEach, false);
-                    nextOp = forEach;
-                }
-                if (lOp.pruneColumns(columnsPruned))
-                    prunedColumnsMap.put(nextOp, columnsToPrune);
+                List<Integer> columnsToProject = new ArrayList<Integer>();
+                for (int i=0;i<=predecessors.size();i++) {
+                    if (!columnsToPrune.contains(new Pair<Integer, Integer>(0, i)))
+                        columnsToProject.add(i);
+                }                
+                currentOp = lOp.insertPlainForEachAfter(columnsToProject);
+            }
+            
+            if (!columnsPruned.isEmpty() && lOp.pruneColumns(columnsPruned)) {
+                prunedColumnsMap.put(currentOp, columnsToPrune);
             }
         } catch (FrontendException e) {
             int errCode = 2188;
@@ -244,7 +195,11 @@
     }
     
     protected void visit(LOForEach foreach) throws VisitorException {
-        prune(foreach);
+        // The only foreach we should skip is the one inserted after LOLoad to
+        // mimic pruning: it already has its prunedColumns entry, so there is
+        // no need to visit it again here
+        if (!prunedColumnsMap.containsKey(foreach))
+            prune(foreach);
     }
     
     protected void visit(LOJoin join) throws VisitorException {

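The reworked loop replaces the old all-or-nothing allPruned flag with a consistency rule: once one relevant input field of an output column is seen as pruned, every other relevant field must be pruned as well, with union exempted per PIG-1146. A toy distillation of that rule (hypothetical names, not the Pig classes):

    import java.util.Arrays;
    import java.util.List;

    public class PruneConsistencyDemo {
        static boolean columnPruned(List<Boolean> relevantFieldPruned, boolean isUnion) {
            boolean pruned = false;
            for (boolean fieldPruned : relevantFieldPruned) {
                if (fieldPruned) {
                    pruned = true;
                } else if (!isUnion && pruned) {
                    // mirrors the new OptimizerException with errCode 2185
                    throw new IllegalStateException("inconsistent pruning");
                }
            }
            return pruned;
        }

        public static void main(String[] args) {
            System.out.println(columnPruned(Arrays.asList(true, true), false));  // true
            System.out.println(columnPruned(Arrays.asList(true, false), true));  // union: allowed
            System.out.println(columnPruned(Arrays.asList(true, false), false)); // throws
        }
    }
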
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOForEach.java Fri Jan  8 18:17:07 2010
@@ -795,11 +795,8 @@
         return new Pair<Boolean, List<Integer>>(hasFlatten, flattenedColumns);
     }
     
-    public LogicalPlan getRelevantPlan(int output, int column)
+    public LogicalPlan getRelevantPlan(int column)
     {
-        if (output!=0)
-            return null;
-
         if (column<0)
             return null;
 
@@ -814,6 +811,22 @@
         return mSchemaPlanMapping.get(column);
     }
     
+    public boolean isInputFlattened(int column) throws FrontendException {
+        LogicalPlan plan = getRelevantPlan(column);
+        if (plan==null) {
+            int errCode = 2195;
+            throw new FrontendException("Fail to get foreach plan for input column "+column,
+                    errCode, PigException.BUG);
+        }
+        int index = mForEachPlans.indexOf(plan);
+        if (index==-1) {
+            int errCode = 2195;
+            throw new FrontendException("Fail to get foreach plan for input column "+column,
+                    errCode, PigException.BUG);
+        }
+        return mFlatten.get(index);
+    }
+    
     @Override
     public List<RequiredFields> getRelevantInputs(int output, int column) throws FrontendException {
         if (!mIsSchemaComputed)
@@ -835,7 +848,7 @@
             return null;
         }
         
-        LogicalPlan plan = getRelevantPlan(output, column);
+        LogicalPlan plan = getRelevantPlan(column);
         
         TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(
                 plan);
@@ -863,7 +876,7 @@
         
         return result;
     }
-     @Override
+    @Override
     public boolean pruneColumns(List<Pair<Integer, Integer>> columns)
             throws FrontendException {
         if (!mIsSchemaComputed)
@@ -946,7 +959,7 @@
             int index = planToRemove.get(planToRemove.size()-1);
             if (mUserDefinedSchema!=null) {
                 for (int i=mUserDefinedSchema.size()-1;i>=0;i--) {
-                    if (getRelevantPlan(0, i)==mForEachPlans.get(index))
+                    if (getRelevantPlan(i)==mForEachPlans.get(index))
                         mUserDefinedSchema.remove(i);
                 }
             }

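The new isInputFlattened() depends on mForEachPlans and mFlatten being parallel lists: the inner plan that produces a column is located first, and its index then selects the flatten flag. A stand-alone sketch of that lookup, using stand-in types rather than the Pig classes:

    import java.util.Arrays;
    import java.util.List;

    public class ParallelListLookupDemo {
        public static void main(String[] args) {
            List<String> plans    = Arrays.asList("planA", "planB", "planC"); // cf. mForEachPlans
            List<Boolean> flatten = Arrays.asList(false, true, false);        // cf. mFlatten
            String planForColumn = "planB"; // stand-in for getRelevantPlan(column)
            int index = plans.indexOf(planForColumn);
            if (index == -1) {
                // mirrors the FrontendException raised as a BUG above
                throw new IllegalStateException("no plan found for column");
            }
            System.out.println(flatten.get(index)); // prints true
        }
    }
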
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/LOUnion.java Fri Jan  8 18:17:07 2010
@@ -42,8 +42,6 @@
     private static final long serialVersionUID = 2L;
     private static Log log = LogFactory.getLog(LOUnion.class);
     
-    List<Pair<Integer, Integer>> stagingPrunedColumns = new ArrayList<Pair<Integer, Integer>>(); 
-
     /**
      * @param plan
      *            Logical plan this operator is a part of.
@@ -240,27 +238,60 @@
             result.add(new RequiredFields(inputList));
         }
         
-        
         return result;
     }
-
+    @Override
     public boolean pruneColumns(List<Pair<Integer, Integer>> columns)
         throws FrontendException {
-        stagingPrunedColumns.addAll(columns);
-        boolean allPruned = true;
+        if (!mIsSchemaComputed)
+            getSchema();
+        if (mSchema == null) {
+            log.warn("Cannot prune columns in union, no schema information found");
+            return false;
+        }
+
+        // Find maximum pruning among all inputs
+        boolean[] maximumPruned = new boolean[mSchema.size()];
         for (Pair<Integer, Integer>pair : columns)
         {
-            for (int i=0;i<mPlan.getPredecessors(this).size();i++)
+            maximumPruned[pair.second] = true;
+        }
+        int maximumNumPruned = 0;
+        for (int i=0;i<maximumPruned.length;i++) {
+            if (maximumPruned[i])
+                maximumNumPruned++;
+        }
+        
+        List<LogicalOperator> preds = getInputs();
+        for (int i=0;i<preds.size();i++) {
+            // Build a list of pruned columns for this predecessor
+            boolean[] actualPruned = new boolean[mSchema.size()];
+            for (Pair<Integer, Integer>pair : columns)
             {
-                if (!stagingPrunedColumns.contains(new Pair<Integer, Integer>(i, pair.second)))
-                    allPruned = false;
+                if (pair.first==i)
+                    actualPruned[pair.second] = true;
+            }
+            int actualNumPruned = 0;
+            for (int j=0;j<actualPruned.length;j++) {
+                if (actualPruned[j])
+                    actualNumPruned++;
+            }
+            if (actualNumPruned!=maximumNumPruned) { // We need to prune some columns before LOUnion
+                List<Integer> columnsToProject = new ArrayList<Integer>();
+                int index=0;
+                for (int j=0;j<actualPruned.length;j++) {
+                    if (!maximumPruned[j]) {
+                        columnsToProject.add(index); 
+                        index++;
+                    } else {
+                        if (!actualPruned[j])
+                            index++;
+                    }
+                }
+                ((RelationalOperator)preds.get(i)).insertPlainForEachAfter(columnsToProject);
             }
         }
-        if (allPruned)
-        {
-            super.pruneColumns(columns);
-            return true;
-        }
-        return false;
+        super.pruneColumns(columns);
+        return true;
     }
 }

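The index arithmetic in the new pruneColumns deserves a worked example. Suppose the union-wide ("maximum") pruning removes columns 1 and 3 of a four-column schema, but one input has so far only had column 1 pruned. That input still carries original columns 0, 2, 3 at positions 0, 1, 2, so a foreach must project positions 0 and 1 to drop the leftover column 3. Rerunning the loop above stand-alone:

    import java.util.ArrayList;
    import java.util.List;

    public class UnionPruneReconcileDemo {
        public static void main(String[] args) {
            boolean[] maximumPruned = {false, true, false, true};  // pruned across all inputs
            boolean[] actualPruned  = {false, true, false, false}; // pruned in this input so far

            // Same loop as LOUnion.pruneColumns: index walks the input's current
            // (post-prune) schema; surviving columns are projected, and columns
            // pruned globally but not yet locally are skipped over.
            List<Integer> columnsToProject = new ArrayList<>();
            int index = 0;
            for (int j = 0; j < actualPruned.length; j++) {
                if (!maximumPruned[j]) {
                    columnsToProject.add(index);
                    index++;
                } else if (!actualPruned[j]) {
                    index++;
                }
            }
            System.out.println(columnsToProject); // prints [0, 1]
        }
    }
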
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/RelationalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/RelationalOperator.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/RelationalOperator.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/RelationalOperator.java Fri Jan  8 18:17:07 2010
@@ -17,13 +17,17 @@
  */
 package org.apache.pig.impl.logicalLayer;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.pig.PigException;
+import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.ProjectionMap.Column;
+import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
 
 public abstract class RelationalOperator extends LogicalOperator {
@@ -177,4 +181,33 @@
             }
         }
     }
+    
+    // Insert a foreach after the operator. The foreach projects the columns in
+    // columnsToProject directly and drops the rest.
+    public LogicalOperator insertPlainForEachAfter(List<Integer> columnsToProject) throws FrontendException {
+        ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
+        ArrayList<LogicalPlan> generatePlans = new ArrayList<LogicalPlan>();
+        String scope = getOperatorKey().scope;
+        for (int pos : columnsToProject) {
+            LogicalPlan projectPlan = new LogicalPlan();
+            ExpressionOperator column = new LOProject(projectPlan, new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), this, pos);
+            flattenList.add(false);
+            projectPlan.add(column);
+            generatePlans.add(projectPlan);
+        }
+        LOForEach forEach = new LOForEach(mPlan, new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), generatePlans, flattenList);
+        LogicalOperator succ = mPlan.getSuccessors(this).get(0);
+
+        MultiMap<Integer, Column> mappedFields = new MultiMap<Integer, Column>();
+        List<Column> columns;
+        for (int i=0;i<=getSchema().size();i++) {
+            columns = new ArrayList<Column>();
+            columns.add(new Column(new Pair<Integer, Integer>(0, i)));
+            mappedFields.put(i, columns);
+        }
+        mPlan.add(forEach);
+        mPlan.doInsertBetween(this, forEach, succ, false);
+        forEach.getProjectionMap().setMappedFields(mappedFields);
+        succ.rewire(this, 0, forEach, false);
+        return forEach;
+    }
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java Fri Jan  8 18:17:07 2010
@@ -237,6 +237,7 @@
                         pruneRule.getTransformer().transform(match);
                     }
                 }
+                ((PruneColumns)pruneRule.getTransformer()).prune();
             }
         }
     }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java Fri Jan  8 18:17:07 2010
@@ -58,7 +58,6 @@
 import org.apache.pig.impl.logicalLayer.RelationalOperator;
 import org.apache.pig.impl.logicalLayer.TopLevelProjectFinder;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.MapKeysInfo;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -79,12 +78,14 @@
 }
 
 public class PruneColumns extends LogicalTransformer {
-
+    private boolean safeToPrune = true;
     private static Log log = LogFactory.getLog(PruneColumns.class);
     Map<RelationalOperator, RequiredInfo> cachedRequiredInfo = new HashMap<RelationalOperator, RequiredInfo>();
-    
+    private Map<LOLoad, RequiredFields> prunedLoaderColumnsMap = new HashMap<LOLoad, RequiredFields>();
+    ColumnPruner pruner;
     public PruneColumns(LogicalPlan plan) {
         super(plan);
+        pruner = new ColumnPruner(plan);
     }
 
     @Override
@@ -180,6 +181,8 @@
     {
         try
         {
+            if (!safeToPrune)
+                return;
             if (!(lo instanceof RelationalOperator))
             {
                 int errCode = 2182;
@@ -188,6 +191,7 @@
             }
             if (lo.getSchema()==null)
             {
+                safeToPrune = false;
                 return;
             }
             RelationalOperator rlo = (RelationalOperator)lo;
@@ -200,7 +204,7 @@
             {
                 // LOLoad has only one output
                 RequiredFields loaderRequiredFields = requiredOutputInfo.requiredFieldsList.get(0);
-                pruneLoader((LOLoad)rlo, loaderRequiredFields);
+                prunedLoaderColumnsMap.put((LOLoad)rlo, loaderRequiredFields);
                 return;
             }
             
@@ -297,7 +301,7 @@
                     else if (rlo instanceof LOForEach)
                     {
                         // Relay map keys from output to input
-                        LogicalPlan forEachPlan = ((LOForEach)rlo).getRelevantPlan(requiredOutputField.first, requiredOutputField.second);
+                        LogicalPlan forEachPlan = ((LOForEach)rlo).getRelevantPlan(requiredOutputField.second);
                         if (relevantFields.getFields()!=null && relevantFields.getFields().size()!=0)
                         {
                             int index = ((LOForEach)rlo).getForEachPlans().indexOf(forEachPlan);
@@ -515,7 +519,7 @@
             	processNode(predecessors.get(i), new RequiredInfo(newRequiredOutputFieldsList));
             }
         } catch (FrontendException e) {
-            int errCode = 2185;
+            int errCode = 2211;
             String msg = "Unable to prune columns when processing node " + lo;
             throw new OptimizerException(msg, errCode, PigException.BUG, e);
         }
@@ -705,42 +709,14 @@
         }
         
         // Loader does not support column pruning, insert foreach
-        LOForEach forEach = null;
+        LogicalOperator forEach = null;
         if (response==null || !response.getRequiredFieldResponse())
         {
-            Set<Integer> columnsToProject = new TreeSet<Integer>();
+            List<Integer> columnsToProject = new ArrayList<Integer>();
             for (RequiredField rf : requiredFieldList.getFields())
                 columnsToProject.add(rf.getIndex());
             
-            ArrayList<Boolean> flattenList = new ArrayList<Boolean>();
-            ArrayList<LogicalPlan> generatePlans = new ArrayList<LogicalPlan>();
-            String scope = load.getOperatorKey().scope;
-            for (int pos : columnsToProject) {
-                LogicalPlan projectPlan = new LogicalPlan();
-                LogicalOperator projectInput = load;
-                ExpressionOperator column = new LOProject(projectPlan, new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), projectInput, pos);
-                flattenList.add(false);
-                projectPlan.add(column);
-                generatePlans.add(projectPlan);
-            }
-            forEach = new LOForEach(mPlan, new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), generatePlans, flattenList);
-            LogicalOperator pred = mPlan.getSuccessors(load).get(0);
-            /*mPlan.disconnect(load, pred);
-            mPlan.add(forEach);
-            mPlan.connect(load, forEach);
-            mPlan.connect(forEach, pred);
-            forEach.getSchema();*/
-            MultiMap<Integer, Column> mappedFields = new MultiMap<Integer, Column>();
-            List<Column> columns;
-            for (int i=0;i<=load.getSchema().size();i++) {
-                columns = new ArrayList<Column>();
-                columns.add(new Column(new Pair<Integer, Integer>(0, i)));
-                mappedFields.put(i, columns);
-            }
-            mPlan.add(forEach);
-            mPlan.doInsertBetween(load, forEach, pred, false);
-            forEach.getProjectionMap().setMappedFields(mappedFields);
-            pred.rewire(load, 0, forEach, false);
+            forEach = load.insertPlainForEachAfter(columnsToProject);
         }
         
         // Begin to prune
@@ -757,16 +733,10 @@
         StringBuffer message = new StringBuffer();
         if (pruneList.size()!=0)
         {
-            
-            ColumnPruner columnPruner;
             if (forEach == null)
-                columnPruner = new ColumnPruner(mPlan, load, pruneList, 
-                    new DependencyOrderWalker<LogicalOperator, LogicalPlan>(mPlan));
+                pruner.addPruneMap(load, pruneList);
             else
-                columnPruner = new ColumnPruner(mPlan, forEach, pruneList, 
-                        new DependencyOrderWalker<LogicalOperator, LogicalPlan>(mPlan));
-            
-            columnPruner.visit();
+                pruner.addPruneMap(forEach, pruneList);
 
             message.append("Columns pruned for " + load.getAlias() + ": ");
             for (int i=0;i<pruneList.size();i++)
@@ -805,4 +775,22 @@
         else
             log.info("No map keys pruned for " + load.getAlias());
     }
+    
+    public void prune() throws OptimizerException {
+        try {
+            if (!safeToPrune)
+                return;
+            
+            for (LOLoad load : prunedLoaderColumnsMap.keySet())
+                pruneLoader(load, prunedLoaderColumnsMap.get(load));
+            
+            if (!pruner.isEmpty())
+                pruner.visit();
+        }
+        catch (FrontendException e) {
+            int errCode = 2212;
+            String msg = "Unable to prune plan";
+            throw new OptimizerException(msg, errCode, PigException.BUG, e);
+        }
+    }
 }

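The structural change in this file is that pruning is now two-phase: during the transform pass the visitor only records per-loader prune lists (and flips safeToPrune off as soon as a missing schema makes pruning unsafe), and the new prune() method applies everything at the end. A schematic of that flow with hypothetical stand-in types:

    import java.util.HashMap;
    import java.util.Map;

    public class DeferredPruneDemo {
        private final Map<String, String> pending = new HashMap<>(); // loader -> fields to prune
        private boolean safeToPrune = true;

        void transform(String loader, String fields, boolean schemaKnown) {
            if (!schemaKnown) { safeToPrune = false; return; } // one bad node disables all pruning
            pending.put(loader, fields);                       // record only; mutate nothing yet
        }

        void prune() {
            if (!safeToPrune) return; // the plan is left completely untouched
            pending.forEach((loader, f) -> System.out.println("prune " + loader + ": " + f));
        }

        public static void main(String[] args) {
            DeferredPruneDemo d = new DeferredPruneDemo();
            d.transform("A", "$2, $3", true);
            d.transform("B", "$1", true);
            d.prune();
        }
    }
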
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java Fri Jan  8 18:17:07 2010
@@ -19,6 +19,7 @@
 package org.apache.pig.impl.logicalLayer.optimizer;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -41,6 +42,7 @@
 import org.apache.pig.impl.plan.ProjectionMap;
 import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.OperatorPlan.IndexHelper;
+import org.apache.pig.impl.plan.ProjectionMap.Column;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
 import org.apache.pig.PigException;
 import org.apache.pig.impl.util.MultiMap;
@@ -283,6 +285,23 @@
                     }
                 }
                 
+                // Check if any flattened field is required by LOJoin; if so, don't optimize
+                if (successor instanceof LOJoin) {
+                    List<RequiredFields> requiredFieldsList = ((LOJoin)successor).getRequiredFields();
+                    RequiredFields requiredFields = requiredFieldsList.get(foreachPosition.intValue());
+                    
+                    MultiMap<Integer, Column> foreachMappedFields = foreachProjectionMap.getMappedFields();
+                    
+                    for (Pair<Integer, Integer> pair : requiredFields.getFields()) {
+                        Collection<Column> columns = foreachMappedFields.get(pair.second);
+                        for (Column column : columns) {
+                            Pair<Integer, Integer> foreachInputColumn = column.getInputColumn();
+                            if (foreach.isInputFlattened(foreachInputColumn.second))
+                                return false;
+                        }
+                    }
+                }
+                
                 mInsertBetween = true;
                 return true;
             }

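The new LOJoin guard maps each join-required output column of the foreach back to its input column via the projection map, and refuses the push-down when that input was flattened: moving the flatten below the join would leave the join keyed on an un-flattened bag. A toy distillation of the check (hypothetical data, not the Pig API):

    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class FlattenJoinGuardDemo {
        public static void main(String[] args) {
            Map<Integer, Integer> outputToInput = Map.of(0, 0, 1, 2); // foreach projection map
            Set<Integer> flattenedInputs = Set.of(2);                 // input column 2 is flattened
            List<Integer> joinRequired = List.of(1);                  // join keys on output column 1

            boolean safeToPush = true;
            for (int outCol : joinRequired) {
                if (flattenedInputs.contains(outputToInput.get(outCol))) {
                    safeToPush = false; // the join key depends on a flattened column
                }
            }
            System.out.println(safeToPush); // prints false: do not optimize
        }
    }
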
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/DerivedDataVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/DerivedDataVisitor.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/DerivedDataVisitor.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/DerivedDataVisitor.java Fri Jan  8 18:17:07 2010
@@ -35,7 +35,6 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
-import org.apache.pig.backend.local.executionengine.physicalLayer.LocalLogToPhyTranslationVisitor;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/ExampleGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/ExampleGenerator.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/ExampleGenerator.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/ExampleGenerator.java Fri Jan  8 18:17:07 2010
@@ -31,7 +31,6 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.local.executionengine.physicalLayer.LocalLogToPhyTranslationVisitor;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/LocalLogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/LocalLogToPhyTranslationVisitor.java?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/LocalLogToPhyTranslationVisitor.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/LocalLogToPhyTranslationVisitor.java Fri Jan  8 18:17:07 2010
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrangeForIllustrate;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.pen.physicalOperators.POCounter;
+import org.apache.pig.pen.physicalOperators.POCogroup;
+import org.apache.pig.pen.physicalOperators.POCross;
+import org.apache.pig.pen.physicalOperators.POSplit;
+import org.apache.pig.pen.physicalOperators.POSplitOutput;
+import org.apache.pig.pen.physicalOperators.POStreamLocal;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOCross;
+import org.apache.pig.impl.logicalLayer.LOJoin;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LOStore;
+import org.apache.pig.impl.logicalLayer.LOStream;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.DependencyOrderWalkerWOSeenChk;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+
+public class LocalLogToPhyTranslationVisitor extends LogToPhyTranslationVisitor {
+
+    private Log log = LogFactory.getLog(getClass());
+    
+    public LocalLogToPhyTranslationVisitor(LogicalPlan plan) {
+	super(plan);
+	// TODO Auto-generated constructor stub
+    }
+    
+    public Map<LogicalOperator, PhysicalOperator> getLogToPhyMap() {
+	return logToPhyMap;
+    }
+    
+    @Override
+    public void visit(LOCogroup cg) throws VisitorException {
+	String scope = cg.getOperatorKey().scope;
+        List<LogicalOperator> inputs = cg.getInputs();
+        
+        POCogroup poc = new POCogroup(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism());
+        poc.setInner(cg.getInner());
+        currentPlan.add(poc);
+        
+        int count = 0;
+        Byte type = null;
+        for(LogicalOperator lo : inputs) {
+            List<LogicalPlan> plans = (List<LogicalPlan>) cg.getGroupByPlans().get(lo);
+            
+            POLocalRearrangeForIllustrate physOp = new POLocalRearrangeForIllustrate(new OperatorKey(
+                    scope, nodeGen.getNextNodeId(scope)), cg
+                    .getRequestedParallelism());
+            List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
+            currentPlans.push(currentPlan);
+            for (LogicalPlan lp : plans) {
+                currentPlan = new PhysicalPlan();
+                PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+                        .spawnChildWalker(lp);
+                pushWalker(childWalker);
+                mCurrentWalker.walk(this);
+                exprPlans.add(currentPlan);
+                popWalker();
+
+            }
+            currentPlan = currentPlans.pop();
+            try {
+                physOp.setPlans(exprPlans);
+            } catch (PlanException pe) {
+                throw new VisitorException(pe);
+            }
+            try {
+                physOp.setIndex(count++);
+            } catch (ExecException e1) {
+                throw new VisitorException(e1);
+            }
+            if (plans.size() > 1) {
+                type = DataType.TUPLE;
+                physOp.setKeyType(type);
+            } else {
+                type = exprPlans.get(0).getLeaves().get(0).getResultType();
+                physOp.setKeyType(type);
+            }
+            physOp.setResultType(DataType.TUPLE);
+
+            currentPlan.add(physOp);
+
+            try {
+                currentPlan.connect(logToPhyMap.get(lo), physOp);
+                currentPlan.connect(physOp, poc);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
+                throw new VisitorException(e);
+            }
+            
+        }
+        logToPhyMap.put(cg, poc);
+    }
+    
+    @Override
+    public void visit(LOJoin join) throws VisitorException {
+        String scope = join.getOperatorKey().scope;
+        List<LogicalOperator> inputs = join.getInputs();
+        boolean[] innerFlags = join.getInnerFlags();
+
+        // In local mode, LOJoin is achieved by POCogroup followed by a POForEach with flatten
+        // Insert a POCogroup in the place of LOJoin
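+        // Illustration: an inner join such as
+        //     C = JOIN A BY $0, B BY $0;
+        // is evaluated here as the equivalent of
+        //     C1 = COGROUP A BY $0, B BY $0;
+        //     C  = FOREACH C1 GENERATE FLATTEN(A), FLATTEN(B);
+        // (the empty-bag handling added below covers the outer-join cases)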
+        POCogroup poc = new POCogroup(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), join.getRequestedParallelism());
+        poc.setInner(innerFlags);
+        
+        currentPlan.add(poc);
+        
+        // Add inner plans to POCogroup
+        int count = 0;
+        Byte type = null;
+        for(LogicalOperator lo : inputs) {
+            List<LogicalPlan> plans = (List<LogicalPlan>) join.getJoinPlans().get(lo);
+            
+            POLocalRearrangeForIllustrate physOp = new POLocalRearrangeForIllustrate(new OperatorKey(
+                    scope, nodeGen.getNextNodeId(scope)), join
+                    .getRequestedParallelism());
+            List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
+            currentPlans.push(currentPlan);
+            for (LogicalPlan lp : plans) {
+                currentPlan = new PhysicalPlan();
+                PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+                        .spawnChildWalker(lp);
+                pushWalker(childWalker);
+                mCurrentWalker.walk(this);
+                exprPlans.add(currentPlan);
+                popWalker();
+
+            }
+            currentPlan = currentPlans.pop();
+            try {
+                physOp.setPlans(exprPlans);
+            } catch (PlanException pe) {
+                throw new VisitorException(pe);
+            }
+            try {
+                physOp.setIndex(count++);
+            } catch (ExecException e1) {
+                throw new VisitorException(e1);
+            }
+            if (plans.size() > 1) {
+                type = DataType.TUPLE;
+                physOp.setKeyType(type);
+            } else {
+                type = exprPlans.get(0).getLeaves().get(0).getResultType();
+                physOp.setKeyType(type);
+            }
+            physOp.setResultType(DataType.TUPLE);
+
+            currentPlan.add(physOp);
+
+            try {
+                currentPlan.connect(logToPhyMap.get(lo), physOp);
+                currentPlan.connect(physOp, poc);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
+                throw new VisitorException(e);
+            }
+            
+        }
+        
+        // Append POForEach after POCogroup
+        List<Boolean> flattened = new ArrayList<Boolean>();
+        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+        
+        for (int i=0;i<join.getInputs().size();i++)
+        {
+            PhysicalPlan ep = new PhysicalPlan();
+            POProject prj = new POProject(new OperatorKey(scope,nodeGen.getNextNodeId(scope)));
+            prj.setResultType(DataType.BAG);
+            prj.setColumn(i+1);
+            prj.setOverloaded(false);
+            prj.setStar(false);
+            ep.add(prj);
+            eps.add(ep);
+            // the parser would have marked the side
+            // where we need to keep empty bags on
+            // non matched as outer (innerFlags[i] would be
+            // false)
+            if(!(innerFlags[i])) {
+                LogicalOperator joinInput = inputs.get(i);
+                // for outer join add a bincond
+                // which will project nulls when bag is
+                // empty
+                try {
+                    updateWithEmptyBagCheck(ep, joinInput);
+                } catch (PlanException e) {
+                    throw new VisitorException(e);
+                }
+            }
+            flattened.add(true);
+        }
+        
+        POForEach fe = new POForEach(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),-1,eps,flattened);
+        
+        fe.setResultType(DataType.BAG);
+
+        currentPlan.add(fe);
+        logToPhyMap.put(join, fe);
+        try {
+            currentPlan.connect(poc, fe);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+    }
+    
+    @Override
+    public void visit(LOSplit split) throws VisitorException {
+        String scope = split.getOperatorKey().scope;
+        PhysicalOperator physOp = new POSplit(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), split.getRequestedParallelism());
+        
+        logToPhyMap.put(split, physOp);
+
+        currentPlan.add(physOp);
+        PhysicalOperator from = logToPhyMap.get(split.getPlan()
+                .getPredecessors(split).get(0));
+        try {
+            currentPlan.connect(from, physOp);
+        } catch (PlanException e) {
+            log.error("Invalid physical operator in the plan" + e.getMessage());
+            throw new VisitorException(e);
+        }
+    }
+    
+    @Override
+    public void visit(LOSplitOutput split) throws VisitorException {
+        String scope = split.getOperatorKey().scope;
+        PhysicalOperator physOp = new POSplitOutput(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), split.getRequestedParallelism());
+        logToPhyMap.put(split, physOp);
+
+        currentPlan.add(physOp);
+        currentPlans.push(currentPlan);
+        currentPlan = new PhysicalPlan();
+        PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+                .spawnChildWalker(split.getConditionPlan());
+        pushWalker(childWalker);
+        mCurrentWalker.walk(this);
+        popWalker();
+
+        ((POSplitOutput) physOp).setPlan(currentPlan);
+        currentPlan = currentPlans.pop();
+        currentPlan.add(physOp);
+        PhysicalOperator from = logToPhyMap.get(split.getPlan()
+                .getPredecessors(split).get(0));
+        try {
+            currentPlan.connect(from, physOp);
+        } catch (PlanException e) {
+            log.error("Invalid physical operator in the plan" + e.getMessage());
+            throw new VisitorException(e);
+        }
+    }
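+
+    // Shape of the split translation (illustrative Pig Latin, not from
+    // this patch):
+    //
+    //   SPLIT A INTO X IF $0 > 1, Y IF $0 <= 1;
+    //
+    // yields one POSplit fed by A's physical operator, with one
+    // POSplitOutput per branch; each POSplitOutput carries the physical
+    // plan compiled from its condition, as set via setPlan() above.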
+    
+    @Override
+    public void visit(LOStream stream) throws VisitorException {
+        String scope = stream.getOperatorKey().scope;
+        POStreamLocal poStream = new POStreamLocal(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), stream.getExecutableManager(), 
+                stream.getStreamingCommand(), pc.getProperties());
+        currentPlan.add(poStream);
+        logToPhyMap.put(stream, poStream);
+        
+        List<LogicalOperator> op = stream.getPlan().getPredecessors(stream);
+
+        PhysicalOperator from = logToPhyMap.get(op.get(0));
+        try {
+            currentPlan.connect(from, poStream);
+        } catch (PlanException e) {
+            log.error("Invalid physical operators in the physical plan"
+                    + e.getMessage());
+            throw new VisitorException(e);
+        }
+    }
+    
+    @Override
+    public void visit(LOCross cross) throws VisitorException {
+        String scope = cross.getOperatorKey().scope;
+        
+        POCross pocross = new POCross(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+        logToPhyMap.put(cross, pocross);
+        currentPlan.add(pocross);
+
+        for(LogicalOperator in : cross.getInputs()) {
+            PhysicalOperator from = logToPhyMap.get(in);
+            try {
+                currentPlan.connect(from, pocross);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
+                throw new VisitorException(e);
+            }
+        }
+    }
+    
+    @Override
+    public void visit(LOStore loStore) throws VisitorException {
+        String scope = loStore.getOperatorKey().scope;
+        POStore store = new POStore(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)));
+        store.setSFile(loStore.getOutputFile());
+        store.setInputSpec(loStore.getInputSpec());
+        try {
+            // create a new schema for ourselves so that when we
+            // serialize we are not serializing objects that contain
+            // the schema - Java serializes the containing object if we
+            // serialize a schema reference held inside it. The schema
+            // here will be serialized in JobControlCompiler
+            store.setSchema(new Schema(loStore.getSchema()));
+        } catch (FrontendException e1) {
+            int errorCode = 1060;
+            String message = "Cannot resolve Store output schema";  
+            throw new VisitorException(message, errorCode, PigException.BUG, e1);    
+        }
+        currentPlan.add(store);
+        PhysicalOperator from = logToPhyMap.get(loStore
+                .getPlan().getPredecessors(loStore).get(0));
+        
+        POCounter counter = new POCounter(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+        currentPlan.add(counter);
+        try {
+            currentPlan.connect(from, counter);
+            currentPlan.connect(counter, store);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+        logToPhyMap.put(loStore, store);
+    }
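+
+    // The chain built above for a store (shape only; POCounter and
+    // POStore refer to classes added elsewhere in this commit):
+    //
+    //   <predecessor> --> POCounter --> POStore
+    //
+    // The POCounter records how many tuples reach the store; the
+    // example-generation (pen) code can read that number back via
+    // POCounter.getCount().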
+
+}

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POCogroup.java?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POCogroup.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POCogroup.java Fri Jan  8 18:17:07 2010
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen.physicalOperators;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.SortedDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+
+/** This is a local implementation of Cogroup.
+ * The inputs need to be connected to LocalRearranges, normally by the
+ * logical to physical translator.
+ *
+ * This is a blocking operator. The outputs of the LocalRearranges are
+ * put into SortedDataBags, sorted on the keys. We then pull tuples
+ * out of these bags and construct the output.
+ */
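+
+// A worked example of the merge (data made up for illustration): with
+// sorted inputs A holding keys {1, 1, 3} and B holding keys {1, 2},
+// each call to getNext() takes the smallest key at the heads of the
+// iterators, drains every tuple carrying that key into its input's
+// bag, and emits (key, bag-of-A, bag-of-B): first (1, {.., ..}, {..}),
+// then (2, {}, {..}), then (3, {..}, {}).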
+
+//We intentionally skip type checking in backend for performance reasons
+@SuppressWarnings("unchecked")
+public class POCogroup extends PhysicalOperator {
+    private static final long serialVersionUID = 1L;    
+    Tuple[] data = null;
+    transient Iterator<Tuple>[] its = null;
+    boolean[] inner;
+
+    public POCogroup(OperatorKey k) {
+        super(k);
+    }
+
+    public POCogroup(OperatorKey k, int rp) {
+        super(k, rp);
+    }
+
+    public POCogroup(OperatorKey k, List<PhysicalOperator> inp) {
+        super(k, inp);
+    }
+
+    public POCogroup(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+    
+    public void setInner(boolean[] inner) {
+        this.inner = inner;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitCogroup(this);
+    }
+
+    @Override
+    public String name() {
+        return "POCogroup" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
+    }
+    
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        if (its == null) {
+            accumulateData();
+        }
+
+        // we are done once every input iterator has been exhausted
+        boolean done = true;
+        Result res = new Result();
+        for (int i = 0; i < data.length; i++) {
+            done &= (data[i] == null);
+        }
+        if (done) {
+            res.returnStatus = POStatus.STATUS_EOP;
+            its = null;
+            return res;
+        }
+
+        Tuple smallestTuple = getSmallest(data);
+        Comparator<Tuple> comp = new groupComparator();
+
+        int size = data.length;
+
+        // output is (key, bag for input 0, ..., bag for input size-1)
+        Tuple output = TupleFactory.getInstance().newTuple(size + 1);
+
+        output.set(0, smallestTuple.get(1));
+        for (int i = 1; i < size + 1; i++) {
+            output.set(i, BagFactory.getInstance().newDefaultBag());
+        }
+        ExampleTuple tOut = null;
+        if (lineageTracer != null) {
+            tOut = new ExampleTuple(output);
+            lineageTracer.insert(tOut);
+        }
+
+        // drain every tuple whose key equals the smallest key into the
+        // bag of the input it came from
+        boolean loop = true;
+
+        while (loop) {
+            loop = false;
+            for (int i = 0; i < size; i++) {
+                if (data[i] != null && comp.compare(data[i], smallestTuple) == 0) {
+                    loop = true;
+                    DataBag bag = (DataBag) output.get(i + 1);
+                    // update lineage if it exists
+                    Tuple temp = (Tuple) data[i].get(2);
+                    if (lineageTracer != null) {
+                        if (((ExampleTuple) temp).synthetic) tOut.synthetic = true;
+                        lineageTracer.union(temp, tOut);
+                    }
+                    bag.add(temp);
+                    if (its[i].hasNext())
+                        data[i] = its[i].next();
+                    else
+                        data[i] = null;
+                }
+            }
+        }
+        if (lineageTracer != null)
+            res.result = tOut;
+        else
+            res.result = output;
+
+        res.returnStatus = POStatus.STATUS_OK;
+        // an empty bag on an inner input means no match for this key,
+        // so the whole group is suppressed
+        for (int i = 0; i < size; i++) {
+            if (inner != null && inner[i] && ((DataBag) output.get(i + 1)).size() == 0) {
+                res.returnStatus = POStatus.STATUS_NULL;
+                break;
+            }
+        }
+
+        return res;
+    }
+    
+    private void accumulateData() throws ExecException {
+        int size = inputs.size();
+        its = new Iterator[size];
+        data = new Tuple[size];
+        for (int i = 0; i < size; i++) {
+            DataBag bag = new SortedDataBag(new groupComparator());
+            for (Result input = inputs.get(i).getNext(dummyTuple); input.returnStatus != POStatus.STATUS_EOP; input = inputs.get(i).getNext(dummyTuple)) {
+                if (input.returnStatus == POStatus.STATUS_ERR) {
+                    throw new ExecException("Error accumulating output at local Cogroup operator");
+                }
+                if (input.returnStatus == POStatus.STATUS_NULL)
+                    continue;
+                bag.add((Tuple) input.result);
+            }
+
+            its[i] = bag.iterator();
+            // guard against an empty input: next() on an empty iterator
+            // would throw NoSuchElementException
+            data[i] = its[i].hasNext() ? its[i].next() : null;
+        }
+    }
+    
+    private Tuple getSmallest(Tuple[] data) {
+        Tuple t = null;
+        Comparator<Tuple> comp = new groupComparator();
+
+        for (int i = 0; i < data.length; i++) {
+            if (data[i] == null) continue;
+            if (t == null) {
+                // first non-null entry; nothing to compare against yet
+                t = data[i];
+                continue;
+            }
+            if (comp.compare(t, data[i]) > 0)
+                t = data[i];
+        }
+        return t;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+    
+    private static class groupComparator implements Comparator<Tuple> {
+
+        public int compare(Tuple o1, Tuple o2) {
+            // only the keys are compared, to keep this as cheap as possible
+            Object t1 = null;
+            Object t2 = null;
+            try {
+                // get the keys
+                t1 = o1.get(1);
+                t2 = o2.get(1);
+                if (t1 == t2 && t1 == null) {
+                    // null keys from different inputs
+                    // are not treated as equals
+                    int firstInputIndex = (Byte) (o1.get(0));
+                    int secondInputIndex = (Byte) (o2.get(0));
+                    return firstInputIndex - secondInputIndex;
+                }
+            } catch (ExecException e) {
+                throw new RuntimeException("Error comparing tuple keys", e);
+            }
+
+            int result = DataType.compare(t1, t2);
+
+            // Further check if any field is null
+            // See PIG-927
+            if (result == 0 && t1 instanceof Tuple && t2 instanceof Tuple) {
+                try {
+                    int firstInputIndex = (Byte) (o1.get(0));
+                    int secondInputIndex = (Byte) (o2.get(0));
+                    for (int i = 0; i < ((Tuple) t1).size(); i++)
+                        if (((Tuple) t1).get(i) == null)
+                            return firstInputIndex - secondInputIndex;
+                } catch (ExecException e) {
+                    throw new RuntimeException("Error comparing tuple fields", e);
+                }
+            }
+            return result;
+        }
+    }
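+
+    // Illustration of the null-key rule above (made-up data): with
+    // rearranged tuples of the form (inputIndex, key, value),
+    // ((byte) 0, null, ..) and ((byte) 1, null, ..) compare as unequal
+    // (0 - 1 != 0), so null keys from different inputs fall into
+    // different groups, mirroring join semantics where null does not
+    // match null.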
+
+}

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POCounter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POCounter.java?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POCounter.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POCounter.java Fri Jan  8 18:17:07 2010
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen.physicalOperators;
+
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
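+/**
+ * A pass-through operator that counts the tuples flowing through it.
+ * In this commit's logical to physical translator, one of these is
+ * inserted in front of each POStore so the number of stored records
+ * can be read back via getCount().
+ */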
+public class POCounter extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    private long count = 0;
+    
+    public POCounter(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    public POCounter(OperatorKey k, int rp) {
+        super(k, rp);
+    }
+
+    public POCounter(OperatorKey k, List<PhysicalOperator> inp) {
+        super(k, inp);
+    }
+
+    public POCounter(OperatorKey k) {
+        super(k);
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        // POCounter has no corresponding visitor hook; nothing to do here
+    }
+
+    
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        return getNext(processInput());
+    }
+
+    private Result getNext(Result res) {
+        // count every tuple that passes through successfully
+        if (res.returnStatus == POStatus.STATUS_OK) {
+            count++;
+        }
+        return res;
+    }
+
+    @Override
+    public String name() {
+        return "POCounter - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+    
+    public long getCount() {
+        return count;
+    }
+
+}

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POCross.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POCross.java?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POCross.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POCross.java Fri Jan  8 18:17:07 2010
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen.physicalOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+
+/**
+ * This is a local implementation of cross. It is a blocking operator:
+ * it accumulates its inputs into databags and then applies logic
+ * similar to foreach flatten(*) to produce the output tuples.
+ */
+public class POCross extends PhysicalOperator {
+    private static final long serialVersionUID = 1L;    
+    DataBag[] inputBags;
+    Tuple[] data;
+    transient Iterator[] its;
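+
+    // A worked example (data made up for illustration): with
+    // accumulated inputs A = {(1,2), (3,4)} and B = {(5), (6)},
+    // getNext() walks the bags odometer-style, the rightmost iterator
+    // advancing fastest and resetting when exhausted, emitting
+    // (1,2,5), (1,2,6), (3,4,5), (3,4,6).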
+
+    public POCross(OperatorKey k) {
+        super(k);
+    }
+
+    public POCross(OperatorKey k, int rp) {
+        super(k, rp);
+    }
+
+    public POCross(OperatorKey k, List<PhysicalOperator> inp) {
+        super(k, inp);
+    }
+
+    public POCross(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitCross(this);
+    }
+    
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        Result res = new Result();
+        int noItems = inputs.size();
+        if (inputBags == null) {
+            accumulateData();
+        }
+
+        if (its != null) {
+            // we check if we are done with processing
+            // we do that by checking if all the iterators are used up
+            boolean finished = true;
+            for (int i = 0; i < its.length; i++) {
+                finished &= !its[i].hasNext();
+            }
+            if (finished) {
+                res.returnStatus = POStatus.STATUS_EOP;
+                return res;
+            }
+        }
+
+        if (data == null) {
+            // getNext is being called for the first time; instantiate
+            // the template array and populate it with the first tuple
+            // from every input
+            data = new Tuple[noItems];
+            for (int i = 0; i < noItems; ++i) {
+                if (!its[i].hasNext()) {
+                    // an empty input makes the whole cross product empty
+                    res.returnStatus = POStatus.STATUS_EOP;
+                    return res;
+                }
+                data[i] = (Tuple) its[i].next();
+            }
+            res.result = createTuple(data);
+            res.returnStatus = POStatus.STATUS_OK;
+            return res;
+        } else {
+            for (int index = noItems - 1; index >= 0; --index) {
+                if (its[index].hasNext()) {
+                    data[index] = (Tuple) its[index].next();
+                    res.result = createTuple(data);
+                    res.returnStatus = POStatus.STATUS_OK;
+                    return res;
+                } else {
+                    // reset this index's iterator so cross product can be achieved
+                    // we would be resetting this way only for the indexes from the end
+                    // when the first index which needs to be flattened has reached the
+                    // last element in its iterator, we won't come here - instead, we
+                    // return EOP from the finished check at the top of this method
+                    its[index] = (inputBags[index]).iterator();
+                    data[index] = (Tuple) its[index].next();
+                }
+            }
+        }
+
+        // unreachable: the finished check above returns EOP once all
+        // iterators are exhausted
+        return null;
+    }
+    
+    private void accumulateData() throws ExecException {
+        int count = 0;
+        inputBags = new DataBag[inputs.size()];
+        
+        its = new Iterator[inputs.size()];
+        for(PhysicalOperator op : inputs) {
+            DataBag bag = BagFactory.getInstance().newDefaultBag();
+            inputBags[count] = bag;
+            for(Result res = op.getNext(dummyTuple); res.returnStatus != POStatus.STATUS_EOP; res = op.getNext(dummyTuple)) {
+                if(res.returnStatus == POStatus.STATUS_NULL)
+                    continue;
+                if(res.returnStatus == POStatus.STATUS_ERR)
+                    throw new ExecException("Error accumulating data in the local Cross operator");
+                if(res.returnStatus == POStatus.STATUS_OK)
+                    bag.add((Tuple) res.result);
+            }
+            its[count++] = bag.iterator();
+        }
+    }
+    
+    private Tuple createTuple(Tuple[] data) throws ExecException {
+        Tuple out = TupleFactory.getInstance().newTuple();
+
+        for (int i = 0; i < data.length; ++i) {
+            Tuple t = data[i];
+            int size = t.size();
+            for (int j = 0; j < size; ++j) {
+                out.append(t.get(j));
+            }
+        }
+
+        if (lineageTracer != null) {
+            ExampleTuple tOut = new ExampleTuple();
+            tOut.reference(out);
+            lineageTracer.insert(tOut);
+            for (int i = 0; i < data.length; i++) {
+                lineageTracer.union(tOut, data[i]);
+            }
+            return tOut;
+        }
+        return out;
+    }
+
+    @Override
+    public String name() {
+        return "POCrossLocal" + " - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+}

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POSplit.java?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POSplit.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POSplit.java Fri Jan  8 18:17:07 2010
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen.physicalOperators;
+
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * POSplit is a blocking operator. It reads the data from its input
+ * into a databag and then hands an iterator over that bag to each
+ * POSplitOutput, which does the necessary filtering.
+ */
+public class POSplit extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    DataBag data = null;
+    
+    boolean processingDone = false;
+
+    public POSplit(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+        data = BagFactory.getInstance().newDefaultBag();
+    }
+
+    public POSplit(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    public POSplit(OperatorKey k, List<PhysicalOperator> inp) {
+        this(k, -1, inp);
+    }
+
+    public POSplit(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    public Result getNext(Tuple t) throws ExecException {
+        if (!processingDone) {
+            for (Result input = inputs.get(0).getNext(dummyTuple); input.returnStatus != POStatus.STATUS_EOP; input = inputs.get(0).getNext(dummyTuple)) {
+                if (input.returnStatus == POStatus.STATUS_ERR) {
+                    throw new ExecException("Error accumulating output at local Split operator");
+                }
+
+                if (input.returnStatus != POStatus.STATUS_NULL) {
+                    data.add((Tuple) input.result);
+                }
+            }
+            processingDone = true;
+        }
+
+        Result res = new Result();
+        res.returnStatus = POStatus.STATUS_OK;
+        res.result = data.iterator();
+        return res;
+    }
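+
+    // Downstream use (sketch, assuming the local execution path): each
+    // POSplitOutput pulls the whole bag's iterator in one call,
+    //
+    //   Result r = splitOp.getNext(dummyTuple);
+    //   Iterator<Tuple> it = (Iterator<Tuple>) r.result;
+    //
+    // and filters the tuples against its own condition plan.
+    // (splitOp is an illustrative variable name, not from this patch.)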
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitSplit(this);
+    }
+
+    @Override
+    public String name() {
+        return "Split - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return true;
+    }
+
+}


