pig-commits mailing list archives

From o...@apache.org
Subject svn commit: r771437 [1/2] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengin...
Date Mon, 04 May 2009 20:50:32 GMT
Author: olga
Date: Mon May  4 20:50:31 2009
New Revision: 771437

URL: http://svn.apache.org/viewvc?rev=771437&view=rev
Log:
multiquery support, phase 3, final checkin

Added:
    hadoop/pig/trunk/src/org/apache/pig/StoreConfig.java
      - copied unchanged from r770826, hadoop/pig/branches/multiquery/src/org/apache/pig/StoreConfig.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
      - copied unchanged from r770826, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
      - copied unchanged from r770826, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/
      - copied from r770826, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/util/
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
      - copied unchanged from r770826, hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
Modified:
    hadoop/pig/trunk/   (props changed)
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/PigDump.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLocal.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
    hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java

Propchange: hadoop/pig/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon May  4 20:50:31 2009
@@ -1 +1 @@
-/hadoop/pig/branches/multiquery:741727-767731
+/hadoop/pig/branches/multiquery:741727-770826

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon May  4 20:50:31 2009
@@ -606,3 +606,7 @@
     PIG-284: target for building source jar (oae via olgan)
 
     PIG-627: multiquery support phase 1 and phase 2 (hagleitn and Richard Ding via pradeepkth)
+
+    PIG-652: Adapt changes in store interface to multi-query changes (hagleitn via gates).
+
+    PIG-627: multiquery support phase 3 (hagleitn and Richard Ding via olgan)

Modified: hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/StoreFunc.java Mon May  4 20:50:31 2009
@@ -58,7 +58,24 @@
      * 
      * @throws IOException
      */
-    public abstract void finish() throws IOException;
+    public abstract void finish() throws IOException;
+    
+    /**
+     * Specify a backend-specific class to use to prepare for
+     * storing output.  In the Hadoop case, this can return an
+     * OutputFormat that will be used instead of PigOutputFormat.  The
+     * framework will call this function and, if a Class is returned
+     * that implements OutputFormat, it will be used. For more details on how
+     * the OutputFormat should interact with Pig, see 
+     * {@link org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat#getRecordWriter(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.mapred.JobConf, String, org.apache.hadoop.util.Progressable)}
+     * @return Backend-specific class used to prepare for storing output.
+     * If the {@link StoreFunc} implementation does not have a class to prepare
+     * for storing output, it can return null and a default Pig implementation
+     * will be used to prepare for storing output.
+     * @throws IOException if the class does not implement the expected
+     * interface(s).
+     */
+    public Class getStorePreparationClass() throws IOException;
 
     
 }

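For illustration, here is a minimal sketch of a StoreFunc that opts into the new
hook (MyStoreFunc and MyOutputFormat are hypothetical names, not part of this
commit; MyOutputFormat is assumed to implement org.apache.hadoop.mapred.OutputFormat,
and a sketch of such an OutputFormat appears after the PigOutputFormat diff below):

    import java.io.IOException;

    import org.apache.pig.builtin.PigStorage;

    public class MyStoreFunc extends PigStorage {
        // Returning a non-null OutputFormat class asks the framework to use
        // it in place of the default PigOutputFormat.
        @Override
        public Class getStorePreparationClass() throws IOException {
            return MyOutputFormat.class;
        }
    }

A StoreFunc compiled against the old interface simply lacks this method; as the
JobControlCompiler change below shows, the resulting AbstractMethodError is
caught and Pig falls back to PigOutputFormat.
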
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon May  4 20:50:31 2009
@@ -36,12 +36,15 @@
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
+import org.apache.pig.StoreConfig;
+import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
@@ -100,6 +103,8 @@
     PigContext pigContext;
     
     private final Log log = LogFactory.getLog(getClass());
+    
+    public static final String PIG_STORE_CONFIG = "pig.store.config";
 
     public static final String LOG_DIR = "_logs";
 
@@ -310,7 +315,6 @@
                                   "pig.streaming.cache.files", false);
 
             jobConf.setInputFormat(PigInputFormat.class);
-            jobConf.setOutputFormat(PigOutputFormat.class);
             
             //Process POStore and remove it from the plan
             List<POStore> mapStores = PlanHelper.getStores(mro.mapPlan);
@@ -328,11 +332,37 @@
                     st = reduceStores.remove(0);
                     mro.reducePlan.remove(st);
                 }
+
+                // If the StoreFunc associated with the POStore implements
+                // getStorePreparationClass() and returns a non-null value,
+                // then it may want to supply its own OutputFormat for writing out to hadoop.
+                // Check if this is the case; if so, use the OutputFormat class the
+                // StoreFunc gives us, else use our default PigOutputFormat
+                Object storeFunc = PigContext.instantiateFuncFromSpec(st.getSFile().getFuncSpec());
+                Class sPrepClass = null;
+                try {
+                    sPrepClass = ((StoreFunc)storeFunc).getStorePreparationClass();
+                } catch(AbstractMethodError e) {
+                    // this is for backward compatibility wherein some old StoreFunc
+                    // which does not implement getStorePreparationClass() is being
+                    // used. In this case, we want to just use PigOutputFormat
+                    sPrepClass = null;
+                }
+                if(sPrepClass != null && OutputFormat.class.isAssignableFrom(sPrepClass)) {
+                    jobConf.setOutputFormat(sPrepClass);
+                } else {
+                    jobConf.setOutputFormat(PigOutputFormat.class);
+                }
+                
+                //set out filespecs
                 String outputPath = st.getSFile().getFileName();
                 FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
                 FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
+             
                 jobConf.set("pig.storeFunc", outputFuncSpec.toString());
-                
+                jobConf.set(PIG_STORE_CONFIG, 
+                            ObjectSerializer.serialize(new StoreConfig(outputPath, st.getSchema())));
+
                 jobConf.set("pig.streaming.log.dir", 
                             new Path(outputPath, LOG_DIR).toString());
                 jobConf.set("pig.streaming.task.output.dir", outputPath);
@@ -349,6 +379,7 @@
                     fs.mkdirs(tmpOut);
                 }
 
+                jobConf.setOutputFormat(PigOutputFormat.class);
                 FileOutputFormat.setOutputPath(jobConf, curTmpPath);
 
                 jobConf.set("pig.streaming.log.dir", 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon May  4 20:50:31 2009
@@ -238,6 +238,8 @@
         NoopFilterRemover fRem = new NoopFilterRemover(plan);
         fRem.visit();
         
+        // reduces the number of MROpers in the MR plan generated 
+        // by a multi-query (multi-store) script.
         MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan);
         mqOptimizer.visit();
 
@@ -246,7 +248,7 @@
         // NoopFilterRemover.
         NoopStoreRemover sRem = new NoopStoreRemover(plan);
         sRem.visit();
-
+      
         // check whether stream operator is present
         // after MultiQueryOptimizer because it can shift streams from
         // map to reduce, etc.

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Mon May  4 20:50:31 2009
@@ -36,10 +36,13 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
+import org.apache.pig.StoreConfig;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.ObjectSerializer;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
@@ -65,6 +68,7 @@
     private JobConf job;
 
     private final Log log = LogFactory.getLog(getClass());
+    public static final String PIG_STORE_CONFIG = "pig.store.config";
     
     public MapReducePOStoreImpl(JobConf job) {
         this.job = job;
@@ -75,14 +79,33 @@
     }
 
     @Override
-    public StoreFunc createStoreFunc(FileSpec sFile) throws IOException {
+    public StoreFunc createStoreFunc(FileSpec sFile, Schema schema) 
+        throws IOException {
 
         // set up a new job conf
         JobConf outputConf = new JobConf(job);
         String tmpPath = PlanHelper.makeStoreTmpPath(sFile.getFileName());
 
-        // Right now we're always using PigOutputFormat.
-        outputConf.setOutputFormat(PigOutputFormat.class);
+        // If the StoreFunc associated with the POStore implements
+        // getStorePreparationClass() and returns a non-null value,
+        // then it may want to supply its own OutputFormat for writing out to hadoop.
+        // Check if this is the case; if so, use the OutputFormat class the
+        // StoreFunc gives us, else use our default PigOutputFormat
+        Object storeFunc = PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
+        Class sPrepClass = null;
+        try {
+            sPrepClass = ((StoreFunc)storeFunc).getStorePreparationClass();
+        } catch(AbstractMethodError e) {
+            // this is for backward compatibility wherein some old StoreFunc
+            // which does not implement getStorePreparationClass() is being
+            // used. In this case, we want to just use PigOutputFormat
+            sPrepClass = null;
+        }
+        if(sPrepClass != null && OutputFormat.class.isAssignableFrom(sPrepClass)) {
+            outputConf.setOutputFormat(sPrepClass);
+        } else {
+            outputConf.setOutputFormat(PigOutputFormat.class);
+        }
 
         // PigOutputFormat will look for pig.storeFunc to actually
         // write stuff out.
@@ -94,6 +117,10 @@
         Path outputDir = new Path(sFile.getFileName()).makeQualified(FileSystem.get(outputConf));
         outputConf.set("mapred.output.dir", outputDir.toString());
 
+        // Set the schema
+        outputConf.set(PIG_STORE_CONFIG, 
+                       ObjectSerializer.serialize(new StoreConfig(outputDir.toString(), schema)));
+
         // The workpath is set to a unique-per-store subdirectory of
         // the current working directory.
         String workPath = outputConf.get("mapred.work.output.dir");
@@ -168,5 +195,10 @@
         @Override
         public void finish() throws IOException {
         }
+
+        @Override
+        public Class getStorePreparationClass() throws IOException {
+            return null;
+        }
     }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Mon May  4 20:50:31 2009
@@ -19,13 +19,19 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.data.DataType;
@@ -34,24 +40,28 @@
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
 import org.apache.pig.PigException;
 
+
 /** 
  * An optimizer that merges all or part of the splittee MapReduceOpers into 
- * splitter MapReduceOper. The merge can produce a MROperPlan that has 
+ * splitter MapReduceOper. 
+ * <p>
+ * The merge can produce a MROperPlan that has 
  * fewer MapReduceOpers than MapReduceOpers in the original MROperPlan.
- * 
+ * <p>
  * The MRCompiler generates multiple MapReduceOpers whenever it encounters 
  * a split operator and connects the single splitter MapReduceOper to 
  * one or more splittee MapReduceOpers using store/load operators:  
- *  
+ * <p>
  *     ---- POStore (in splitter) -... ----
  *     |        |    ...    |
  *     |        |    ...    |
  *    POLoad  POLoad ...  POLoad (in splittees)
  *      |        |           |
- *      
+ * <p>   
  *  This optimizer merges those MapReduceOpers by replacing POLoad/POStore 
  *  combination with a POSplit operator.
  */
@@ -68,15 +78,24 @@
         nig = NodeIdGenerator.getGenerator();
         List<MapReduceOper> roots = plan.getRoots();
         scope = roots.get(0).getOperatorKey().getScope();
+        
+        log.info("MR plan size before optimization: " + plan.size());
     }
 
     @Override
+    public void visit() throws VisitorException {
+        super.visit();
+        
+        log.info("MR plan size after optimization: " + mPlan.size());
+    }
+    
+    @Override
     public void visitMROp(MapReduceOper mr) throws VisitorException {
-       
+        
         if (!mr.isSplitter()) {
             return;
-        }
-       
+        }       
+        
         // first classify all the splittees
         List<MapReduceOper> mappers = new ArrayList<MapReduceOper>();
         List<MapReduceOper> multiLoadMROpers = new ArrayList<MapReduceOper>();
@@ -86,23 +105,26 @@
         for (MapReduceOper successor : successors) {
             if (isMapOnly(successor)) {
                 if (isSingleLoadMapperPlan(successor.mapPlan)) {                    
-                    mappers.add(successor);
+                    mappers.add(successor);                
                 } else {                    
                     multiLoadMROpers.add(successor);
                 }
             } else {
                 if (isSingleLoadMapperPlan(successor.mapPlan)) {                     
-                    mapReducers.add(successor);
+                    mapReducers.add(successor);                  
                 } else {                    
-                    multiLoadMROpers.add(successor);
+                    multiLoadMROpers.add(successor);                      
                 }
             }                
         }
-              
+                      
         // case 1: exactly one splittee and it's map-only
         if (mappers.size() == 1 && mapReducers.size() == 0 
                 && multiLoadMROpers.size() == 0 ) {            
             mergeOnlyMapperSplittee(mappers.get(0), mr);
+            
+            log.info("Merged the only map-only splittee.");
+            
             return;              
         }
         
@@ -110,9 +132,15 @@
         if (isMapOnly(mr) && mapReducers.size() == 1 
                 && mappers.size() == 0 && multiLoadMROpers.size() == 0) {            
             mergeOnlyMapReduceSplittee(mapReducers.get(0), mr);
+            
+            log.info("Merged the only map-reduce splittee.");
+            
             return;
         } 
         
+        int numSplittees = successors.size();
+        int numMerges = 0;
+        
         PhysicalPlan splitterPl = isMapOnly(mr) ? mr.mapPlan : mr.reducePlan;                            
         POStore storeOp = (POStore)splitterPl.getLeaves().get(0); 
         
@@ -120,38 +148,32 @@
         
         // case 3: multiple splittees and at least one of them is map-only
         if (mappers.size() > 0) {                
-            splitOp = getSplit();                
-            mergeAllMapOnlySplittees(mappers, mr, splitOp);
+            splitOp = getSplit();  
+            int n = mergeAllMapOnlySplittees(mappers, mr, splitOp);            
+            
+            log.info("Merged " + n + " map-only splittees.");
+            
+            numMerges += n;   
         }            
-               
-        boolean splitterMapOnly = isMapOnly(mr);
-        
-        // case 4: multiple splittees and at least one of them has reducer  
-        if (splitterMapOnly && mapReducers.size() > 0) {
-           
-            // pick one to merge, prefer one that has a combiner
-            MapReduceOper mapReducer= mapReducers.get(0);
-            for (MapReduceOper mro : mapReducers) {
-                if (!mro.combinePlan.isEmpty()) {
-                    mapReducer = mro;
-                    break;
-                }
-            }
               
+        // case 4: multiple splittees and at least one of them has reducer  
+        if (isMapOnly(mr) && mapReducers.size() > 0) {
+                         
             PhysicalOperator leaf = splitterPl.getLeaves().get(0);
                                                             
-            splitOp = (leaf instanceof POStore) ? 
-                    getSplit() : (POSplit)leaf;
+            splitOp = (leaf instanceof POStore) ? getSplit() : (POSplit)leaf;
                     
-            mergeSingleMapReduceSplittee(mapReducer, mr, splitOp);                
+            int n = mergeMapReduceSplittees(mapReducers, mr, splitOp);  
+            
+            log.info("Merged " + n + " map-reduce splittees.");
+            
+            numMerges += n;      
         }
-        
+       
         // finally, add original store to the split operator 
         // if there is splittee that hasn't been merged
         if (splitOp != null 
-                && ((multiLoadMROpers.size() > 0)
-                        || (mapReducers.size() > 1) 
-                        || (!splitterMapOnly && mapReducers.size() > 0))) {
+                && (numMerges < numSplittees)) {
 
             PhysicalPlan storePlan = new PhysicalPlan();
             try {
@@ -161,13 +183,17 @@
                 int errCode = 2129;
                 String msg = "Internal Error. Unable to add store to the split plan for optimization.";
                 throw new OptimizerException(msg, errCode, PigException.BUG, e);
-            }                               
+            }    
         }
+        
+        log.info("Merged " + numMerges + " out of total " 
+                + numSplittees + " splittees.");
     }                
-   
+    
     private void mergeOneMapPart(MapReduceOper mapper, MapReduceOper splitter)
     throws VisitorException {
-        PhysicalPlan splitterPl = isMapOnly(splitter) ? splitter.mapPlan : splitter.reducePlan;
+        PhysicalPlan splitterPl = isMapOnly(splitter) ? 
+                splitter.mapPlan : splitter.reducePlan;
         POStore storeOp = (POStore)splitterPl.getLeaves().get(0);
         List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp);           
         
@@ -203,14 +229,14 @@
         }
     }
 
-    private void mergeOnlyMapperSplittee(MapReduceOper mapper, MapReduceOper splitter) 
-    throws VisitorException {
+    private void mergeOnlyMapperSplittee(MapReduceOper mapper, 
+            MapReduceOper splitter) throws VisitorException {
         mergeOneMapPart(mapper, splitter);       
         removeAndReconnect(mapper, splitter);
     }
     
-    private void mergeOnlyMapReduceSplittee(MapReduceOper mapReducer, MapReduceOper splitter)
-    throws VisitorException {
+    private void mergeOnlyMapReduceSplittee(MapReduceOper mapReducer, 
+            MapReduceOper splitter) throws VisitorException {
         mergeOneMapPart(mapReducer, splitter);
 
         splitter.setMapDone(true);
@@ -220,7 +246,7 @@
         removeAndReconnect(mapReducer, splitter);          
     }
     
-    private void mergeAllMapOnlySplittees(List<MapReduceOper> mappers, 
+    private int mergeAllMapOnlySplittees(List<MapReduceOper> mappers, 
             MapReduceOper splitter, POSplit splitOp) throws VisitorException {
         
         PhysicalPlan splitterPl = isMapOnly(splitter) ? 
@@ -229,22 +255,16 @@
         List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp);  
 
         // merge splittee's map plans into nested plan of 
-        // the splitter operator
+        // the split operator
         for (MapReduceOper mapper : mappers) {                                
             PhysicalPlan pl = mapper.mapPlan;
             PhysicalOperator load = pl.getRoots().get(0);
-            pl.remove(load);
-            try {
-                splitOp.addPlan(pl);
-            } catch (PlanException e) {
-                int errCode = 2130;
-                String msg = "Internal Error. Unable to merge split plans for optimization.";
-                throw new OptimizerException(msg, errCode, PigException.BUG, e);
-            }
+            pl.remove(load);                   
+            splitOp.addPlan(pl);
         }
                            
         // replace store operator in the splitter with split operator
-        splitOp.setInputs(storePreds);
+        splitOp.setInputs(storePreds);    
         try {
             splitterPl.replace(storeOp, splitOp);
         } catch (PlanException e) {
@@ -257,6 +277,406 @@
         for (MapReduceOper mapper : mappers) {
             removeAndReconnect(mapper, splitter);                  
         }
+        
+        return mappers.size();
+    }
+    
+    private boolean isSplitteeMergeable(MapReduceOper splittee) {
+        
+        // cannot be global sort or limit after sort, they are 
+        // using a different partitioner
+        if (splittee.isGlobalSort() || splittee.isLimitAfterSort()) {
+            log.info("Cannot merge this splittee: " +
+            		"it is global sort or limit after sort");
+            return false;
+        }
+        
+        // check the plan leaf: only merge local rearrange or split
+        PhysicalOperator leaf = splittee.mapPlan.getLeaves().get(0);
+        if (!(leaf instanceof POLocalRearrange) && 
+                ! (leaf instanceof POSplit)) {
+            log.info("Cannot merge this splittee: " +
+            		"its map plan doesn't end with LR or Split operator: " 
+                    + leaf.getClass().getName());
+            return false;
+        }
+           
+        // cannot have distinct combiner, it uses a different combiner
+        if (splittee.needsDistinctCombiner()) {
+            log.info("Cannot merge this splittee: " +
+            		"it has distinct combiner.");
+            return false;           
+        }
+        
+        return true;
+    }
+       
+    private List<MapReduceOper> getMergeList(List<MapReduceOper> mapReducers) {
+        List<MapReduceOper> mergeNoCmbList = new ArrayList<MapReduceOper>();
+        List<MapReduceOper> mergeCmbList = new ArrayList<MapReduceOper>();
+
+        for (MapReduceOper mrOp : mapReducers) {
+            if (isSplitteeMergeable(mrOp)) {
+                if (mrOp.combinePlan.isEmpty()) {
+                    mergeNoCmbList.add(mrOp);
+                } else {
+                    mergeCmbList.add(mrOp);
+                } 
+            }           
+        }     
+        return (mergeNoCmbList.size() > mergeCmbList.size()) ?
+                mergeNoCmbList : mergeCmbList;
+    }
+    
+    private int mergeMapReduceSplittees(List<MapReduceOper> mapReducers, 
+            MapReduceOper splitter, POSplit splitOp) throws VisitorException {
+                
+        List<MapReduceOper> mergeList = getMergeList(mapReducers);
+    
+        if (mergeList.size() <= 1) {
+
+            // choose one to merge, prefer the one with a combiner
+            MapReduceOper mapReducer = mapReducers.get(0);
+            for (MapReduceOper mro : mapReducers) {
+                if (!mro.combinePlan.isEmpty()) {
+                    mapReducer = mro;
+                    break;
+                }
+            }
+            mergeList.clear();
+            mergeList.add(mapReducer);
+        } 
+                         
+        if (mergeList.size() == 1) {
+            mergeSingleMapReduceSplittee(mergeList.get(0), splitter, splitOp);
+        } else {                                   
+            mergeAllMapReduceSplittees(mergeList, splitter, splitOp);
+        }
+        
+        return mergeList.size();
+    }
+    
+    private boolean hasSameMapKeyType(List<MapReduceOper> splittees) {
+        boolean sameKeyType = true;
+        for (MapReduceOper outer : splittees) {
+            for (MapReduceOper inner : splittees) {
+                if (inner.mapKeyType != outer.mapKeyType) {
+                    sameKeyType = false;
+                    break;
+                }
+            }
+            if (!sameKeyType) break;
+        }      
+ 
+        return sameKeyType;
+    }
+    
+    private int setIndexOnLRInSplit(int initial, POSplit splitOp)
+            throws VisitorException {
+        int index = initial;
+        
+        List<PhysicalPlan> pls = splitOp.getPlans();
+        for (PhysicalPlan pl : pls) {
+            PhysicalOperator leaf = pl.getLeaves().get(0);
+            if (leaf instanceof POLocalRearrange) {
+                POLocalRearrange lr = (POLocalRearrange)leaf;
+                try {
+                    lr.setMultiQueryIndex(index++); 
+                } catch (ExecException e) {                    
+                    int errCode = 2136;
+                    String msg = "Internal Error. Unable to set multi-query index for optimization.";
+                    throw new OptimizerException(msg, errCode, PigException.BUG, e);                   
+                }
+            } else if (leaf instanceof POSplit) {
+                POSplit spl = (POSplit)leaf;
+                index = setIndexOnLRInSplit(index, spl);
+            }
+        }
+
+        return index;
+    }
+    
+    private int mergeOneMapPlanWithIndex(PhysicalPlan pl, POSplit splitOp, 
+            int index, boolean sameKeyType) throws VisitorException {        
+        PhysicalOperator load = pl.getRoots().get(0);
+        pl.remove(load);
+                
+        int curIndex = index;
+        
+        PhysicalOperator leaf = pl.getLeaves().get(0);
+        if (leaf instanceof POLocalRearrange) {
+            POLocalRearrange lr = (POLocalRearrange)leaf;
+            try {
+                lr.setMultiQueryIndex(curIndex++);  
+            } catch (ExecException e) {                                      
+                int errCode = 2136;
+                String msg = "Internal Error. Unable to set multi-query index for optimization.";
+                throw new OptimizerException(msg, errCode, PigException.BUG, e);
+            }
+            
+            // change the map key type to tuple when 
+            // multiple splittees have different map key types
+            if (!sameKeyType) {
+                lr.setKeyType(DataType.TUPLE);
+            }
+        } else if (leaf instanceof POSplit) {
+            POSplit spl = (POSplit)leaf;
+            curIndex = setIndexOnLRInSplit(index, spl);
+        }
+                    
+        splitOp.addPlan(pl);
+               
+        return curIndex;
+    }
+    
+    private int setBaseIndexOnDemux(int initial, PODemux demuxOp) 
+            throws VisitorException {
+        int index = initial;
+        demuxOp.setBaseIndex(index++);
+
+        List<PhysicalPlan> pls = demuxOp.getPlans();
+        for (PhysicalPlan pl : pls) {
+            PhysicalOperator leaf = pl.getLeaves().get(0);
+            if (leaf instanceof POLocalRearrange) {
+                POLocalRearrange lr = (POLocalRearrange)leaf;
+                try {                    
+                    lr.setMultiQueryIndex(initial + lr.getIndex());                   
+                } catch (ExecException e) {                   
+                    int errCode = 2136;
+                    String msg = "Internal Error. Unable to set multi-query index for optimization.";
+                    throw new OptimizerException(msg, errCode, PigException.BUG, e);                         
+                }   
+            }
+            PhysicalOperator root = pl.getRoots().get(0);
+            if (root instanceof PODemux) {                
+                index = setBaseIndexOnDemux(index, (PODemux)root);
+            } else {
+                index++;
+            }
+        }
+        return index;
+    }
+    
+    private int setBaseIndexOnPackage(int initial, POMultiQueryPackage pkgOp) {
+        int index = initial;
+        pkgOp.setBaseIndex(index++);
+        
+        List<POPackage> pkgs = pkgOp.getPackages();
+        for (POPackage pkg : pkgs) {            
+            if (pkg instanceof POMultiQueryPackage) {
+                POMultiQueryPackage mpkg = (POMultiQueryPackage)pkg;
+                index = setBaseIndexOnPackage(index, mpkg);
+            } else {
+                index++;
+            }
+        }
+        return index;
+    }
+    
+    private void mergeOneReducePlanWithIndex(PhysicalPlan from, 
+            PhysicalPlan to, int initial, int current) throws VisitorException {                    
+        POPackage pk = (POPackage)from.getRoots().get(0);
+        from.remove(pk);
+ 
+        // XXX the index of the original keyInfo map is always 0,
+        // we need to shift the index so that the lookups work
+        // with the new indexed key       
+        Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pk.getKeyInfo();
+        if (keyInfo != null && keyInfo.size() > 0) {      
+            byte b = (byte)(initial | 0x80);
+            keyInfo.put(new Integer(b), keyInfo.get(0));
+        }     
+        
+        if (pk instanceof POMultiQueryPackage) {
+            POMultiQueryPackage mpkg = (POMultiQueryPackage)pk;
+            setBaseIndexOnPackage(initial, mpkg);
+        }
+                                
+        PhysicalOperator root = from.getRoots().get(0);
+        if (root instanceof PODemux) {
+            PODemux demux = (PODemux)root;
+            setBaseIndexOnDemux(initial, demux);
+        }
+                    
+        POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);        
+        for (int i=initial; i<current; i++) {
+            pkg.addPackage(pk);
+        }
+        
+        PODemux demux = (PODemux)to.getLeaves().get(0);
+        for (int i=initial; i<current; i++) {
+            demux.addPlan(from);
+        }
+               
+        if (demux.isSameMapKeyType()) {
+            pkg.setKeyType(pk.getKeyType());
+        } else {
+            pkg.setKeyType(DataType.TUPLE);
+        }                
+    }
+    
+    private void mergeOneCombinePlanWithIndex(PhysicalPlan from,
+            PhysicalPlan to, int initial, int current) throws VisitorException {
+        POPackage cpk = (POPackage)from.getRoots().get(0);
+        from.remove(cpk);
+       
+        if (cpk instanceof POMultiQueryPackage) {
+            POMultiQueryPackage mpkg = (POMultiQueryPackage)cpk;
+            setBaseIndexOnPackage(initial, mpkg);
+        }
+        
+        PODemux demux = (PODemux)to.getLeaves().get(0);
+        
+        boolean isSameKeyType = demux.isSameMapKeyType();
+        
+        PhysicalOperator leaf = from.getLeaves().get(0);
+        if (leaf instanceof POLocalRearrange) {
+            POLocalRearrange clr = (POLocalRearrange)leaf;
+            try {
+                clr.setMultiQueryIndex(initial);            
+            } catch (ExecException e) {                                        
+                int errCode = 2136;
+                String msg = "Internal Error. Unable to set multi-query index for optimization.";
+                throw new OptimizerException(msg, errCode, PigException.BUG, e);
+            }
+            
+            // change the map key type to tuple when 
+            // multiple splittees have different map key types
+            if (!isSameKeyType) {
+                clr.setKeyType(DataType.TUPLE);
+            }
+        } else if (leaf instanceof PODemux) {
+            PODemux locDemux = (PODemux)leaf;
+            setBaseIndexOnDemux(initial, locDemux);
+        } 
+       
+        POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);     
+        for (int i=initial; i<current; i++) {
+            pkg.addPackage(cpk);
+        }
+        
+        // all packages should have the same key type
+        if (!isSameKeyType) {
+            cpk.setKeyType(DataType.TUPLE);          
+        } 
+        
+        pkg.setKeyType(cpk.getKeyType());
+                
+        for (int i=initial; i<current; i++) {
+            demux.addPlan(from);
+        }
+    }
+    
+    private boolean needCombiner(List<MapReduceOper> mapReducers) {
+        boolean needCombiner = false;
+        for (MapReduceOper mrOp : mapReducers) {
+            if (!mrOp.combinePlan.isEmpty()) {
+                needCombiner = true;
+                break;
+            }
+        }
+        return needCombiner;
+    }
+       
+    private PhysicalPlan createDemuxPlan(boolean sameKeyType, boolean isCombiner) 
+        throws VisitorException {
+        PODemux demux = getDemux(sameKeyType, isCombiner);
+        POMultiQueryPackage pkg= getMultiQueryPackage();
+        
+        PhysicalPlan pl = new PhysicalPlan();
+        pl.add(pkg);
+        try {
+            pl.addAsLeaf(demux);
+        } catch (PlanException e) {                   
+            int errCode = 2137;
+            String msg = "Internal Error. Unable to add demux to the plan as leaf for optimization.";
+            throw new OptimizerException(msg, errCode, PigException.BUG, e);
+        }
+        return pl;
+    }
+
+    private void mergeAllMapReduceSplittees(List<MapReduceOper> mergeList, 
+            MapReduceOper splitter, POSplit splitOp) throws VisitorException {
+       
+        boolean sameKeyType = hasSameMapKeyType(mergeList);
+        
+        log.info("Splittees have the same key type: " + sameKeyType);
+        
+        // create a new reduce plan that will be the container
+        // for the multiple reducer plans of the MROpers in the mergeList
+        PhysicalPlan redPl = createDemuxPlan(sameKeyType, false);
+        
+        // create a new combine plan that will be the container
+        // for the multiple combiner plans of the MROpers in the mergeList                
+        PhysicalPlan comPl = needCombiner(mergeList) ? 
+                createDemuxPlan(sameKeyType, true) : null;
+
+        log.info("Splittees have combiner: " + (comPl != null));
+                
+        int index = 0;             
+        
+        for (MapReduceOper mrOp : mergeList) {
+
+            // merge the map plan            
+            int incIndex = mergeOneMapPlanWithIndex(
+                    mrOp.mapPlan, splitOp, index, sameKeyType);
+                       
+            // merge the combiner plan
+            if (comPl != null) {
+                if (!mrOp.combinePlan.isEmpty()) {                    
+                    mergeOneCombinePlanWithIndex(
+                            mrOp.combinePlan, comPl, index, incIndex);
+                } else {         
+                    int errCode = 2141;
+                    String msg = "Internal Error. Cannot merge non-combiner with combiners for optimization.";
+                    throw new OptimizerException(msg, errCode, PigException.BUG);
+                }
+            }
+
+            // merge the reducer plan
+            mergeOneReducePlanWithIndex(
+                    mrOp.reducePlan, redPl, index, incIndex);
+           
+            index = incIndex;
+            
+            log.info("Merged MR job " + mrOp.getOperatorKey().getId() 
+                    + " into MR job " + splitter.getOperatorKey().getId());
+        }
+
+        PhysicalPlan splitterPl = splitter.mapPlan;
+        PhysicalOperator leaf = splitterPl.getLeaves().get(0);
+        PhysicalOperator storeOp = splitterPl.getLeaves().get(0);
+        List<PhysicalOperator> storePreds = splitterPl.getPredecessors(storeOp);  
+ 
+        // replace store operator in the splitter with split operator
+        if (leaf instanceof POStore) {                            
+            splitOp.setInputs(storePreds);
+            try {
+                splitterPl.replace(storeOp, splitOp);
+            } catch (PlanException e) {                   
+                int errCode = 2132;
+                String msg = "Internal Error. Unable to replace store with split operator for optimization.";
+                throw new OptimizerException(msg, errCode, PigException.BUG, e);
+            }
+        }     
+        
+        splitter.setMapDone(true);
+        splitter.reducePlan = redPl;
+        splitter.setReduceDone(true);
+           
+        if (comPl != null) {
+            splitter.combinePlan = comPl;        
+        }
+        
+        for (MapReduceOper mrOp : mergeList) {
+            removeAndReconnect(mrOp, splitter);
+        }
+       
+        splitter.mapKeyType = sameKeyType ?
+                mergeList.get(0).mapKeyType : DataType.TUPLE;         
+                
+        log.info("Requested parallelism of splitter: " 
+                + splitter.getRequestedParallelism());               
     }
     
     private void mergeSingleMapReduceSplittee(MapReduceOper mapReduce, 
@@ -270,13 +690,8 @@
         PhysicalPlan pl = mapReduce.mapPlan;
         PhysicalOperator load = pl.getRoots().get(0);
         pl.remove(load);
-        try {
-            splitOp.addPlan(pl);
-        } catch (PlanException e) {
-            int errCode = 2130;
-            String msg = "Internal Error. Unable to merge split plans for optimization.";
-            throw new OptimizerException(msg, errCode, PigException.BUG, e);
-        }
+        
+        splitOp.addPlan(pl);
                               
         splitter.setMapDone(true);
         splitter.reducePlan = mapReduce.reducePlan;
@@ -301,6 +716,7 @@
     /**
      * Removes the specified MR operator from the plan after the merge. 
      * Connects its predecessors and successors to the merged MR operator
+     * 
      * @param mr the MR operator to remove
      * @param newMR the MR operator to be connected to the predecessors and 
      *              the successors of the removed operator
@@ -385,12 +801,21 @@
     }
     
     private boolean isSingleLoadMapperPlan(PhysicalPlan pl) {
-        List<PhysicalOperator> roots = pl.getRoots();
-        return (roots.size() == 1);
+        return (pl.getRoots().size() == 1);
     }
     
     private POSplit getSplit(){
-        POSplit sp = new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope)));
-        return sp;
+        return new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope)));
     } 
+    
+    private PODemux getDemux(boolean sameMapKeyType, boolean inCombiner){
+        PODemux demux = new PODemux(new OperatorKey(scope, nig.getNextNodeId(scope)));
+        demux.setSameMapKeyType(sameMapKeyType);
+        demux.setInCombiner(inCombiner);
+        return demux;
+    } 
+    
+    private POMultiQueryPackage getMultiQueryPackage(){
+        return new POMultiQueryPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
+    }  
 }

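To make the case analysis in visitMROp concrete, consider a hypothetical script
that loads one file and stores four derived relations: two produced by map-only
branches (say, filters) and two by branches that each need a reduce (say,
groups). The splittees classify as mappers={2} and mapReducers={2}. Case 3 folds
the two map-only splittees into a POSplit in the splitter's map plan; case 4
then merges the two map-reduce splittees, indexing their local rearranges via
setMultiQueryIndex and routing records through POMultiQueryPackage/PODemux on
the reduce side. Five MR jobs collapse into one, which is what the new
"Merged ... splittees" log messages report.
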
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Mon May  4 20:50:31 2009
@@ -93,6 +93,12 @@
         super.visitSplit(spl);
         spl.setParentPlan(parent);
     }
+    
+    @Override
+    public void visitDemux(PODemux demux) throws VisitorException{
+        super.visitDemux(demux);
+        demux.setParentPlan(parent);
+    }
 
     @Override
     public void visitDistinct(PODistinct distinct) throws VisitorException {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Mon May  4 20:50:31 2009
@@ -33,6 +33,7 @@
 import org.apache.pig.PigException;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -47,6 +48,24 @@
 public class PigOutputFormat implements OutputFormat<WritableComparable, Tuple> {
     public static final String PIG_OUTPUT_FUNC = "pig.output.func";
 
+    /**
+     * In general, the mechanism for an OutputFormat in Pig to get hold of the storeFunc
+     * and the metadata information (for now, the schema and location provided for the store in
+     * the pig script) is through the following utility static methods:
+     * {@link org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil#getStoreFunc(JobConf)} 
+     * - this will get the {@link org.apache.pig.StoreFunc} reference to use in the RecordWriter.write()
+     * {@link MapRedUtil#getStoreConfig(JobConf)} - this will get the {@link org.apache.pig.StoreConfig}
+     * reference which has metadata like the location (the string supplied with store statement in the script)
+     * and the {@link org.apache.pig.impl.logicalLayer.schema.Schema} of the data. The OutputFormat
+     * should NOT use the location in the StoreConfig to write the output if the location represents a 
+     * Hadoop dfs path. This is because when "speculative execution" is turned on in Hadoop, multiple
+     * attempts for the same task (for a given partition) may be running at the same time. So using the
+     * location will mean that these different attempts will over-write each other's output.
+     * The OutputFormat should use 
+     * {@link org.apache.hadoop.mapred.FileOutputFormat#getWorkOutputPath(JobConf)}
+     * which will provide a safe output directory into which the OutputFormat should write
+     * the part file (given by the name argument in the getRecordWriter() call).
+     */
     public RecordWriter<WritableComparable, Tuple> getRecordWriter(FileSystem fs, JobConf job,
             String name, Progressable progress) throws IOException {
         Path outputDir = FileOutputFormat.getWorkOutputPath(job);
@@ -56,20 +75,7 @@
     public PigRecordWriter getRecordWriter(FileSystem fs, JobConf job,
             Path outputDir, String name, Progressable progress)
             throws IOException {
-        StoreFunc store;
-        String storeFunc = job.get("pig.storeFunc", "");
-        if (storeFunc.length() == 0) {
-            store = new PigStorage();
-        } else {
-            try {
-                store = (StoreFunc) PigContext
-                        .instantiateFuncFromSpec(storeFunc);
-            } catch (Exception e) {
-                int errCode = 2081;
-                String msg = "Unable to setup the store function.";
-                throw new ExecException(msg, errCode, PigException.BUG, e);
-            }
-        }
+        StoreFunc store = MapRedUtil.getStoreFunc(job);
 
         String parentName = FileOutputFormat.getOutputPath(job).getName();
         int suffixStart = parentName.lastIndexOf('.');

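Following the contract spelled out in the javadoc above, a custom OutputFormat
might look roughly like the sketch below. Only the calls named in that javadoc
(MapRedUtil.getStoreFunc, MapRedUtil.getStoreConfig,
FileOutputFormat.getWorkOutputPath) are assumed, bindTo()/putNext()/finish()
come from the existing StoreFunc interface, and MyOutputFormat itself is a
hypothetical class:

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.OutputFormat;
    import org.apache.hadoop.mapred.RecordWriter;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.util.Progressable;
    import org.apache.pig.StoreConfig;
    import org.apache.pig.StoreFunc;
    import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
    import org.apache.pig.data.Tuple;

    public class MyOutputFormat implements OutputFormat<WritableComparable, Tuple> {

        public RecordWriter<WritableComparable, Tuple> getRecordWriter(
                FileSystem fs, JobConf job, String name, Progressable progress)
                throws IOException {
            // the StoreFunc that knows how to turn tuples into bytes
            final StoreFunc store = MapRedUtil.getStoreFunc(job);
            // location/schema metadata; per the javadoc above, do NOT
            // write to the location itself when it is a dfs path
            StoreConfig config = MapRedUtil.getStoreConfig(job);
            // safe, per-attempt output directory for the part file
            Path workDir = FileOutputFormat.getWorkOutputPath(job);
            final FSDataOutputStream os = fs.create(new Path(workDir, name));
            store.bindTo(os);
            return new RecordWriter<WritableComparable, Tuple>() {
                public void write(WritableComparable key, Tuple value)
                        throws IOException {
                    store.putNext(value);
                }
                public void close(Reporter reporter) throws IOException {
                    store.finish();
                    os.close();
                }
            };
        }

        public void checkOutputSpecs(FileSystem ignored, JobConf job)
                throws IOException {
            // no validation in this sketch
        }
    }
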
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Mon May  4 20:50:31 2009
@@ -46,6 +46,7 @@
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.*;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.DependencyOrderWalkerWOSeenChk;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -1197,6 +1198,20 @@
                 .getNextNodeId(scope)));
         store.setSFile(loStore.getOutputFile());
         store.setInputSpec(loStore.getInputSpec());
+        try {
+            // create a new schema for ourselves so that when
+            // we serialize it we are not serializing objects that
+            // contain the schema - apparently Java serialization
+            // will pull in the object containing the schema if
+            // we serialize a schema reference held inside that
+            // containing object. The schema here will be serialized
+            // in JobControlCompiler
+            store.setSchema(new Schema(loStore.getSchema()));
+        } catch (FrontendException e1) {
+            int errorCode = 1060;
+            String message = "Cannot resolve Store output schema";  
+            throw new VisitorException(message, errorCode, PigException.BUG, e1);    
+        }
         currentPlan.add(store);
         
         List<LogicalOperator> op = loStore.getPlan().getPredecessors(loStore); 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Mon May  4 20:50:31 2009
@@ -80,6 +80,10 @@
     public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException{
         //do nothing
     }
+ 
+    public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException{
+        //do nothing
+    }
     
     public void visitPOForEach(POForEach nfe) throws VisitorException {
         List<PhysicalPlan> inpPlans = nfe.getInputPlans();
@@ -103,6 +107,15 @@
         }
     }
 
+    public void visitDemux(PODemux demux) throws VisitorException{
+        List<PhysicalPlan> plans = demux.getPlans();
+        for (PhysicalPlan plan : plans) {
+            pushWalker(mCurrentWalker.spawnChildWalker(plan));
+            visit();
+            popWalker();
+        }
+    }
+    
 	public void visitDistinct(PODistinct distinct) throws VisitorException {
         //do nothing		
 	}

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Mon May  4 20:50:31 2009
@@ -163,6 +163,15 @@
           else if (node instanceof POSplit) {
               sb.append(planString(((POSplit)node).getPlans()));
           }
+          else if (node instanceof PODemux) {
+              sb.append(planString(((PODemux)node).getPlans()));
+          }
+          else if (node instanceof POMultiQueryPackage) {
+              List<POPackage> pkgs = ((POMultiQueryPackage)node).getPackages();
+              for (POPackage pkg : pkgs) {
+                  sb.append(LSep + pkg.name() + "\n");
+              }
+          }
           else if(node instanceof POFRJoin){
             POFRJoin frj = (POFRJoin)node;
             List<List<PhysicalPlan>> joinPlans = frj.getJoinPlans();

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Mon May  4 20:50:31 2009
@@ -154,18 +154,41 @@
         return index;
     }
 
+    /**
+     * Sets the co-group index of this operator
+     * 
+     * @param index the position of this operator in 
+     * a co-group operation 
+     * @throws ExecException if the index value is bigger than 0x7F
+     */
     public void setIndex(int index) throws ExecException {
+        setIndex(index, false);
+    }
+
+    /**
+     * Sets the multi-query index of this operator
+     * 
+     * @param index the position of the parent plan of this operator
+     * in the enclosing split operator
+     * @throws ExecException if the index value is bigger than 0x7F
+     */
+    public void setMultiQueryIndex(int index) throws ExecException {
+        setIndex(index, true);
+    }
+    
+    private void setIndex(int index, boolean multiQuery) throws ExecException {
         if (index > 0x7F) {
             int errCode = 1082;
-            String msg = "Cogroups with more than 127 inputs "
-                + " not supported.";
+            String msg = multiQuery ?
+                    "Merging more than 127 map-reduce jobs is not supported."
+                  : "Cogroups with more than 127 inputs are not supported.";
             throw new ExecException(msg, errCode, PigException.INPUT);
         } else {
-            this.index = (byte)index;
-        }
+            this.index = multiQuery ? (byte)(index | 0x80) : (byte)index;
+        }            
         lrOutput.set(0, new Byte(this.index));
     }
-
+    
     public boolean isDistinct() { 
         return mIsDistinct;
     }
@@ -255,12 +278,26 @@
     protected Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{
         //Construct key
         Object key;
+        
         if(resLst.size()>1){
             Tuple t = mTupleFactory.newTuple(resLst.size());
             int i=-1;
             for(Result res : resLst)
                 t.set(++i, res.result);
-            key = t;
+            key = t;           
+        } else if (resLst.size() == 1 && keyType == DataType.TUPLE) {
+            
+            // We get here after merging multiple jobs that have different
+            // map key types into a single job during multi-query optimization.
+            // If the key isn't a tuple, it must be wrapped in a tuple.
+            Object obj = resLst.get(0).result;
+            if (obj instanceof Tuple) {
+                key = (Tuple)obj;
+            } else {
+                Tuple t = mTupleFactory.newTuple(1);
+                t.set(0, resLst.get(0).result);
+                key = t;
+            }        
         }
         else{
             key = resLst.get(0).result;
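
The index byte therefore carries two pieces of information: bit 0x80 marks a multi-query index, and the low seven bits hold the position itself, which is why both paths reject values above 0x7F. A standalone sketch of the encoding (method names are illustrative, not part of this commit):

    // Illustrative restatement of the encoding used by setIndex and
    // setMultiQueryIndex: the high bit flags a multi-query index, the
    // low 7 bits hold the position.
    static byte encodeIndex(int index, boolean multiQuery) {
        if (index > 0x7F) {
            throw new IllegalArgumentException("index must fit in 7 bits");
        }
        return multiQuery ? (byte) (index | 0x80) : (byte) index;
    }

    static boolean isMultiQuery(byte encoded) {
        return (encoded & 0x80) != 0;
    }

    static int decodeIndex(byte encoded) {
        return encoded & 0x7F;   // strip the multi-query flag
    }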

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Mon May  4 20:50:31 2009
@@ -206,8 +206,18 @@
             while (tupIter.hasNext()) {
                 NullableTuple ntup = tupIter.next();
                 int index = ntup.getIndex();
-                Tuple copy = getValueTuple(ntup, index);            
-                dbs[index].add(copy);
+                Tuple copy = getValueTuple(ntup, index);  
+                
+                if (numInputs == 1) {
+                    
+                    // this is for multi-query merge where 
+                    // the numInputs is always 1, but the index
+                    // (the position of the inner plan in the 
+                    // enclosing operator) may not be 0.
+                    dbs[0].add(copy);
+                } else {
+                    dbs[index].add(copy);
+                }
                 if(reporter!=null) reporter.progress();
             }
             
@@ -240,21 +250,15 @@
      // Need to make a copy of the value, as hadoop uses the same ntup
         // to represent each value.
         Tuple val = (Tuple)ntup.getValueAsPigType();
-        /*
-        Tuple copy = mTupleFactory.newTuple(val.size());
-        for (int i = 0; i < val.size(); i++) {
-            copy.set(i, val.get(i));
-        }
-        */
         
         Tuple copy = null;
         // The "value (val)" that we just got may not
         // be the complete "value". It may have some portions
         // in the "key" (look in POLocalRearrange for more comments)
         // If this is the case we need to stitch
-        // the "value" together.
-        Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
-            keyInfo.get(index);
+        // the "value" together.        
+        Pair<Boolean, Map<Integer, Integer>> lrKeyInfo = 
+            keyInfo.get(index);           
         boolean isProjectStar = lrKeyInfo.first;
         Map<Integer, Integer> keyLookup = lrKeyInfo.second;
         int keyLookupSize = keyLookup.size();
@@ -364,5 +368,4 @@
         this.distinct = distinct;
     }
 
-
 }
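
In short: after the multi-query merge a package operator has a single logical input, and the index a tuple carries identifies the originating sub-plan of the merged job (consumed by PODemux) rather than a co-group input position, so it can no longer be used to pick a bag. A compressed restatement of the routing decision above:

    // dbs is the per-input bag array; with one input everything goes to
    // slot 0 regardless of the sub-plan index the tuple still carries.
    int slot = (numInputs == 1) ? 0 : index;
    dbs[slot].add(copy);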

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java Mon May  4 20:50:31 2009
@@ -32,11 +32,11 @@
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
  * The MapReduce Split operator.
+ * <p>
  * The assumption here is that
  * the logical to physical translation
  * will create this dummy operator with
@@ -56,7 +56,7 @@
  * This is different than the existing implementation
  * where the POSplit writes to sidefiles after filtering
  * and then loads the appropriate file.
- * 
+ * <p>
  * The approach followed here is as good as the old
  * approach if not better in many cases because
  * of the availability of attachInput. An optimization
@@ -185,7 +185,7 @@
      * the nested input plan list
      * @param inPlan plan to be appended to the list
      */
-    public void addPlan(PhysicalPlan inPlan) throws PlanException {        
+    public void addPlan(PhysicalPlan inPlan) {        
         myPlans.add(inPlan);
         processedSet.set(myPlans.size()-1);
     }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Mon May  4 20:50:31 2009
@@ -30,6 +30,7 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -53,6 +54,7 @@
     private final Log log = LogFactory.getLog(getClass());
     private POStoreImpl impl;
     private FileSpec sFile;
+    private Schema schema;
 
     // flag to distinguish user stores from MRCompiler stores.
     private boolean isTmpStore;
@@ -81,7 +83,7 @@
     public void setUp() throws IOException{
         if (impl != null) {
             try{
-                storer = impl.createStoreFunc(sFile);
+                storer = impl.createStoreFunc(sFile, schema);
             }catch (IOException ioe) {
                 int errCode = 2081;
                 String msg = "Unable to setup the store function.";            
@@ -184,4 +186,12 @@
     public void setStoreImpl(POStoreImpl impl) {
         this.impl = impl;
     }
+
+    public void setSchema(Schema schema) {
+        this.schema = schema;
+    }
+    
+    public Schema getSchema() {
+        return schema;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStoreImpl.java Mon May  4 20:50:31 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 
 /**
  * This class is used to specify the actual behavior of the store
@@ -31,7 +32,8 @@
      * @param sFile - The file the store should write to
      * @throws IOException
      */
-    public abstract StoreFunc createStoreFunc(FileSpec sFile) throws IOException;
+    public abstract StoreFunc createStoreFunc(FileSpec sFile, Schema schema) 
+        throws IOException;
     
     /**
      * At the end of processing, the outputstream is closed

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/LocalPOStoreImpl.java Mon May  4 20:50:31 2009
@@ -24,6 +24,7 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
 
 /**
@@ -43,7 +44,8 @@
     }
 
     @Override
-    public StoreFunc createStoreFunc(FileSpec sFile) throws IOException {
+    public StoreFunc createStoreFunc(FileSpec sFile, Schema schema) 
+        throws IOException {
         this.sFile = sFile;
         storer = (StoreFunc)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
         os = FileLocalizer.create(sFile.getFileName(), pc);
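
Taken together with the POStore changes above, the schema now flows from the planner through POStore into whichever POStoreImpl backs it. A hedged wiring sketch (configureStore is hypothetical; the store, impl, and schema objects are assumed to come from the caller, and schema may be null when unknown):

    // Hypothetical glue (imports elided): hand the relation's schema to the
    // store before setUp(), which calls impl.createStoreFunc(sFile, schema).
    static void configureStore(POStore store, POStoreImpl impl, Schema schema)
            throws IOException {
        store.setStoreImpl(impl);
        store.setSchema(schema);
        store.setUp();
    }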

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Mon May  4 20:50:31 2009
@@ -393,4 +393,13 @@
     public boolean equals(Object obj) {
         return true;
     }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.StoreFunc#getStorePreparationClass()
+     */
+    @Override
+    public Class getStorePreparationClass() throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
 }
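
All of the builtin storage functions stub this method out with null, i.e. no special store preparation. A custom StoreFunc could return a backend-specific class instead; a hedged fragment (assuming, per the StoreFunc change in this commit, that a non-null return names the Hadoop OutputFormat the backend should use in place of the default; the surrounding class and the remaining StoreFunc methods are omitted):

    // Hypothetical override in a custom StoreFunc: ask the Hadoop backend
    // to prepare the store with SequenceFileOutputFormat instead of the
    // default Pig output format.
    @Override
    public Class getStorePreparationClass() throws IOException {
        return org.apache.hadoop.mapred.SequenceFileOutputFormat.class;
    }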

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java Mon May  4 20:50:31 2009
@@ -159,4 +159,13 @@
         // TODO Auto-generated method stub
         
     }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.StoreFunc#getStorePreparationClass()
+     */
+    @Override
+    public Class getStorePreparationClass() throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/PigDump.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/PigDump.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/PigDump.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/PigDump.java Mon May  4 20:50:31 2009
@@ -41,6 +41,15 @@
 
     public void putNext(Tuple f) throws IOException {
         os.write((f.toString() + recordDelimiter).getBytes());
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.StoreFunc#getStorePreparationClass()
+     */
+    @Override
+    public Class getStorePreparationClass() throws IOException {
+        // TODO Auto-generated method stub
+        return null;
     }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Mon May  4 20:50:31 2009
@@ -327,5 +327,14 @@
         return this.fieldDel == other.fieldDel;
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.pig.StoreFunc#getStorePreparationClass()
+     */
+    @Override
+    public Class getStorePreparationClass() throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java Mon May  4 20:50:31 2009
@@ -36,12 +36,16 @@
  */
 public abstract class PigNullableWritable implements WritableComparable {
 
+    private static byte mqFlag = (byte)0x80;
+    
+    private static byte idxSpace = (byte)0x7F;
+    
     private boolean mNull;
 
     protected WritableComparable mValue;
 
     private byte mIndex;
-
+       
     /**
      * Compare two nullable objects.  Step one is to check if either or both
      * are null.  If one is null and the other is not, then the one that is
@@ -54,12 +58,19 @@
      */
     public int compareTo(Object o) {
         PigNullableWritable w = (PigNullableWritable)o;
+
+        if ((mIndex & mqFlag) != 0) { // this is a multi-query index
+            
+            if ((mIndex & idxSpace) < (w.mIndex & idxSpace)) return -1;
+            else if ((mIndex & idxSpace) > (w.mIndex & idxSpace)) return 1;
+        }
+        
         if (!mNull && !w.mNull) {
             return mValue.compareTo(w.mValue);
         } else if (mNull && w.mNull) {
             // If they're both null, compare the indices
-            if (mIndex < w.mIndex) return -1;
-            else if (mIndex > w.mIndex) return 1;
+            if ((mIndex & idxSpace) < (w.mIndex & idxSpace)) return -1;
+            else if ((mIndex & idxSpace) > (w.mIndex & idxSpace)) return 1;
             else return 0;
         }
         else if (mNull) return -1; 
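
The net ordering is: keys tagged with the multi-query flag compare first on their 7-bit index, so output from different merged sub-plans stays segregated; null keys fall back to the masked index as a tiebreaker. A small self-contained illustration of the masking (the values are made up):

    public class MqIndexDemo {
        public static void main(String[] args) {
            byte a = (byte) (3 | 0x80);   // key tagged for merged sub-plan 3
            byte b = (byte) (5 | 0x80);   // key tagged for merged sub-plan 5

            // mqFlag test: the high bit marks a multi-query index
            System.out.println((a & 0x80) != 0);          // true
            // idxSpace masking recovers the 7-bit index for comparison
            System.out.println((a & 0x7F) < (b & 0x7F));  // true: 3 < 5
        }
    }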

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Mon May  4 20:50:31 2009
@@ -451,7 +451,7 @@
     	planTester.buildPlan("a = load 'input';");
     	LogicalPlan lp = planTester.buildPlan("b = order a by $0;");
     	PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
-    	POStore store = GenPhyOp.topStoreOp();
+    	POStore store = GenPhyOp.dummyPigStorageOp();
     	pp.addAsLeaf(store);
     	MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
     	

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLocal.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLocal.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLocal.java Mon May  4 20:50:31 2009
@@ -288,6 +288,15 @@
             return null;
         }
 
+        /* (non-Javadoc)
+         * @see org.apache.pig.StoreFunc#getStorePreparationClass()
+         */
+        @Override
+        public Class getStorePreparationClass() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
     }
 
 

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java Mon May  4 20:50:31 2009
@@ -313,6 +313,15 @@
             return null;
         }
 
+        /* (non-Javadoc)
+         * @see org.apache.pig.StoreFunc#getStorePreparationClass()
+         */
+        @Override
+        public Class getStorePreparationClass() throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
     }
 
 


