pig-commits mailing list archives

From rd...@apache.org
Subject svn commit: r957277 [1/3] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLaye...
Date Wed, 23 Jun 2010 17:29:34 GMT
Author: rding
Date: Wed Jun 23 17:29:33 2010
New Revision: 957277

URL: http://svn.apache.org/viewvc?rev=957277&view=rev
Log:
PIG-1333: API interface to Pig

Added:
    hadoop/pig/trunk/src/org/apache/pig/PigRunner.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/Main.java
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Jun 23 17:29:33 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1333: API interface to Pig (rding)
+
 PIG-1405: Need to move many standard functions from piggybank into Pig
 (aniket486 via daijy)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/Main.java Wed Jun 23 17:29:33 2010
@@ -46,10 +46,12 @@ import jline.History;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
 
+import org.apache.pig.PigRunner.ReturnCode;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.ExecType;
@@ -60,6 +62,7 @@ import org.apache.pig.impl.util.JarManag
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.PropertiesUtil;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.pig.tools.cmdline.CmdLineParser;
 import org.apache.pig.tools.grunt.Grunt;
@@ -72,9 +75,8 @@ import org.apache.pig.tools.parameters.P
  */
 @InterfaceAudience.LimitedPrivate({"Oozie"})
 @InterfaceStability.Stable
-public class Main
-{
-
+public class Main {
+   
     private final static Log log = LogFactory.getLog(Main.class);
     
     private static final String LOG4J_CONF = "log4jconf";
@@ -83,9 +85,7 @@ public class Main
     private static final String JAR = "jar";
     private static final String VERBOSE = "verbose";
     
-    private enum ExecMode {STRING, FILE, SHELL, UNKNOWN}
-
-    private static boolean checkScriptOnly = false;
+    private enum ExecMode {STRING, FILE, SHELL, UNKNOWN}    
                 
 /**
  * The Main-Class for the Pig Jar that will provide a shell and setup a classpath appropriate
@@ -97,8 +97,13 @@ public class Main
  *            shell.
  * @throws IOException
  */
-public static void main(String args[])
-{
+public static void main(String args[]) {
+    GenericOptionsParser parser = new GenericOptionsParser(args);
+    String[] pigArgs = parser.getRemainingArgs();
+    System.exit(run(pigArgs));
+}
+
+static int run(String args[]) {
     int rc = 1;
     Properties properties = new Properties();
     PropertiesUtil.loadDefaultProperties(properties);
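
(Note: GenericOptionsParser is Hadoop's standard parser for generic options such as -D key=value; main() now lets it consume those before forwarding the remaining arguments to run(), so Hadoop configuration flags never reach Pig's own CmdLineParser.)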
@@ -107,6 +112,8 @@ public static void main(String args[])
     boolean gruntCalled = false;
     String logFileName = null;
     boolean userSpecifiedLog = false;
+    
+    boolean checkScriptOnly = false;
 
     try {
         BufferedReader pin = null;
@@ -173,7 +180,7 @@ public static void main(String args[])
                 break;
 
             case 'c': 
-                checkScriptOnly = true;
+                checkScriptOnly = true;                
                 break;
 
             case 'd':
@@ -199,13 +206,11 @@ public static void main(String args[])
 
             case 'h':
                 usage();
-                rc = 0;
-                return;
+                return ReturnCode.SUCCESS;
 
             case 'i':
             	System.out.println(getVersionString());
-                rc = 0;
-            	return;
+            	return ReturnCode.SUCCESS;
 
             case 'j': 
                 String jarsString = opts.getValStr();
@@ -237,7 +242,6 @@ public static void main(String args[])
                 break;
                             
             case 'p': 
-                String val = opts.getValStr();
                 params.add(opts.getValStr());
                 break;
                             
@@ -320,7 +324,7 @@ public static void main(String args[])
             pin = runParamPreprocessor(in, params, paramFiles, substFile, debug || dryrun || checkScriptOnly);
             if (dryrun) {
                 log.info("Dry run completed. Substituted pig script is at " + substFile);
-                return;
+                return ReturnCode.SUCCESS;
             }
 
             logFileName = validateLogFile(logFileName, file);
@@ -343,13 +347,13 @@ public static void main(String args[])
             if(checkScriptOnly) {
                 grunt.checkScript(substFile);
                 System.err.println(file + " syntax OK");
-                rc = 0;
+                rc = ReturnCode.SUCCESS;
             } else {
                 int results[] = grunt.exec();
                 rc = getReturnCodeForStats(results);
             }
             
-            return;
+            return rc;
         }
 
         case STRING: {
@@ -357,8 +361,7 @@ public static void main(String args[])
                 System.err.println("ERROR:" +
                         "-c (-check) option is only valid " +
                         "when executing pig with a pig script file)");
-                rc = 2; // failure
-                return;
+                return ReturnCode.ILLEGAL_ARGS;
             }
             // Gather up all the remaining arguments into a string and pass them into
             // grunt.
@@ -375,9 +378,8 @@ public static void main(String args[])
             grunt = new Grunt(in, pigContext);
             gruntCalled = true;
             int results[] = grunt.exec();
-            rc = getReturnCodeForStats(results);
-            return;
-            }
+            return getReturnCodeForStats(results);
+        }
 
         default:
             break;
@@ -393,8 +395,7 @@ public static void main(String args[])
                 System.err.println("ERROR:" +
                         "-c (-check) option is only valid " +
                         "when executing pig with a pig script file)");
-                rc = 2; // failure
-                return;
+                return ReturnCode.ILLEGAL_ARGS;
             }
             // Interactive
             mode = ExecMode.SHELL;
@@ -408,8 +409,7 @@ public static void main(String args[])
             grunt.setConsoleReader(reader);
             gruntCalled = true;
             grunt.run();
-            rc = 0;
-            return;
+            return ReturnCode.SUCCESS;
         } else {
             // They have a pig script they want us to run.
             if (remainders.length > 1) {
@@ -423,7 +423,7 @@ public static void main(String args[])
             pin = runParamPreprocessor(in, params, paramFiles, substFile, debug || dryrun || checkScriptOnly);
             if (dryrun){
                 log.info("Dry run completed. Substituted pig script is at " + substFile);
-                return;
+                return ReturnCode.SUCCESS;
             }
             
             logFileName = validateLogFile(logFileName, remainders[0]);
@@ -446,59 +446,57 @@ public static void main(String args[])
             if(checkScriptOnly) {
                 grunt.checkScript(substFile);
                 System.err.println(remainders[0] + " syntax OK");
-                rc = 0;
+                rc = ReturnCode.SUCCESS;
             } else {
                 int results[] = grunt.exec();
                 rc = getReturnCodeForStats(results);
             }
-            return;
+            return rc;
         }
 
         // Per Utkarsh and Chris invocation of jar file via pig depricated.
     } catch (ParseException e) {
         usage();
-        rc = 2;
-    } catch (NumberFormatException e) {
+        rc = ReturnCode.PARSE_EXCEPTION;
+        PigStatsUtil.setErrorMessage(e.getMessage());
+    } catch (org.apache.pig.tools.parameters.ParseException e) {
         usage();
-        rc = 2;
-    } catch (PigException pe) {
-        if(pe.retriable()) {
-            rc = 1; 
+        rc = ReturnCode.PARSE_EXCEPTION;
+        PigStatsUtil.setErrorMessage(e.getMessage());
+    } catch (IOException e) {
+        if (e instanceof PigException) {
+            PigException pe = (PigException)e;
+            rc = (pe.retriable()) ? ReturnCode.RETRIABLE_EXCEPTION 
+                    : ReturnCode.PIG_EXCEPTION;
+            PigStatsUtil.setErrorMessage(pe.getMessage());
+            PigStatsUtil.setErrorCode(pe.getErrorCode());
         } else {
-            rc = 2;
+            rc = ReturnCode.IO_EXCEPTION;
+            PigStatsUtil.setErrorMessage(e.getMessage());
         }
-
+        
         if(!gruntCalled) {
-        	LogUtils.writeLog(pe, logFileName, log, verbose, "Error before Pig is launched");
+        	LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
         }
     } catch (Throwable e) {
-        rc = 2;
+        rc = ReturnCode.THROWABLE_EXCEPTION;
+        PigStatsUtil.setErrorMessage(e.getMessage());
         if(!gruntCalled) {
         	LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
-        }
+        }      
     } finally {
         // clear temp files
         FileLocalizer.deleteTempFiles();
-        PerformanceTimerFactory.getPerfTimerFactory().dumpTimers();
-        System.exit(rc);
+        PerformanceTimerFactory.getPerfTimerFactory().dumpTimers();        
     }
+    
+    return rc;
 }
 
 private static int getReturnCodeForStats(int[] stats) {
-    if (stats[1] == 0) {
-        // no failed jobs
-        return 0;
-    }
-    else {
-        if (stats[0] == 0) {
-            // no succeeded jobs
-            return 2;
-        }
-        else {
-            // some jobs have failed
-            return 3;
-        }
-    }
+    return (stats[1] == 0) ? ReturnCode.SUCCESS         // no failed jobs
+                : (stats[0] == 0) ? ReturnCode.FAILURE  // no succeeded jobs
+                        : ReturnCode.PARTIAL_FAILURE;   // some jobs have failed
 }
 
 //TODO jz: log4j.properties should be used instead
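
(The net effect of the refactoring above: run() returns its status code instead of calling System.exit(), so the new PigRunner below can drive Pig in-process and surface the outcome through PigStats; only the command-line entry point main() still terminates the JVM.)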

Added: hadoop/pig/trunk/src/org/apache/pig/PigRunner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigRunner.java?rev=957277&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigRunner.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/PigRunner.java Wed Jun 23 17:29:33 2010
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
+
+/**
+ * A utility to help run PIG scripts within a Java program.
+ */
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class PigRunner {
+
+    // return codes
+    public static abstract class ReturnCode {
+        public final static int UNKNOWN = -1;
+        public final static int SUCCESS = 0;      
+        public final static int RETRIABLE_EXCEPTION = 1;
+        public final static int FAILURE = 2;         
+        public final static int PARTIAL_FAILURE = 3; 
+        public final static int ILLEGAL_ARGS = 4;
+        public final static int IO_EXCEPTION = 5;
+        public final static int PIG_EXCEPTION = 6;
+        public final static int PARSE_EXCEPTION = 7;
+        public final static int THROWABLE_EXCEPTION = 8;
+    }
+    
+    public static PigStats run(String[] args) {
+        GenericOptionsParser parser = new GenericOptionsParser(args);
+        String[] pigArgs = parser.getRemainingArgs();
+        return PigStatsUtil.getPigStats(Main.run(pigArgs));
+    }
+    
+}
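
For readers embedding Pig, here is a minimal sketch of how an application might call the new API. The class name and script path are hypothetical; PigRunner.run(), PigStats.getOutputStats(), OutputStats.getAlias() and OutputStats.isSuccessful() are used exactly as they appear elsewhere in this patch.

    import org.apache.pig.PigRunner;
    import org.apache.pig.tools.pigstats.OutputStats;
    import org.apache.pig.tools.pigstats.PigStats;

    public class PigRunnerDemo {
        public static void main(String[] args) {
            // Pass the same arguments the pig command line accepts
            // (the script path below is a placeholder).
            String[] pigArgs = { "-x", "local", "/tmp/demo.pig" };
            PigStats stats = PigRunner.run(pigArgs);
            // Inspect the per-output statistics gathered during the run.
            for (OutputStats output : stats.getOutputStats()) {
                System.out.println(output.getAlias() + " -> "
                        + (output.isSuccessful() ? "stored" : "failed"));
            }
        }
    }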

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Wed Jun 23 17:29:33 2010
@@ -21,7 +21,6 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
-import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintStream;
@@ -29,33 +28,30 @@ import java.io.StringReader;
 import java.io.StringWriter;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.Collection;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Stack;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.Job;
-
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.HJob;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.builtin.BinStorage;
@@ -66,18 +62,19 @@ import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
 import org.apache.pig.experimental.logical.optimizer.LogicalPlanOptimizer;
-import org.apache.pig.experimental.logical.optimizer.PlanPrinter;
 import org.apache.pig.experimental.logical.optimizer.UidStamper;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LOConst;
+import org.apache.pig.impl.logicalLayer.LODefine;
 import org.apache.pig.impl.logicalLayer.LOForEach;
 import org.apache.pig.impl.logicalLayer.LOLimit;
 import org.apache.pig.impl.logicalLayer.LOLoad;
 import org.apache.pig.impl.logicalLayer.LOSort;
 import org.apache.pig.impl.logicalLayer.LOSplit;
 import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LOStore;
 import org.apache.pig.impl.logicalLayer.LOVisitor;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -88,23 +85,23 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.parser.QueryParser;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
-import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.PropertiesUtil;
-import org.apache.pig.impl.logicalLayer.LODefine;
-import org.apache.pig.impl.logicalLayer.LOStore;
 import org.apache.pig.pen.ExampleGenerator;
-import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.tools.grunt.GruntParser;
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.ScriptState;
 
 
 /**
@@ -319,9 +316,24 @@ public class PigServer {
      * @throws ExecException
      */
     public List<ExecJob> executeBatch() throws FrontendException, ExecException {
+        PigStats stats = executeBatchEx();
+        LinkedList<ExecJob> jobs = new LinkedList<ExecJob>();
+        for (OutputStats output : stats.getOutputStats()) {
+            if (output.isSuccessful()) {
+                jobs.add(new HJob(HJob.JOB_STATUS.COMPLETED, pigContext, output
+                        .getPOStore(), output.getAlias(), stats));
+            } else {
+                jobs.add(new HJob(HJob.JOB_STATUS.FAILED, pigContext, output
+                        .getPOStore(), output.getAlias(), stats));
+            }
+        }
+        return jobs;
+    }
+
+    private PigStats executeBatchEx() throws FrontendException, ExecException {
         if (!isMultiQuery) {
             // ignore if multiquery is off
-            return new LinkedList<ExecJob>();
+            return PigStatsUtil.getEmptyPigStats();
         }
 
         if (currDAG == null || !isBatchOn()) {
@@ -332,7 +344,7 @@ public class PigServer {
         
         return currDAG.execute();
     }
-
+    
     /**
      * Discards a batch of Pig commands.
      * 
@@ -665,7 +677,8 @@ public class PigServer {
             if (currDAG.isBatchOn()) {
                 currDAG.execute();
             }
-            ExecJob job = store(id, FileLocalizer.getTemporaryPath(null, pigContext).toString(), BinStorage.class.getName() + "()");
+            ExecJob job = store(id, FileLocalizer.getTemporaryPath(null, pigContext)
+                    .toString(), BinStorage.class.getName() + "()");
             
             // invocation of "execute" is synchronous!
 
@@ -742,11 +755,21 @@ public class PigServer {
      * @return {@link ExecJob} containing information about this job
      * @throws IOException
      */
-    public ExecJob store(
+    public ExecJob store(String id, String filename, String func) 
+            throws IOException {
+        PigStats stats = storeEx(id, filename, func);
+        if (stats.getOutputStats().size() < 1) {
+            throw new IOException("Couldn't retrieve job.");
+        }
+        OutputStats output = stats.getOutputStats().get(0);
+        return new HJob(JOB_STATUS.COMPLETED, pigContext, output
+                .getPOStore(), output.getAlias(), stats);
+    }
+       
+    private PigStats storeEx(
             String id,
             String filename,
             String func) throws IOException {
-
         if (!currDAG.getAliasOp().containsKey(id)) {
             throw new IOException("Invalid alias: " + id);
         }
@@ -770,21 +793,18 @@ public class PigServer {
                 }
             }
             
-            LogicalPlan unCompiledstorePlan = QueryParser.generateStorePlan(scope, lp, filename, func, leaf, leaf.getAlias(), pigContext);
+            LogicalPlan unCompiledstorePlan = QueryParser.generateStorePlan(
+                    scope, lp, filename, func, leaf, leaf.getAlias(),
+                    pigContext);
             LogicalPlan storePlan = compileLp(unCompiledstorePlan, true);
-            List<ExecJob> jobs = executeCompiledLogicalPlan(storePlan);
-            if (jobs.size() < 1) {
-                throw new IOException("Couldn't retrieve job.");
-            }
-            return jobs.get(0);
+            return executeCompiledLogicalPlan(storePlan);
         } catch (Exception e) {
             int errCode = 1002;
             String msg = "Unable to store alias " + id;
             throw new FrontendException(msg, errCode, PigException.INPUT, e);
-        }
-
+        }   
     }
-
+    
     /**
      * Provide information on how a pig query will be executed.  For now
      * this information is very developer focussed, and probably not very
@@ -1069,38 +1089,46 @@ public class PigServer {
         return lp;
     }
     
-    private List<ExecJob> execute(String alias) throws FrontendException, ExecException {
+    private PigStats execute(String alias) throws FrontendException, ExecException {
         LogicalPlan typeCheckedLp = compileLp(alias);
 
         if (typeCheckedLp.size() == 0) {
-            return new LinkedList<ExecJob>();
+            return PigStatsUtil.getEmptyPigStats();
         }
 
         LogicalOperator op = typeCheckedLp.getLeaves().get(0);
         if (op instanceof LODefine) {
             log.info("Skip execution of DEFINE only logical plan.");
-            return new LinkedList<ExecJob>();
+            return PigStatsUtil.getEmptyPigStats();
         }
 
         return executeCompiledLogicalPlan(typeCheckedLp);
     }
     
-    private List<ExecJob> executeCompiledLogicalPlan(LogicalPlan compiledLp) throws ExecException {
+    private PigStats executeCompiledLogicalPlan(LogicalPlan compiledLp) throws ExecException {
+        // discover pig features used in this script
+        ScriptState.get().setScriptFeatures(compiledLp);
         PhysicalPlan pp = compilePp(compiledLp);
         // execute using appropriate engine
-        List<ExecJob> execJobs = pigContext.getExecutionEngine().execute(pp, "job_pigexec_");
-        for (ExecJob execJob: execJobs) {
-            if (execJob.getStatus()==ExecJob.JOB_STATUS.FAILED) {
-                POStore store = execJob.getPOStore();
+        List<ExecJob> jobs = pigContext.getExecutionEngine().execute(pp, "job_pigexec_");
+        PigStats stats = null;
+        if (jobs.size() > 0) {
+            stats = jobs.get(0).getStatistics();
+        } else {
+            stats = PigStatsUtil.getEmptyPigStats();
+        }
+        for (OutputStats output : stats.getOutputStats()) {
+            if (!output.isSuccessful()) {
+                POStore store = output.getPOStore();
                 try {
                     store.getStoreFunc().cleanupOnFailure(store.getSFile().getFileName(),
-                            new Job(ConfigurationUtil.toConfiguration(execJob.getConfiguration())));
+                            new Job(output.getConf()));
                 } catch (IOException e) {
                     throw new ExecException(e);
                 }
             }
         }
-        return execJobs;
+        return stats;
     }
 
     private LogicalPlan compileLp(
@@ -1108,7 +1136,6 @@ public class PigServer {
         return compileLp(alias, true);
     }
 
-    @SuppressWarnings("unchecked")
     private LogicalPlan compileLp(
             String alias,
             boolean optimize) throws FrontendException {
@@ -1327,15 +1354,15 @@ public class PigServer {
 
         boolean isBatchEmpty() { return processedStores == storeOpTable.keySet().size(); }
         
-        List<ExecJob> execute() throws ExecException, FrontendException {
+        PigStats execute() throws ExecException, FrontendException {
             pigContext.getProperties().setProperty(PigContext.JOB_NAME, jobName);
             if (jobPriority != null) {
               pigContext.getProperties().setProperty(PigContext.JOB_PRIORITY, jobPriority);
             }
             
-            List<ExecJob> jobs = PigServer.this.execute(null);
+            PigStats stats = PigServer.this.execute(null);
             processedStores = storeOpTable.keySet().size();
-            return jobs;
+            return stats;
         }
 
         void markAsExecuted() {
@@ -1471,7 +1498,7 @@ public class PigServer {
             }          
             return graph;
         }
-        
+       
         private void postProcess() throws IOException {
             
             // Set the logical plan values correctly in all the operators

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Jun 23 17:29:33 2010
@@ -18,9 +18,6 @@
 
 package org.apache.pig.backend.hadoop.executionengine;
 
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
 import java.io.PrintStream;
 import java.net.Socket;
 import java.net.SocketException;
@@ -38,11 +35,10 @@ import java.util.Properties;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -57,13 +53,16 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
-import org.apache.pig.experimental.logical.optimizer.PlanPrinter;
 import org.apache.pig.experimental.logical.optimizer.UidStamper;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 
 public class HExecutionEngine implements ExecutionEngine {
@@ -73,8 +72,6 @@ public class HExecutionEngine implements
     
     private static final String HADOOP_SITE = "hadoop-site.xml";
     private static final String CORE_SITE = "core-site.xml";
-    private static final String MAPRED_SYS_DIR = "mapred.system.dir";
-    
     private final Log log = LogFactory.getLog(getClass());
     public static final String LOCAL = "local";
     
@@ -299,18 +296,16 @@ public class HExecutionEngine implements
         try {
             PigStats stats = launcher.launchPig(plan, jobName, pigContext);
 
-            for (POStore store: launcher.getSucceededFiles()) {
-                FileSpec spec = store.getSFile();
-                String alias = leafMap.containsKey(spec.toString()) ? leafMap.get(spec.toString()).getAlias() : null;
-                jobs.add(new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, store, alias, stats));
-            }
-
-            for (POStore store: launcher.getFailedFiles()) {
-                FileSpec spec = store.getSFile();
-                String alias = leafMap.containsKey(spec.toString()) ? leafMap.get(spec.toString()).getAlias() : null;
-                HJob j = new HJob(ExecJob.JOB_STATUS.FAILED, pigContext, store, alias, stats);
-                j.setException(launcher.getError(spec));
-                jobs.add(j);
+            for (OutputStats output : stats.getOutputStats()) {
+                POStore store = output.getPOStore();               
+                String alias = store.getAlias();
+                if (output.isSuccessful()) {
+                    jobs.add(new HJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, store, alias, stats));
+                } else {
+                    HJob j = new HJob(ExecJob.JOB_STATUS.FAILED, pigContext, store, alias, stats);  
+                    j.setException(launcher.getError(store.getSFile()));
+                    jobs.add(j);
+                }
             }
 
             return jobs;
@@ -411,5 +406,31 @@ public class HExecutionEngine implements
                 properties.put(key, val);
             }
         }
-    }    
+    } 
+    
+    public static FileSpec checkLeafIsStore(
+            PhysicalPlan plan,
+            PigContext pigContext) throws ExecException {
+        try {
+            PhysicalOperator leaf = plan.getLeaves().get(0);
+            FileSpec spec = null;
+            if(!(leaf instanceof POStore)){
+                String scope = leaf.getOperatorKey().getScope();
+                POStore str = new POStore(new OperatorKey(scope,
+                    NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+                spec = new FileSpec(FileLocalizer.getTemporaryPath(null,
+                    pigContext).toString(),
+                    new FuncSpec(BinStorage.class.getName()));
+                str.setSFile(spec);
+                plan.addAsLeaf(str);
+            } else{
+                spec = ((POStore)leaf).getSFile();
+            }
+            return spec;
+        } catch (Exception e) {
+            int errCode = 2045;
+            String msg = "Internal error. Not able to check if the leaf node is a store operator.";
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
 }
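
A hedged usage sketch for the new static checkLeafIsStore helper: given a physical plan, it returns the FileSpec of the leaf store, first appending a temporary BinStorage store when the plan does not already end in one. The physicalPlan and pigContext variables below are assumed to exist in the caller.

    // Hypothetical caller: normalize a plan so it ends in a POStore before launch.
    FileSpec outputSpec = HExecutionEngine.checkLeafIsStore(physicalPlan, pigContext);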

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Jun 23 17:29:33 2010
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -168,6 +169,10 @@ public class JobControlCompiler{
         UDFContext.getUDFContext().reset();
     }
 
+    Map<Job, MapReduceOper> getJobMroMap() {
+        return Collections.unmodifiableMap(jobMroMap);
+    }
+    
     /**
      * Moves all the results of a collection of MR jobs to the final
      * output directory. Some of the results may have been put into a
@@ -809,7 +814,7 @@ public class JobControlCompiler{
             super(BagFactory.getInstance().newDefaultBag().getClass(), true);
         }
     }
-
+    
     private void selectComparator(
             MapReduceOper mro,
             byte keyType,

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Wed Jun 23 17:29:33 2010
@@ -22,7 +22,6 @@ import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.regex.Matcher;
@@ -40,9 +39,7 @@ import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.LogUtils;
@@ -56,9 +53,6 @@ public abstract class Launcher {
     boolean pigException = false;
     boolean outOfMemory = false;
     static final String OOM_ERR = "OutOfMemoryError";
-
-    protected List<POStore> succeededStores = null;
-    protected List<POStore> failedStores = null;
     
     protected Launcher(){
         totalHadoopTimeSpent = 0;
@@ -70,28 +64,10 @@ public abstract class Launcher {
     }
 
     /**
-     *
-     * @return A list of {@link POStore} objects corresponding to the store
-     * statements that were successful
-     */
-    public List<POStore> getSucceededFiles() {
-        return succeededStores;
-    }
-
-    /**
-     * @return A list of {@link POStore} objects corresponding to the store
-     * statements that failed
-     */
-    public List<POStore> getFailedFiles() {
-        return failedStores;
-    }
-
-    /**
      * Resets the state after a launch
      */
     public void reset() {
-        succeededStores = new LinkedList<POStore>();
-        failedStores = new LinkedList<POStore>();
+
     }
 
     /**

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Jun 23 17:29:33 2010
@@ -71,6 +71,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataType;
@@ -999,6 +1000,15 @@ public class MRCompiler extends PhyPlanV
         try{
             nonBlocking(op);
             phyToMROpMap.put(op, curMROp);
+            if (op.getPackageType() == PackageType.JOIN) {
+                curMROp.setRegularJoin(true);
+            } else if (op.getPackageType() == PackageType.GROUP) {
+                if (op.getNumInps() == 1) {
+                    curMROp.setGroupBy(true);
+                } else if (op.getNumInps() > 1) {
+                    curMROp.setCogroup(true);
+                }
+            }
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + op.getClass().getSimpleName();
@@ -1865,6 +1875,7 @@ public class MRCompiler extends PhyPlanV
             keyType);
         lr.setPlans(eps1);
         lr.setResultType(DataType.TUPLE);
+        lr.setAlias(sort.getAlias());
         mro.mapPlan.addAsLeaf(lr);
         
         mro.setMapDone(true);
@@ -1954,6 +1965,7 @@ public class MRCompiler extends PhyPlanV
         POSort sort = new POSort(inpSort.getOperatorKey(), inpSort
                 .getRequestedParallelism(), null, inpSort.getSortPlans(),
                 inpSort.getMAscCols(), inpSort.getMSortFunc());
+    	sort.setAlias(inpSort.getAlias());
     	
     	// Turn the asc/desc array into an array of strings so that we can pass it
         // to the FindQuantiles function.
@@ -2153,6 +2165,7 @@ public class MRCompiler extends PhyPlanV
         lr.setKeyType(DataType.CHARARRAY);
         lr.setPlans(eps);
         lr.setResultType(DataType.TUPLE);
+        lr.setAlias(sort.getAlias());
         mro.mapPlan.add(lr);
         mro.mapPlan.connect(nfe1, lr);
         
@@ -2299,6 +2312,7 @@ public class MRCompiler extends PhyPlanV
         
         mro.setReduceDone(true);
         mro.requestedParallelism = 1;
+        mro.setSampling(true);
         return new Pair<MapReduceOper, Integer>(mro, parallelismForSort);
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Jun 23 17:29:33 2010
@@ -22,8 +22,8 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -42,31 +42,32 @@ import org.apache.hadoop.mapreduce.JobCo
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
+import org.apache.pig.PigRunner.ReturnCode;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecutionEngine;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.impl.PigContext;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.util.ConfigurationValidator;
 import org.apache.pig.impl.util.LogUtils;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 
+
 /**
  * Main class that launches pig for Map Reduce
  *
@@ -78,6 +79,7 @@ public class MapReduceLauncher extends L
     private Exception jobControlException = null;
     private String jobControlExceptionStackTrace = null;
     private boolean aggregateWarning = false;
+
     private Map<FileSpec, Exception> failureMap;
 
     public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
@@ -94,11 +96,11 @@ public class MapReduceLauncher extends L
     }
 
     @Override
-    public void reset() {
+    public void reset() {  
         failureMap = new HashMap<FileSpec, Exception>();
         super.reset();
     }
-
+   
     @SuppressWarnings("deprecation")
     @Override
     public PigStats launchPig(PhysicalPlan php,
@@ -112,18 +114,17 @@ public class MapReduceLauncher extends L
         long sleepTime = 500;
         aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
         MROperPlan mrp = compile(php, pc);
-        PigStats stats = new PigStats();
-        stats.setMROperatorPlan(mrp);
-        stats.setExecType(pc.getExecType());
-        stats.setPhysicalPlan(php);
-        
+                
         ExecutionEngine exe = pc.getExecutionEngine();
         ConfigurationValidator.validatePigProperties(exe.getConfiguration());
         Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
         JobClient jobClient = new JobClient(((HExecutionEngine)exe).getJobConf());
-
+        
         JobControlCompiler jcc = new JobControlCompiler(pc, conf);
         
+        // start collecting statistics
+        PigStatsUtil.startCollection(pc, jobClient, jcc, mrp); 
+        
         List<Job> failedJobs = new LinkedList<Job>();
         List<Job> completeFailedJobsInThisRun = new LinkedList<Job>();
         List<Job> succJobs = new LinkedList<Job>();
@@ -199,51 +200,58 @@ public class MapReduceLauncher extends L
             	}
             	lastProg = prog;
             }
-
+            
             //check for the jobControlException first
             //if the job controller fails before launching the jobs then there are
             //no jobs to check for failure
-            if(jobControlException != null) {
-                if(jobControlException instanceof PigException) {
-                        if(jobControlExceptionStackTrace != null) {
-                            LogUtils.writeLog("Error message from job controller", jobControlExceptionStackTrace, 
-                                    pc.getProperties().getProperty("pig.logfile"), 
-                                    log);
-                        }
-                        throw jobControlException;
+            if (jobControlException != null) {
+                if (jobControlException instanceof PigException) {
+                    if (jobControlExceptionStackTrace != null) {
+                        LogUtils.writeLog("Error message from job controller",
+                                jobControlExceptionStackTrace, pc
+                                        .getProperties().getProperty(
+                                                "pig.logfile"), log);
+                    }
+                    throw jobControlException;
                 } else {
-                        int errCode = 2117;
-                        String msg = "Unexpected error when launching map reduce job.";        	
-                        throw new ExecException(msg, errCode, PigException.BUG, jobControlException);
+                    int errCode = 2117;
+                    String msg = "Unexpected error when launching map reduce job.";
+                    throw new ExecException(msg, errCode, PigException.BUG,
+                            jobControlException);
                 }
             }
-
-            if (!jc.getFailedJobs().isEmpty() )
-            {
-                if ("true".equalsIgnoreCase(
-                  pc.getProperties().getProperty("stop.on.failure","false"))) {
+            
+            if (!jc.getFailedJobs().isEmpty() ) {
+                if ("true".equalsIgnoreCase(pc.getProperties().getProperty(
+                        "stop.on.failure", "false"))) {
                     int errCode = 6017;
                     StringBuilder msg = new StringBuilder();
                     
-                    for (int i=0;i<jc.getFailedJobs().size();i++) {
+                    for (int i=0; i<jc.getFailedJobs().size(); i++) {
                         Job j = jc.getFailedJobs().get(i);
                         msg.append(getFirstLineFromMessage(j.getMessage()));
-                        if (i!=jc.getFailedJobs().size()-1)
+                        if (i!=jc.getFailedJobs().size()-1) {
                             msg.append("\n");
+                        }
                     }
                     
-                    throw new ExecException(msg.toString(), 
-                                            errCode, PigException.REMOTE_ENVIRONMENT);
+                    throw new ExecException(msg.toString(), errCode,
+                            PigException.REMOTE_ENVIRONMENT);
                 }
-                // If we only have one store and that job fail, then we sure that the job completely fail, and we shall stop dependent jobs
-                for (Job job : jc.getFailedJobs())
-                {
+                
+                // If we only have one store and that job fail, then we sure 
+                // that the job completely fail, and we shall stop dependent jobs
+                for (Job job : jc.getFailedJobs()) {
                     List<POStore> sts = jcc.getStores(job);
-                    if (sts.size()==1)
+                    if (sts.size()==1) {
                         completeFailedJobsInThisRun.add(job);
+                    }
                 }
                 failedJobs.addAll(jc.getFailedJobs());
             }
+
+            // update Pig stats' job DAG with job ids of just completed jobs
+            PigStatsUtil.updateJobMroMap(jcc.getJobMroMap());
             
             int removedMROp = jcc.updateMROpPlan(completeFailedJobsInThisRun);
             
@@ -252,50 +260,42 @@ public class MapReduceLauncher extends L
             List<Job> jobs = jc.getSuccessfulJobs();
             jcc.moveResults(jobs);
             succJobs.addAll(jobs);
-            
-            
-            stats.setJobClient(jobClient);
-            stats.setJobControl(jc);
-            stats.accumulateStats();
+                        
+            // collecting statistics
+            PigStatsUtil.accumulateStats(jc);
 
             jc.stop(); 
         }
 
         log.info( "100% complete");
-      
+             
         boolean failed = false;
-        int finalStores = 0;
+        
         // Look to see if any jobs failed.  If so, we need to report that.
         if (failedJobs != null && failedJobs.size() > 0) {
-            log.error(failedJobs.size()+" map reduce job(s) failed!");
+            
             Exception backendException = null;
-
-            for (Job fj : failedJobs) {
-                
+            for (Job fj : failedJobs) {                
                 try {
                     getStats(fj, jobClient, true, pc);
                 } catch (Exception e) {
                     backendException = e;
                 }
-
                 List<POStore> sts = jcc.getStores(fj);
                 for (POStore st: sts) {
-                    if (!st.isTmpStore()) {
-                        finalStores++;
-                        log.error("Failed to produce result in: \""+st.getSFile().getFileName()+"\"");
-                    }
-                    failedStores.add(st);
                     failureMap.put(st.getSFile(), backendException);
-                    //log.error("Failed to produce result in: \""+st.getSFile().getFileName()+"\"");
                 }
+                PigStatsUtil.setBackendException(fj, backendException);
             }
             failed = true;
         }
-
+        
+        // stats collection is done, log the results
+        PigStatsUtil.stopCollection(true); 
+        
         Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();
                 
         if (succJobs != null) {
-            Map<String, String> storeCounters = new HashMap<String, String>();
             for (Job job : succJobs) {
                 List<POStore> sts = jcc.getStores(job);                
                 for (POStore st : sts) {
@@ -313,23 +313,11 @@ public class MapReduceLauncher extends L
                     if (pc.getExecType() == ExecType.LOCAL) {
                         storeSchema(job, st);
                     }
-                    if (!st.isTmpStore()) {
-                        succeededStores.add(st);
+
+                    if (!st.isTmpStore()) {                       
                         // create an "_SUCCESS" file in output location if 
                         // output location is a filesystem dir
-                        createSuccessFile(job, st);
-                        finalStores++;
-                        if (st.isMultiStore()) {
-                            String counterName = PigStatsUtil.getMultiStoreCounterName(st);
-                            long count = PigStatsUtil.getMultiStoreCount(job,
-                                    jobClient, counterName);
-                            log.info("Successfully stored " + count + " records in: \""
-                                    + st.getSFile().getFileName() + "\"");
-                            storeCounters.put(counterName, Long.valueOf(count).toString());
-                        } else {
-                            log.info("Successfully stored result in: \""
-                                    + st.getSFile().getFileName() + "\"");
-                        }                       
+                        createSuccessFile(job, st);                   
                     } else {
                         log.debug("Successfully stored result in: \""
                                 + st.getSFile().getFileName() + "\"");
@@ -341,43 +329,13 @@ public class MapReduceLauncher extends L
                     computeWarningAggregate(job, jobClient, warningAggMap);
                 }
             }
-            if (storeCounters.size() > 0) {
-                stats.addStatsGroup(PigStatsUtil.MULTI_STORE_COUNTER_GROUP, 
-                        storeCounters);
-            }
+
         }
         
         if(aggregateWarning) {
             CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log) ;
         }
-        
-        // Report records and bytes written.  Only do this in the single store case.  Multi-store
-        // scripts mess up the stats reporting from hadoop.
-        List<String> rji = stats.getRootJobIDs();
-        if ( (rji != null && rji.size() == 1 && finalStores == 1) || pc.getExecType() == ExecType.LOCAL ) {
-            // currently counters are not working in local mode - see PIG-1286
-            if(stats.getRecordsWritten()==-1 || pc.getExecType() == ExecType.LOCAL) {
-                log.info("Records written : Unable to determine number of records written");
-            } else {
-                log.info("Records written : " + stats.getRecordsWritten());
-            }
-            if(stats.getBytesWritten()==-1 || pc.getExecType() == ExecType.LOCAL) {
-                log.info("Bytes written : Unable to determine number of bytes written");
-            } else {
-                log.info("Bytes written : " + stats.getBytesWritten());
-            }
-            if(stats.getSMMSpillCount()==-1) {
-                log.info("Spillable Memory Manager spill count : Unable to determine spillable memory manager spill count");
-            } else {
-                log.info("Spillable Memory Manager spill count : " + stats.getSMMSpillCount());
-            }
-            if(stats.getProactiveSpillCount() == -1) {
-                log.info("Proactive spill count : Unable to determine proactive spill count");
-            } else {
-                log.info("Proactive spill count : " + stats.getProactiveSpillCount());
-            }
-        }
-
+                                                    
         if (!failed) {
             log.info("Success!");
         } else {
@@ -389,7 +347,11 @@ public class MapReduceLauncher extends L
         }
         jcc.reset();
 
-        return stats;
+        int ret = failed ? ((succJobs != null && succJobs.size() > 0) 
+                ? ReturnCode.PARTIAL_FAILURE
+                : ReturnCode.FAILURE)
+                : ReturnCode.SUCCESS; 
+        return PigStatsUtil.getPigStats(ret);
     }
 
     @Override
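
For clarity: the new ending of launchPig collapses the run's outcome into a PigRunner.ReturnCode before handing it to PigStatsUtil. Using the constants defined in PigRunner.java above, the decision reads:

    // failed == false                      -> ReturnCode.SUCCESS (0)
    // failed == true, some jobs succeeded  -> ReturnCode.PARTIAL_FAILURE (3)
    // failed == true, no jobs succeeded    -> ReturnCode.FAILURE (2)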
@@ -560,6 +522,7 @@ public class MapReduceLauncher extends L
         }
     }
     
+    @SuppressWarnings("deprecation")
     void computeWarningAggregate(Job job, JobClient jobClient, Map<Enum, Long> aggMap) {
         JobID mapRedJobID = job.getAssignedJobID();
         RunningJob runningJob = null;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Wed Jun 23 17:29:33 2010
@@ -83,6 +83,8 @@ public class MapReduceOper extends Opera
     // Indicate if the entire purpose for this map reduce job is doing limit, does not change
     // anything else. This is to help POPackageAnnotator to find the right POPackage to annotate
     boolean limitOnly = false;
+    
+    OPER_FEATURE feature = OPER_FEATURE.NONE;
 
     // If true, putting an identity combine in this
     // mapreduce job will speed things up.
@@ -136,6 +138,18 @@ public class MapReduceOper extends Opera
 	// Set to true in indexing job generated in map-side cogroup, merge join.
 	private boolean usingTypedComparator = false;
 	
+	private static enum OPER_FEATURE {
+	    NONE,
+	    // Indicate if this job is a sampling job
+	    SAMPLER,	    
+	    // Indicate if this job is a group by job
+	    GROUPBY,	    
+	    // Indicate if this job is a cogroup job
+	    COGROUP,	    
+	    // Indicate if this job is a regular join job
+	    HASHJOIN;
+	};
+	
     public MapReduceOper(OperatorKey k) {
         super(k);
         mapPlan = new PhysicalPlan();
@@ -299,6 +313,38 @@ public class MapReduceOper extends Opera
         this.limitOnly = limitOnly;
     }
 
+    public boolean isSampling() {
+        return (feature == OPER_FEATURE.SAMPLER);
+    }
+    
+    public void setSampling(boolean sampling) {
+        if (sampling) feature = OPER_FEATURE.SAMPLER;
+    }
+    
+    public boolean isGroupBy() {
+        return (feature == OPER_FEATURE.GROUPBY);
+    }
+    
+    public void setGroupBy(boolean groupBy) {
+        if (groupBy) feature = OPER_FEATURE.GROUPBY;
+    }
+    
+    public boolean isCogroup() {
+        return (feature == OPER_FEATURE.COGROUP);
+    }
+    
+    public void setCogroup(boolean cogroup) {
+        if (cogroup) feature = OPER_FEATURE.COGROUP;
+    }
+    
+    public boolean isRegularJoin() {
+        return (feature == OPER_FEATURE.HASHJOIN);
+    }
+    
+    public void setRegularJoin(boolean hashJoin) {
+        if (hashJoin) feature = OPER_FEATURE.HASHJOIN;
+    }
+    
     public boolean needsDistinctCombiner() { 
         return needsDistinctCombiner;
     }

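The feature tag above is single-valued: a MapReduceOper carries at most one OPER_FEATURE, and a later set*(true) call replaces the previous tag. A brief sketch against the accessors added above; the OperatorKey arguments are placeholders:

    // Sketch only; "scope"/1L are placeholder OperatorKey values.
    MapReduceOper mro = new MapReduceOper(new OperatorKey("scope", 1L));
    mro.setGroupBy(true);
    boolean tagged = mro.isGroupBy() && !mro.isCogroup();       // true: one tag at a time
    mro.setRegularJoin(true);                                   // replaces the GROUPBY tag
    boolean retagged = mro.isRegularJoin() && !mro.isGroupBy(); // true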
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Wed Jun 23 17:29:33 2010
@@ -38,6 +38,7 @@ import org.apache.pig.ResourceSchema.Res
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
@@ -670,7 +671,7 @@ public class LogToPhyTranslationVisitor 
             POPackage poPackage = compileToLR_GR_PackTrio(cg.getOperatorKey().scope,
                     cg.getInputs(), cg.getRequestedParallelism(), cg.getCustomPartitioner(),
                     cg.getAlias(), cg.getInner(),cg.getGroupByPlans());
-            
+            poPackage.setPackageType(PackageType.GROUP);            
             logToPhyMap.put(cg, poPackage);
             break;
             
@@ -1130,6 +1131,7 @@ public class LogToPhyTranslationVisitor 
                         e.getErrorCode(),e.getErrorSource(),e);
             }
             logToPhyMap.put(loj, fe);
+            poPackage.setPackageType(PackageType.JOIN);
 		}
 	}
 	

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Wed Jun 23 17:29:33 2010
@@ -71,6 +71,8 @@ public class POPackage extends PhysicalO
         SIMPLE_KEY_POSITION[0] = true;
     }
     
+    public static enum PackageType { GROUP, JOIN };
+    
     //The iterator of indexed Tuples
     //that is typically provided by
     //Hadoop
@@ -119,6 +121,8 @@ public class POPackage extends PhysicalO
     private boolean firstTime = true;
     
     private boolean useDefaultBag = false;
+    
+    private PackageType pkgType;
 
     public POPackage(OperatorKey k) {
         this(k, -1, null);
@@ -450,6 +454,14 @@ public class POPackage extends PhysicalO
         this.useSecondaryKey = useSecondaryKey;
     }
 
+    public void setPackageType(PackageType type) {
+        this.pkgType = type;
+    }
+    
+    public PackageType getPackageType() {
+        return pkgType;
+    }
+    
     private class POPackageTupleBuffer implements AccumulativeTupleBuffer {
         private List<Tuple>[] bags;
         private Iterator<NullableTuple> iter;

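Read together with the LogToPhyTranslationVisitor change above, the PackageType tag lets downstream consumers (for example, per-job feature reporting) tell a GROUP/COGROUP package from a JOIN package without re-walking the plan. A short sketch of the round trip, with a placeholder OperatorKey:

    POPackage pkg = new POPackage(new OperatorKey("scope", 2L));
    pkg.setPackageType(POPackage.PackageType.JOIN);
    boolean isJoinJob = (pkg.getPackageType() == POPackage.PackageType.JOIN); // true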
Added: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=957277&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Wed Jun 23 17:29:33 2010
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.pig.PigCounters;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
+import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
+
+/**
+ * This class encapsulates the runtime statistics of a MapReduce job.
+ * Job statistics are collected when the job completes.
+ */
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class JobStats extends Operator {
+        
+    public static final String ALIAS = "JobStatistics:alias";
+    public static final String FEATURE = "JobStatistics:feature";
+   
+    private static final Log LOG = LogFactory.getLog(JobStats.class);
+    
+    public static enum JobState { UNKNOWN, SUCCESS, FAILED; }
+    
+    private JobState state = JobState.UNKNOWN;
+        
+    private Configuration conf;
+    
+    private List<POStore> mapStores = null;
+    
+    private List<POStore> reduceStores = null;
+    
+    private ArrayList<OutputStats> outputs;
+       
+    private String errorMsg;
+    
+    private Exception exception = null;
+            
+    @SuppressWarnings("deprecation")
+    private JobID jobId;
+    
+    private long maxMapTime = 0;
+    private long minMapTime = 0;
+    private long avgMapTime = 0;
+    private long maxReduceTime = 0;
+    private long minReduceTime = 0;
+    private long avgReduceTime = 0;
+
+    private int numberMaps = 0;
+    private int numberReduces = 0;
+    
+    private long mapInputRecords = 0;
+    private long mapOutputRecords = 0;
+    private long reduceInputRecords = 0;
+    private long reduceOutputRecords = 0;
+    private long hdfsBytesWritten = 0;
+    private long spillCount = 0;
+    private long activeSpillCount = 0;
+    
+    private HashMap<String, Long> multiStoreCounters 
+            = new HashMap<String, Long>();
+    
+    @SuppressWarnings("deprecation")
+    private Counters counters = null;
+        
+    JobStats(String name, JobGraph plan) {
+        super(name, plan);
+        outputs = new ArrayList<OutputStats>();
+    }
+
+    public String getJobId() { 
+        return (jobId == null) ? null : jobId.toString(); 
+    }
+    
+    public JobState getState() { return state; }
+    
+    public boolean isSuccessful() { return (state == JobState.SUCCESS); }
+    
+    public String getErrorMessage() { return errorMsg; }
+    
+    public Exception getException() { return exception; }
+    
+    public int getNumberMaps() { return numberMaps; }
+    
+    public int getNumberReduces() { return numberReduces; }
+    
+    public long getMaxMapTime() { return maxMapTime; }
+    
+    public long getMinMapTime() { return minMapTime; }
+    
+    public long getAvgMapTime() { return avgMapTime; }
+    
+    public long getMaxReduceTime() { return maxReduceTime; }
+    
+    public long getMinReduceTime() { return minReduceTime; }
+    
+    public long getAvgReduceTime() { return avgReduceTime; }
+        
+    public long getMapInputRecords() { return mapInputRecords; }
+
+    public long getMapOutputRecords() { return mapOutputRecords; }
+
+    public long getReduceOutputRecords() { return reduceOutputRecords; }
+
+    public long getReduceInputRecords() { return reduceInputRecords; }
+
+    public long getSMMSpillCount() { return spillCount; }
+    
+    public long getProactiveSpillCount() { return activeSpillCount; }
+    
+    public long getHdfsBytesWritten() { return hdfsBytesWritten; }
+    
+    @SuppressWarnings("deprecation")
+    public Counters getHadoopCounters() { return counters; }
+    
+    public List<OutputStats> getOutputs() {
+        return Collections.unmodifiableList(outputs);
+    }
+
+    public Map<String, Long> getMultiStoreCounters() {
+        return Collections.unmodifiableMap(multiStoreCounters);
+    }
+       
+    public String getAlias() {
+        return (String)getAnnotation(ALIAS);
+    }
+    
+    public String getFeature() {
+        return (String)getAnnotation(FEATURE);
+    }
+        
+    /**
+     * Returns the total bytes written to the user-specified HDFS
+     * locations of this job.
+     */
+    public long getBytesWritten() {        
+        long count = 0;
+        for (OutputStats out : outputs) {
+            long n = out.getBytes();            
+            if (n > 0) count += n;
+        }
+        return count;
+    }
+    
+    /**
+     * Returns the total number of records in the user-specified output
+     * locations of this job.
+     */
+    public long getRecordWritten() {
+        long count = 0;
+        for (OutputStats out : outputs) {
+            long rec = out.getNumberRecords();
+            if (rec > 0) count += rec;
+        }
+        return count;
+    }
+    
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (v instanceof JobGraphPrinter) {
+            JobGraphPrinter jpp = (JobGraphPrinter)v;
+            jpp.visit(this);
+        }
+    }
+
+    @Override
+    public boolean isEqual(Operator operator) {
+        if (!(operator instanceof JobStats)) return false;
+        return name.equalsIgnoreCase(operator.getName());
+    }    
+ 
+    @SuppressWarnings("deprecation")
+    void setId(JobID jobId) {
+        this.jobId = jobId;
+    }
+    
+    void setSuccessful(boolean isSuccessful) {
+        this.state = isSuccessful ? JobState.SUCCESS : JobState.FAILED;
+    }
+    
+    void setErrorMsg(String errorMsg) {
+        this.errorMsg = errorMsg;
+    }
+    
+    void setBackendException(Exception e) {
+        exception = e;
+    }
+        
+    @SuppressWarnings("unchecked")
+    void setConf(Configuration conf) {        
+        if (conf == null) return;
+        this.conf = conf;
+        try {
+            mapStores = (List<POStore>) ObjectSerializer.deserialize(conf
+                    .get(JobControlCompiler.PIG_MAP_STORES));
+            reduceStores = (List<POStore>) ObjectSerializer.deserialize(conf
+                    .get(JobControlCompiler.PIG_REDUCE_STORES));
+        } catch (IOException e) {
+            LOG.warn("Failed to deserialize the store list", e);
+        }                    
+    }
+    
+    void setMapStat(int size, long max, long min, long avg) {
+        numberMaps = size;
+        maxMapTime = max;
+        minMapTime = min;
+        avgMapTime = avg;       
+    }
+    
+    void setReduceStat(int size, long max, long min, long avg) {
+        numberReduces = size;
+        maxReduceTime = max;
+        minReduceTime = min;
+        avgReduceTime = avg;       
+    }  
+    
+    String getDisplayString() {
+        StringBuilder sb = new StringBuilder();
+        String id = (jobId == null) ? "N/A" : jobId.toString();
+        if (state == JobState.FAILED) {           
+            sb.append(id).append("\t")
+                .append(getAlias()).append("\t")
+                .append(getFeature()).append("\t")
+                .append("Message: ").append(getErrorMessage()).append("\t");
+        } else if (state == JobState.SUCCESS) {
+            sb.append(id).append("\t")
+                .append(numberMaps).append("\t")
+                .append(numberReduces).append("\t")
+                .append(maxMapTime/1000).append("\t")
+                .append(minMapTime/1000).append("\t")
+                .append(avgMapTime/1000).append("\t")
+                .append(maxReduceTime/1000).append("\t")
+                .append(minReduceTime/1000).append("\t")
+                .append(avgReduceTime/1000).append("\t")
+                .append(getAlias()).append("\t")
+                .append(getFeature()).append("\t");
+        }
+        for (OutputStats os : outputs) {
+            sb.append(os.getLocation()).append(",");
+        }        
+        sb.append("\n");
+        return sb.toString();
+    }
+
+    @SuppressWarnings("deprecation")
+    void addCounters(RunningJob rjob) {        
+        if (rjob != null) {
+            try {
+                counters = rjob.getCounters();
+            } catch (IOException e) {
+                LOG.warn("Unable to get job counters", e);
+            }
+        }
+        if (counters != null) {
+            Counters.Group taskgroup = counters
+                    .getGroup(PigStatsUtil.TASK_COUNTER_GROUP);
+            Counters.Group hdfsgroup = counters
+                    .getGroup(PigStatsUtil.FS_COUNTER_GROUP);
+            Counters.Group multistoregroup = counters
+                    .getGroup(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+            
+            mapInputRecords = taskgroup.getCounterForName(
+                    PigStatsUtil.MAP_INPUT_RECORDS).getCounter();
+            mapOutputRecords = taskgroup.getCounterForName(
+                    PigStatsUtil.MAP_OUTPUT_RECORDS).getCounter();
+            reduceInputRecords = taskgroup.getCounterForName(
+                    PigStatsUtil.REDUCE_INPUT_RECORDS).getCounter();
+            reduceOutputRecords = taskgroup.getCounterForName(
+                    PigStatsUtil.REDUCE_OUTPUT_RECORDS).getCounter();
+            hdfsBytesWritten = hdfsgroup.getCounterForName(
+                    PigStatsUtil.HDFS_BYTES_WRITTEN).getCounter();            
+            spillCount = counters.findCounter(
+                    PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT)
+                    .getCounter();
+            activeSpillCount = counters.findCounter(
+                    PigCounters.PROACTIVE_SPILL_COUNT).getCounter();
+            
+            Iterator<Counter> iter = multistoregroup.iterator();
+            while (iter.hasNext()) {
+                Counter cter = iter.next();
+                multiStoreCounters.put(cter.getName(), cter.getValue());
+            }            
+        }              
+    }
+    
+    void addMapReduceStatistics(JobClient client) {
+        TaskReport[] maps = null;
+        try {
+            maps = client.getMapTaskReports(jobId);
+        } catch (IOException e) {
+            LOG.warn("Failed to get map task report", e);            
+        }
+        if (maps != null && maps.length > 0) {
+            int size = maps.length;
+            long max = 0;
+            long min = Long.MAX_VALUE;
+            long total = 0;
+            for (TaskReport rpt : maps) {
+                long duration = rpt.getFinishTime() - rpt.getStartTime();
+                max = (duration > max) ? duration : max;
+                min = (duration < min) ? duration : min;
+                total += duration;
+            }
+            long avg = total / size;
+            setMapStat(size, max, min, avg);
+        }
+        TaskReport[] reduces = null;
+        try {
+            reduces = client.getReduceTaskReports(jobId);
+        } catch (IOException e) {
+            LOG.warn("Failed to get reduce task report", e);
+        }
+        if (reduces != null && reduces.length > 0) {
+            int size = reduces.length;
+            long max = 0;
+            long min = Long.MAX_VALUE;
+            long total = 0;
+            for (TaskReport rpt : reduces) {
+                long duration = rpt.getFinishTime() - rpt.getStartTime();
+                max = (duration > max) ? duration : max;
+                min = (duration < min) ? duration : min;
+                total += duration;
+            }
+            long avg = total / size;
+            setReduceStat(size, max, min, avg);
+        }       
+    }
+    
+    void setAlias(MapReduceOper mro) {       
+        annotate(ALIAS, ScriptState.get().getAlias(mro));             
+        annotate(FEATURE, ScriptState.get().getPigFeature(mro));
+    }
+    
+    void addOutputStatistics() {
+        if (mapStores == null || reduceStores == null) {
+            LOG.warn("unable to get stores of the job");
+            return;
+        }
+        
+        if (mapStores.size() + reduceStores.size() == 1) {
+            POStore sto = (mapStores.size() > 0) ? mapStores.get(0)
+                    : reduceStores.get(0);
+            if (!sto.isTmpStore()) {
+                long records = (mapStores.size() > 0) ? mapOutputRecords
+                        : reduceOutputRecords;           
+                OutputStats ds = new OutputStats(sto.getSFile().getFileName(),
+                        hdfsBytesWritten, records, (state == JobState.SUCCESS));
+                ds.setPOStore(sto);
+                ds.setConf(conf);
+                outputs.add(ds);
+            }
+        } else {
+            for (POStore sto : mapStores) {
+                if (sto.isTmpStore()) continue;
+                addOneOutputStats(sto);
+            }
+            for (POStore sto : reduceStores) {
+                if (sto.isTmpStore()) continue;
+                addOneOutputStats(sto);
+            }     
+        }
+    }
+    
+    private void addOneOutputStats(POStore sto) {
+        long records = -1;
+        if (sto.isMultiStore()) {
+            Long n = multiStoreCounters.get(PigStatsUtil.getMultiStoreCounterName(sto));
+            if (n != null) records = n;
+        } else {
+            records = mapOutputRecords;
+        }
+        String location = sto.getSFile().getFileName();
+        Path p = new Path(location);
+        URI uri = p.toUri();
+        long bytes = -1;
+        if (uri.getScheme() == null || uri.getScheme().equalsIgnoreCase("hdfs")) {
+            try {
+                FileSystem fs = p.getFileSystem(conf);
+                FileStatus[] lst = fs.listStatus(p);
+                if (lst != null) {
+                    bytes = 0;
+                    for (FileStatus status : lst) {
+                        bytes += status.getLen();
+                    }
+                }
+            } catch (IOException e) {
+                LOG.warn("unable to get byte written of the job", e);
+            }
+        }
+        OutputStats ds = new OutputStats(location, bytes, records,
+                (state == JobState.SUCCESS));  
+        ds.setPOStore(sto);
+        ds.setConf(conf);
+        outputs.add(ds);
+    }
+       
+}
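To illustrate the public surface defined above, a hypothetical helper that formats one JobStats using only getters from this file:

    // Hypothetical helper; every accessor used here is defined in JobStats above.
    static String summarize(JobStats js) {
        StringBuilder sb = new StringBuilder();
        sb.append(js.getJobId() == null ? "N/A" : js.getJobId())
          .append(": state=").append(js.getState())
          .append(", maps=").append(js.getNumberMaps())
          .append(", reduces=").append(js.getNumberReduces())
          .append(", records=").append(js.getRecordWritten())
          .append(", hdfsBytesWritten=").append(js.getHdfsBytesWritten());
        return sb.toString();
    }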

Added: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java?rev=957277&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/OutputStats.java Wed Jun 23 17:29:33 2010
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * This class encapsulates the runtime statistics of a user-specified output.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class OutputStats {
+
+    private String name;
+    private String location;
+    private long bytes;
+    private long records;
+
+    private boolean success;
+
+    private POStore store = null;
+    
+    private Configuration conf;
+
+    OutputStats(String location, long bytes, long records, boolean success) {
+        this.location = location;
+        this.bytes = bytes;
+        this.records = records;        
+        this.success = success;
+        try {
+            this.name = new Path(location).getName();
+        } catch (Exception e) {
+            // location is a malformed URL
+            this.name = location;
+        }
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getLocation() {
+        return location;
+    }
+
+    public long getBytes() {
+        return bytes;
+    }
+
+    public long getNumberRecords() {
+        return records;
+    }
+
+    public String getFunctionName() {
+        return (store == null) ? null : store.getSFile().getFuncSpec()
+                .getClassName();
+    }
+
+    public boolean isSuccessful() {
+        return success;
+    }
+
+    public String getAlias() {
+        return (store == null) ? null : store.getAlias();
+    }
+
+    public POStore getPOStore() {
+        return store;
+    }
+
+    public Configuration getConf() {
+        return conf;
+    }
+    
+    String getDisplayString() {
+        StringBuilder sb = new StringBuilder();
+        if (success) {
+            sb.append("Successfully stored ").append(records).append(
+                    " records ");
+            if (bytes > 0) {
+                sb.append("(").append(bytes).append(" bytes) ");
+            }
+            sb.append("in: \"").append(location).append("\"\n");
+        } else {
+            sb.append("Failed to produce result in \"").append(location)
+                    .append("\"\n");
+        }
+        return sb.toString();
+    }
+
+    void setPOStore(POStore store) {
+        this.store = store;
+    }
+    
+    void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+}
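The two new classes compose: each JobStats carries the OutputStats of its user-specified outputs, so per-output success, sizes, and record counts can be read directly. A short sketch against the getters defined above, where js is a JobStats from a finished run:

    for (OutputStats os : js.getOutputs()) {
        if (os.isSuccessful()) {
            System.out.println(os.getAlias() + " -> " + os.getLocation()
                    + " (" + os.getNumberRecords() + " records, "
                    + os.getBytes() + " bytes)");
        } else {
            System.err.println("Failed output: " + os.getLocation());
        }
    }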


