pig-commits mailing list archives

From rd...@apache.org
Subject svn commit: r957277 [2/3] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLaye...
Date Wed, 23 Jun 2010 17:29:34 GMT
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Wed Jun 23 17:29:33 2010
@@ -15,345 +15,645 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.pig.tools.pigstats;
 
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
 import java.io.IOException;
-import java.io.PrintStream;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
-import org.apache.hadoop.mapred.Counters;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigCounters;
-import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.PigException;
+import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.experimental.plan.BaseOperatorPlan;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.tools.pigstats.JobStats.JobState;
+
+/**
+ * PigStats encapsulates the statistics collected from a running script:
+ * the status of the execution, the DAG of its MR jobs, and information
+ * about the script's inputs and outputs.
+ */
 
-public class PigStats {
-    MROperPlan mrp;
-    PhysicalPlan php;
-    JobControl jc;
-    JobClient jobClient;
-    Map<String, Map<String, String>> stats = new HashMap<String, Map<String,String>>();
-    // String lastJobID;
-    ArrayList<String> rootJobIDs = new ArrayList<String>();
-    ExecType mode;
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class PigStats {
     
-    private static final String localModeDataFile = "part-00000";
+    private static final Log LOG = LogFactory.getLog(PigStats.class);
     
-    public void setMROperatorPlan(MROperPlan mrp) {
-        this.mrp = mrp;
-    }
+    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";  
     
-    public void setJobControl(JobControl jc) {
-        this.jc = jc;
-    }
+    private static ThreadLocal<PigStats> tps = new ThreadLocal<PigStats>();
     
-    public void setJobClient(JobClient jobClient) {
-        this.jobClient = jobClient;
-    }
+    private PigContext pigContext;
     
-    public String getMRPlan() {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        mrp.dump(new PrintStream(baos));
-        return baos.toString();
-    }
+    private JobClient jobClient;
     
-    public void setExecType(ExecType mode) {
-        this.mode = mode;
-    }
+    private JobControlCompiler jcc;
     
-    public void setPhysicalPlan(PhysicalPlan php) {
-        this.php = php;
-    }
+    private JobGraph jobPlan;
     
-    public String getPhysicalPlan() {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        php.explain(baos);
-        return baos.toString();
-    }
+    // map MR job id to MapReduceOper
+    private Map<String, MapReduceOper> jobMroMap;
+     
+    private Map<MapReduceOper, JobStats> mroJobMap;
+      
+    private long startTime = -1;
+    private long endTime = -1;
     
-    public Map<String, Map<String, String>> accumulateStats() throws ExecException {
-        if(mode == ExecType.MAPREDUCE)
-            return accumulateMRStats();
-        else if(mode == ExecType.LOCAL)
-            return accumulateLocalStats();
-        else
-            throw new RuntimeException("Unrecognized mode. Either MapReduce or Local mode expected.");
-    }
+    private String userId;
     
-    public void addStatsGroup(String key, Map<String, String> value) {
-        stats.put(key, value);
+    private int returnCode = ReturnCode.UNKNOWN;
+    private String errorMessage;
+    private int errorCode = -1;
+    
+    public static PigStats get() {
+        if (tps.get() == null) tps.set(new PigStats());
+        return tps.get();
+    }
+        
+    static PigStats start() {
+        tps.set(new PigStats());
+        return tps.get();
     }
     
-    private Map<String, Map<String, String>> accumulateLocalStats() {
-        //The counter placed before a store in the local plan should be able to get the number of records
-        for(PhysicalOperator op : php.getLeaves()) {
-            Map<String, String> jobStats = new HashMap<String, String>();
-            stats.put(op.toString(), jobStats);         
-            String localFilePath=normalizeToLocalFilePath(((POStore)op).getSFile().getFileName());
-            File outputFile = new File( localFilePath + File.separator + localModeDataFile );
-            
-            long lineCounter = 0;
+    /**
+     * JobGraph is an {@link OperatorPlan} whose members are {@link JobStats}
+     */
+    public static class JobGraph extends BaseOperatorPlan {
+                
+        @Override
+        public String toString() {
+            JobGraphPrinter jp = new JobGraphPrinter(this);
             try {
-                BufferedReader in = new BufferedReader(new FileReader( outputFile ));
-                @SuppressWarnings("unused")
-                String tmpString = null;
-                while( (tmpString = in.readLine()) != null ) {
-                    lineCounter++;
+                jp.visit();
+            } catch (IOException e) {
+                LOG.warn("unable to print job plan", e);
+            }
+            return jp.toString();
+        }
+        
+        public Iterator<JobStats> iterator() {
+            return new Iterator<JobStats>() {
+                private Iterator<Operator> iter = getOperators();                
+                @Override
+                public boolean hasNext() {                
+                    return iter.hasNext();
+                }
+                @Override
+                public JobStats next() {              
+                    return (JobStats)iter.next();
+                }
+                @Override
+                public void remove() {
+                    // this iterator is a read-only view of the job graph
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+ 
+        boolean isConnected(Operator from, Operator to) {
+            List<Operator> succs = null;
+            try {
+                succs = getSuccessors(from);
+            } catch (IOException e) {
+                LOG.warn("unable to get successors for operator");
+            }
+            if (succs != null) {
+                for (Operator succ: succs) {
+                    if (succ.getName().equals(to.getName()) 
+                            || isConnected(succ, to)) {
+                        return true;
+                    }                    
+                }
+            }
+            return false;
+        }
+        
+        List<JobStats> getSuccessfulJobs() {
+            ArrayList<JobStats> lst = new ArrayList<JobStats>();
+            Iterator<JobStats> iter = iterator();
+            while (iter.hasNext()) {
+                JobStats js = iter.next();
+                if (js.getState() == JobState.SUCCESS) {
+                    lst.add(js);
+                }
+            }
+            Collections.sort(lst, new JobComparator());
+            return lst;
+        }
+        
+        List<JobStats> getFailedJobs() {
+            ArrayList<JobStats> lst = new ArrayList<JobStats>();
+            Iterator<JobStats> iter = iterator();
+            while (iter.hasNext()) {
+                JobStats js = iter.next();
+                if (js.getState() == JobState.FAILED) {
+                    lst.add(js);
                 }
-                in.close();
-            } catch (FileNotFoundException e) {
-            } catch (IOException e) {                
-            } finally {
-                jobStats.put("PIG_STATS_LOCAL_OUTPUT_RECORDS", (Long.valueOf(lineCounter)).toString());
             }            
-            jobStats.put("PIG_STATS_LOCAL_BYTES_WRITTEN", (Long.valueOf(outputFile.length())).toString());
+            return lst;
         }
-        return stats;
     }
     
-    private String normalizeToLocalFilePath(String fileName) {
-        if (fileName.startsWith("file:")){
-            return fileName.substring(5);
-        }
-        return fileName;
-    }
+    /**
+     * This class builds the job DAG from an MR plan
+     */
+    private class JobGraphBuilder extends MROpPlanVisitor {
 
-    private Map<String, Map<String, String>> accumulateMRStats() throws ExecException {
+        public JobGraphBuilder(MROperPlan plan) {
+            super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(
+                    plan));
+            jobPlan = new JobGraph();
+            mroJobMap = new HashMap<MapReduceOper, JobStats>();        
+        }
         
-        for(Job job : jc.getSuccessfulJobs()) {
-            
-            
-            JobConf jobConf = job.getJobConf();
-            
-            
-                RunningJob rj = null;
-                try {
-                    rj = jobClient.getJob(job.getAssignedJobID());
-                } catch (IOException e1) {
-                    String error = "Unable to get the job statistics from JobClient.";
-                    throw new ExecException(error, e1);
-                }
-                if(rj == null)
-                    continue;
-                
-                Map<String, String> jobStats = new HashMap<String, String>();
-                stats.put(job.getAssignedJobID().toString(), jobStats);
-                
-                try {
-                    PhysicalPlan plan = (PhysicalPlan) ObjectSerializer.deserialize(jobConf.get("pig.mapPlan"));
-                    jobStats.put("PIG_STATS_MAP_PLAN", plan.toString());
-                    plan = (PhysicalPlan) ObjectSerializer.deserialize(jobConf.get("pig.combinePlan"));
-                    if(plan != null) {
-                        jobStats.put("PIG_STATS_COMBINE_PLAN", plan.toString());
+        @Override
+        public void visitMROp(MapReduceOper mr) throws VisitorException {
+            JobStats js = new JobStats(
+                    mr.getOperatorKey().toString(), jobPlan);            
+            jobPlan.add(js);
+            List<MapReduceOper> preds = getPlan().getPredecessors(mr);
+            if (preds != null) {
+                for (MapReduceOper pred : preds) {
+                    JobStats jpred = mroJobMap.get(pred);
+                    if (!jobPlan.isConnected(jpred, js)) {
+                        jobPlan.connect(jpred, js);
                     }
-                    plan = (PhysicalPlan) ObjectSerializer.deserialize(jobConf.get("pig.reducePlan"));
-                    if(plan != null) {
-                        jobStats.put("PIG_STATS_REDUCE_PLAN", plan.toString());
-                    }
-                } catch (IOException e2) {
-                    String error = "Error deserializing plans from the JobConf.";
-                    throw new RuntimeException(error, e2);
                 }
-                
-                Counters counters = null;
-                try {
-                    counters = rj.getCounters();
-                    // This code checks if the counters is null, if it is, then all the stats are unknown.
-                    // We use -1 to indicate unknown counter. In fact, Counters should not be null, it is
-                    // a hadoop bug, once this bug is fixed in hadoop, the null handling code should never be hit.
-                    // See Pig-943
-                    if (counters!=null)
-                    {
-                        Counters.Group taskgroup = counters.getGroup("org.apache.hadoop.mapred.Task$Counter");
-                        Counters.Group hdfsgroup = counters.getGroup("FileSystemCounters");
-                        jobStats.put("PIG_STATS_MAP_INPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("MAP_INPUT_RECORDS").getCounter())).toString());
-                        jobStats.put("PIG_STATS_MAP_OUTPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("MAP_OUTPUT_RECORDS").getCounter())).toString());
-                        jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("REDUCE_INPUT_RECORDS").getCounter())).toString());
-                        jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("REDUCE_OUTPUT_RECORDS").getCounter())).toString());
-                        jobStats.put("PIG_STATS_BYTES_WRITTEN", (Long.valueOf(hdfsgroup.getCounterForName("HDFS_BYTES_WRITTEN").getCounter())).toString());
-                        jobStats.put("PIG_STATS_SMM_SPILL_COUNT", (Long.valueOf(counters.findCounter(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT).getCounter())).toString() );
-                        jobStats.put("PIG_STATS_PROACTIVE_SPILL_COUNT", (Long.valueOf(counters.findCounter(PigCounters.PROACTIVE_SPILL_COUNT).getCounter())).toString() );
+            }
+            mroJobMap.put(mr, js);            
+        }        
+    }
+    
+    /**
+     * This class prints a JobGraph
+     */
+    static class JobGraphPrinter extends PlanVisitor {
+        
+        StringBuffer buf;
 
-                    }
-                    else
-                    {
-                        jobStats.put("PIG_STATS_MAP_INPUT_RECORDS", "-1");
-                        jobStats.put("PIG_STATS_MAP_OUTPUT_RECORDS", "-1");
-                        jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", "-1");
-                        jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", "-1");
-                        jobStats.put("PIG_STATS_BYTES_WRITTEN", "-1");
-                        jobStats.put("PIG_STATS_SMM_SPILL_COUNT", "-1");
-                        jobStats.put("PIG_STATS_PROACTIVE_SPILL_COUNT", "-1");
-                    }
-                    
-                } catch (IOException e) {
-                    // TODO Auto-generated catch block
-                    String error = "Unable to get the counters.";
-                    throw new ExecException(error, e);
-                }
+        protected JobGraphPrinter(OperatorPlan plan) {
+            super(plan,
+                    new org.apache.pig.experimental.plan.DependencyOrderWalker(
+                            plan));
+            buf = new StringBuffer();
         }
         
-        getLastJobIDs(jc.getSuccessfulJobs());
+        public void visit(JobStats op) throws IOException {
+            buf.append(op.getJobId());
+            List<Operator> succs = plan.getSuccessors(op);
+            if (succs != null) {
+                buf.append("\t->\t");
+                for (Operator p : succs) {                  
+                    buf.append(((JobStats)p).getJobId()).append(",");
+                }               
+            }
+            buf.append("\n");
+        }
         
-        return stats;
+        @Override
+        public String toString() {
+            buf.append("\n");
+            return buf.toString();
+        }        
+    }
+    
+    private static class JobComparator implements Comparator<JobStats> {
+        @Override
+        public int compare(JobStats o1, JobStats o2) {           
+            return o1.getJobId().compareTo(o2.getJobId());
+        }       
     }
     
+    public boolean isSuccessful() {
+        return (returnCode == ReturnCode.SUCCESS);
+    }
+    
+    /**
+     * Return codes are defined in {@link ReturnCode}
+     */
+    public int getReturnCode() {
+        return returnCode;
+    }
+    
+    public String getErrorMessage() {
+        return errorMessage;
+    }
 
-    private void getLastJobIDs(List<Job> jobs) {
-        rootJobIDs.clear();
-         Set<Job> temp = new HashSet<Job>();
-         for(Job job : jobs) {
-             if(job.getDependingJobs() != null && job.getDependingJobs().size() > 0)
-                 temp.addAll(job.getDependingJobs());
-         }
-         
-         //difference between temp and jobs would be the set of leaves
-         //we can safely assume there would be only one leaf
-         for(Job job : jobs) {
-             if(temp.contains(job)) continue;
-             else rootJobIDs.add(job.getAssignedJobID().toString());
-         }
-    }
-    
-    public List<String> getRootJobIDs() {
-        return rootJobIDs;
-    }
-    
-    public Map<String, Map<String, String>> getPigStats() {
-        return stats;
-    }
-    
-    public long getRecordsWritten() {
-        if(mode == ExecType.LOCAL)
-            return getRecordsCountLocal();
-        else if(mode == ExecType.MAPREDUCE)
-            return getRecordsCountMR();
-        else
-            throw new RuntimeException("Unrecognized mode. Either MapReduce or Local mode expected.");
-    }
-    
-    private long getRecordsCountLocal() {
-        //System.out.println(getPhysicalPlan());
-        //because of the nature of the parser, there will always be only one store
-
-        for(PhysicalOperator op : php.getLeaves()) {
-            return Long.parseLong(stats.get(op.toString()).get("PIG_STATS_LOCAL_OUTPUT_RECORDS"));
+    /**
+     * Returns the error code of {@link PigException}
+     */
+    public int getErrorCode() {
+        return errorCode;
+    }
+    
+    /**
+     * Returns the DAG of the MR jobs spawned by the script
+     */
+    public JobGraph getJobGraph() {
+        return jobPlan;
+    }
+    
+    /**
+     * Returns the list of output locations in the script
+     */
+    public List<String> getOutputLocations() {
+        ArrayList<String> locations = new ArrayList<String>();
+        for (OutputStats output : getOutputStats()) {
+            locations.add(output.getLocation());
         }
-        return 0;
+        return Collections.unmodifiableList(locations);
     }
     
     /**
-     * Returns the no. of records written by the pig script in MR mode
-     * @return
-     */
-    private long getRecordsCountMR() {
-        long records = 0;
-        for (String jid : rootJobIDs) {
-            Map<String, String> jobStats = stats.get(jid);
-            if (jobStats == null) continue;
-            String reducePlan = jobStats.get("PIG_STATS_REDUCE_PLAN");
-        	if(reducePlan == null) {
-        	    if (Long.parseLong(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"))==-1L)
-                {
-        	        records = -1;
-                    break;
-                }
-        	    else
-        	        records += Long.parseLong(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
-        	} else {
-        	    if (Long.parseLong(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"))==-1L)
-                {
-                    records = -1;
-                    break;
-                }
-                else
-                    records += Long.parseLong(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
-        	}
+     * Returns the list of output names in the script
+     */
+    public List<String> getOutputNames() {
+        ArrayList<String> names = new ArrayList<String>();
+        for (OutputStats output : getOutputStats()) {            
+            names.add(output.getName());
         }
-    	return records;
+        return Collections.unmodifiableList(names);
     }
     
-    public long getBytesWritten() {
-        if(mode == ExecType.LOCAL) {           
-            return getLocalBytesWritten(); 
-    	} else if( mode == ExecType.MAPREDUCE ) {
-    	    return getMapReduceBytesWritten();
-    	} else {
-    		throw new RuntimeException("Unrecognized mode. Either MapReduce or Local mode expected.");
-    	}
-    	
+    /**
+     * Returns the number of bytes written to the given output location,
+     * or -1 if the location or name is invalid.
+     */
+    public long getNumberBytes(String location) {
+        if (location == null) return -1;
+        String name = new Path(location).getName();
+        long count = -1;
+        for (OutputStats output : getOutputStats()) {
+            if (name.equals(output.getName())) {
+                count = output.getBytes();
+                break;
+            }
+        }
+        return count;
     }
     
-    public long getSMMSpillCount() {
-        long spillCount = 0;
-        for (String jid : rootJobIDs) {
-            Map<String, String> jobStats = stats.get(jid);
-            if (jobStats == null) continue;
-            if (Long.parseLong(jobStats.get("PIG_STATS_SMM_SPILL_COUNT"))==-1L)
-            {
-                spillCount = -1L;
+    /**
+     * Returns the number of records written to the given output location,
+     * or -1 if the location or name is invalid.
+     */
+    public long getNumberRecords(String location) {
+        if (location == null) return -1;
+        String name = new Path(location).getName();
+        long count = -1;
+        for (OutputStats output : getOutputStats()) {
+            if (name.equals(output.getName())) {
+                count = output.getNumberRecords();
                 break;
             }
-            spillCount += Long.parseLong(jobStats.get("PIG_STATS_SMM_SPILL_COUNT"));
         }
-        return spillCount;
+        return count;
+    }
+        
+    /**
+     * Returns the alias associated with this output location
+     */
+    public String getOutputAlias(String location) {
+        if (location == null) return null;
+        String name = new Path(location).getName();
+        String alias = null;
+        for (OutputStats output : getOutputStats()) {
+            if (name.equals(output.getName())) {
+                alias = output.getAlias();
+                break;
+            }
+        }
+        return alias;
+    }
+    
+    /**
+     * Returns the total spill count from {@link SpillableMemoryManager}.
+     */
+    public long getSMMSpillCount() {
+        Iterator<JobStats> it = jobPlan.iterator();
+        long ret = 0;
+        while (it.hasNext()) {
+            ret += it.next().getSMMSpillCount();
+        }
+        return ret;
     }
     
+    /**
+     * Returns the total spill count from {@link InternalCachedBag}.
+     */
     public long getProactiveSpillCount() {
-        long spillCount = 0;
-        for (String jid : rootJobIDs) {
-            Map<String, String> jobStats = stats.get(jid);
-            if (jobStats == null) continue;
-            if (Long.parseLong(jobStats.get("PIG_STATS_PROACTIVE_SPILL_COUNT"))==-1L)
-            {
-                spillCount = -1L;
-                break;
+        Iterator<JobStats> it = jobPlan.iterator();
+        long ret = 0;
+        while (it.hasNext()) {            
+            ret += it.next().getProactiveSpillCount();
+        }
+        return ret;
+    }
+    
+    /**
+     * Returns the total number of bytes written to the user-specified
+     * HDFS locations by this script.
+     */
+    public long getBytesWritten() {
+        Iterator<JobStats> it = jobPlan.iterator();
+        long ret = 0;
+        while (it.hasNext()) {
+            long n = it.next().getBytesWritten();
+            if (n > 0) ret += n;
+        }
+        return ret;
+    }
+    
+    /**
+     * Returns the total number of records written to the user-specified
+     * output locations by this script.
+     */
+    public long getRecordWritten() {
+        Iterator<JobStats> it = jobPlan.iterator();
+        long ret = 0;
+        while (it.hasNext()) {
+            long n = it.next().getRecordWrittern();
+            if (n > 0) ret += n;
+        }
+        return ret;
+    }
+
+    public String getHadoopVersion() {
+        return ScriptState.get().getHadoopVersion();
+    }
+    
+    public String getPigVersion() {
+        return ScriptState.get().getPigVersion();
+    }
+   
+    public String getScriptId() {
+        return ScriptState.get().getId();
+    }
+    
+    public String getFeatures() {
+        return ScriptState.get().getScriptFeatures();
+    }
+    
+    public long getDuration() {
+        return (startTime > 0 && endTime > 0) ? (endTime - startTime) : -1;
+    }
+    
+    /**
+     * Returns the number of MR jobs for this script
+     */
+    public int getNumberJobs() {
+        return jobPlan.size();
+    }
+        
+    public List<OutputStats> getOutputStats() {
+        List<OutputStats> outputs = new ArrayList<OutputStats>();
+        Iterator<JobStats> iter = jobPlan.iterator();
+        while (iter.hasNext()) {
+            for (OutputStats os : iter.next().getOutputs()) {
+                outputs.add(os);
             }
-            spillCount += Long.parseLong(jobStats.get("PIG_STATS_PROACTIVE_SPILL_COUNT"));
+        }        
+        return Collections.unmodifiableList(outputs);       
+    }
+    
+    private PigStats() {        
+        jobMroMap = new HashMap<String, MapReduceOper>(); 
+        jobPlan = new JobGraph();
+    }
+    
+    void start(PigContext pigContext, JobClient jobClient, 
+            JobControlCompiler jcc, MROperPlan mrPlan) {
+        
+        if (pigContext == null || jobClient == null || jcc == null) {
+            LOG.warn("invalid params: " + pigContext + jobClient + jcc);
+            return;
         }
-        return spillCount;
+        
+        this.pigContext = pigContext;
+        this.jobClient = jobClient;
+        this.jcc = jcc;         
+        
+        // build the job DAG; job ids stay unassigned (null) until jobs run
+        try {
+            new JobGraphBuilder(mrPlan).visit();
+        } catch (VisitorException e) {
+            LOG.warn("unable to build job plan", e);
+        }
+        
+        startTime = System.currentTimeMillis();
+        userId = System.getProperty("user.name");
+    }
+    
+    void stop() {
+        endTime = System.currentTimeMillis();
+        int m = getNumberSuccessfulJobs();
+        int n = getNumberFailedJobs();
+ 
+        if (n == 0 && m > 0 && m == jobPlan.size()) {
+            returnCode = ReturnCode.SUCCESS;
+        } else if (m > 0 && m < jobPlan.size()) {
+            returnCode = ReturnCode.PARTIAL_FAILURE;
+        } else {
+            returnCode = ReturnCode.FAILURE;
+        }
+    }
+    
+    boolean isInitialized() {
+        return startTime > 0;
+    }
+    
+    JobClient getJobClient() {
+        return jobClient;
     }
     
-    private long getLocalBytesWritten() {
-    	for(PhysicalOperator op : php.getLeaves())
-    		return Long.parseLong(stats.get(op.toString()).get("PIG_STATS_LOCAL_BYTES_WRITTEN"));
-    	return 0;
+    JobControlCompiler getJobControlCompiler() {
+        return jcc;
+    }
+    
+    void setReturnCode(int returnCode) {
+        this.returnCode = returnCode; 
+    }
+        
+    @SuppressWarnings("deprecation")
+    JobStats addJobStats(Job job) {
+        MapReduceOper mro = null;
+        JobID jobId = job.getAssignedJobID();
+        if (jobId != null) {
+            mro = jobMroMap.get(jobId.toString());
+        } else {
+            mro = jobMroMap.get(job.toString());
+        }
+        if (mro == null) {
+            LOG.warn("unable to get MR oper for job: "
+                    + ((jobId == null) ? job.toString() : jobId.toString()));
+            return null;
+        }
+        JobStats js = mroJobMap.get(mro);
+        if (js == null) {
+            LOG.warn("unable to get Job stats for job: " 
+                    + ((jobId == null) ? job.toString() : jobId.toString()));
+            return null;
+        }
+        js.setAlias(mro);
+        js.setConf(job.getJobConf());
+        return js;
+    }
+            
+    void display() {
+        if (returnCode == ReturnCode.UNKNOWN) {
+            LOG.warn("unknown return code, can't display the results");
+            return;
+        }
+        SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
+        StringBuilder sb = new StringBuilder();
+        sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n");
+        sb.append(getHadoopVersion()).append("\t").append(getPigVersion()).append("\t")
+            .append(userId).append("\t")
+            .append(sdf.format(new Date(startTime))).append("\t")
+            .append(sdf.format(new Date(endTime))).append("\t")
+            .append(getFeatures()).append("\n");
+        sb.append("\n");
+        if (returnCode == ReturnCode.SUCCESS) {
+            sb.append("Success!\n");
+        } else if (returnCode == ReturnCode.PARTIAL_FAILURE) {
+            sb.append("Some jobs have failed! Stop running all dependent jobs\n");
+        } else {
+            sb.append("Failed!\n");
+        }
+        sb.append("\n");
+        if (returnCode == ReturnCode.SUCCESS 
+                || returnCode == ReturnCode.PARTIAL_FAILURE) {
+            sb.append("Job Stats (time in seconds):\n");
+            sb.append("JobId\tMaps\tReduces\tMaxMapTime\tMinMapTIme\t" +
+                    "AvgMapTime\tMaxReduceTime\tMinReduceTime\tAvgReduceTime\t" +
+                    "Alias\tFeature\tOutputs\n");
+            List<JobStats> arr = jobPlan.getSuccessfulJobs();
+            for (JobStats js : arr) {                
+                sb.append(js.getDisplayString());
+            }
+            sb.append("\n");
+        }
+        if (returnCode == ReturnCode.FAILURE
+                || returnCode == ReturnCode.PARTIAL_FAILURE) {
+            sb.append("Failed Jobs:\n");
+            sb.append("JobId\tAlias\tFeature\tMessage\tOutputs\n");
+            List<JobStats> arr = jobPlan.getFailedJobs();
+            for (JobStats js : arr) {   
+                sb.append(js.getDisplayString());
+            }
+            sb.append("\n");
+        }
+        sb.append("Outputs:\n");
+        for (OutputStats ds : getOutputStats()) {
+            sb.append(ds.getDisplayString());
+        }
+        
+        sb.append("\nCounters:\n");
+        sb.append("Total records written : " + getRecordWritten()).append("\n");
+        sb.append("Total bytes written : " + getBytesWritten()).append("\n");
+        sb.append("Spillable Memory Manager spill count : "
+                + getSMMSpillCount()).append("\n");
+        sb.append("Proactive spill count : " 
+                + getProactiveSpillCount()).append("\n");
+        
+        sb.append("\nJob DAG:\n").append(jobPlan.toString());
+        
+        LOG.info("Script Statistics: \n" + sb.toString());
     }
     
-    private long getMapReduceBytesWritten() {
-        long bytesWritten = 0;
-        for (String jid : rootJobIDs) {
-            Map<String, String> jobStats = stats.get(jid);
-            if (jobStats == null) continue;
-            if (Long.parseLong(jobStats.get("PIG_STATS_BYTES_WRITTEN"))==-1L)
-            {
-                bytesWritten = -1L;
+    @SuppressWarnings("deprecation")
+    void mapMROperToJob(MapReduceOper mro, Job job) {
+        if (mro == null) {
+            LOG.warn("null MR operator");
+        } else {
+            JobStats js = mroJobMap.get(mro);
+            if (js == null) {
+                LOG.warn("null job stats for mro: " + mro.getOperatorKey());
+            } else {
+                JobID id = job.getAssignedJobID();
+                js.setId(id);    
+                if (id != null) {
+                    jobMroMap.put(id.toString(), mro);
+                } else {
+                    jobMroMap.put(job.toString(), mro);
+                }
+            }
+        }
+    }
+
+    void setErrorMessage(String errorMessage) {
+        this.errorMessage = errorMessage;
+    }
+
+    void setErrorCode(int errorCode) {
+        this.errorCode = errorCode;
+    }    
+    
+    void setBackendException(Job job, Exception e) {
+        if (e instanceof PigException) {
+            LOG.error("ERROR " + ((PigException)e).getErrorCode() + ": " 
+                    + e.getLocalizedMessage());
+        } else if (e != null) {
+            LOG.error("ERROR: " + e.getLocalizedMessage());
+        }
+        
+        if (job.getAssignedJobID() == null || e == null) {
+            LOG.debug("unable to set backend exception");
+            return;
+        }
+        String id = job.getAssignedJobID().toString();
+        Iterator<JobStats> iter = jobPlan.iterator();
+        while (iter.hasNext()) {
+            JobStats js = iter.next();
+            if (id.equals(js.getJobId())) {
+                js.setBackendException(e);
                 break;
             }
-            bytesWritten += Long.parseLong(jobStats.get("PIG_STATS_BYTES_WRITTEN"));
         }
-        return bytesWritten;
+    }
+    
+    PigContext getPigContext() {
+        return pigContext;
+    }
+    
+    int getNumberSuccessfulJobs() {
+        Iterator<JobStats> iter = jobPlan.iterator();
+        int count = 0;
+        while (iter.hasNext()) {
+            if (iter.next().getState() == JobState.SUCCESS) count++; 
+        }
+        return count;
+    }
+    
+    int getNumberFailedJobs() {
+        Iterator<JobStats> iter = jobPlan.iterator();
+        int count = 0;
+        while (iter.hasNext()) {
+            if (iter.next().getState() == JobState.FAILED) count++; 
+        }
+        return count;
     }
     
 }
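
For reference, a minimal sketch (not part of this commit; the class name is hypothetical) of how a caller might consume the reworked PigStats API above. It is placed in the same package as PigStats so that only the accessors visible in this diff are needed:

    package org.apache.pig.tools.pigstats;

    import java.util.Iterator;

    import org.apache.pig.tools.pigstats.JobStats.JobState;

    // Sketch: reads the thread-local PigStats populated during a script run
    // and prints a summary using only the getters shown in the diff above.
    class PigStatsReportSketch {

        static void report() {
            PigStats stats = PigStats.get();
            if (!stats.isSuccessful()) {
                System.err.println("script failed, return code = "
                        + stats.getReturnCode() + ", error: "
                        + stats.getErrorMessage());
            }
            // walk the MR job DAG built from the MROperPlan
            Iterator<JobStats> it = stats.getJobGraph().iterator();
            while (it.hasNext()) {
                JobStats js = it.next();
                System.out.println(js.getJobId() + "\t"
                        + (js.getState() == JobState.SUCCESS ? "OK" : "FAILED"));
            }
            // per-output statistics plus script-level totals
            for (OutputStats os : stats.getOutputStats()) {
                System.out.println(os.getLocation() + "\t" + os.getBytes()
                        + "\t" + os.getNumberRecords());
            }
            System.out.println("records written: " + stats.getRecordWritten()
                    + ", bytes written: " + stats.getBytesWritten());
        }
    }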

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Wed Jun 23 17:29:33 2010
@@ -19,26 +19,64 @@
 package org.apache.pig.tools.pigstats;
 
 import java.io.IOException;
+import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.mortbay.log.Log;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 
+/**
+ * A utility class for Pig Statistics
+ */
 public abstract class PigStatsUtil {
 
     public static final String MULTI_STORE_RECORD_COUNTER 
             = "Output records in ";
     public static final String MULTI_STORE_COUNTER_GROUP 
             = "MultiStoreCounters";
+    public static final String TASK_COUNTER_GROUP 
+            = "org.apache.hadoop.mapred.Task$Counter";
+    public static final String FS_COUNTER_GROUP 
+            = "FileSystemCounters";
+    public static final String MAP_INPUT_RECORDS 
+            = "MAP_INPUT_RECORDS";
+    public static final String MAP_OUTPUT_RECORDS 
+            = "MAP_OUTPUT_RECORDS";
+    public static final String REDUCE_INPUT_RECORDS 
+            = "REDUCE_INPUT_RECORDS";
+    public static final String REDUCE_OUTPUT_RECORDS 
+            = "REDUCE_OUTPUT_RECORDS";
+    public static final String HDFS_BYTES_WRITTEN 
+            = "HDFS_BYTES_WRITTEN";
     
+    private static final Log LOG = LogFactory.getLog(PigStatsUtil.class);
+   
+    /**
+     * Returns the count for the given counter name in the counter group 
+     * 'MultiStoreCounters'
+     * 
+     * @param job the MR job
+     * @param jobClient the Hadoop job client
+     * @param counterName the counter name
+     * @return the count of the given counter name
+     */
     @SuppressWarnings("deprecation")
     public static long getMultiStoreCount(Job job, JobClient jobClient,
             String counterName) {
-        long value = 0;
+        long value = -1;
         try {
             RunningJob rj = jobClient.getJob(job.getAssignedJobID());
             if (rj != null) {
@@ -47,13 +85,170 @@ public abstract class PigStatsUtil {
                 value = counter.getValue();
             }
         } catch (IOException e) {
-            Log.warn("Failed to get the counter for " + counterName);
+            LOG.warn("Failed to get the counter for " + counterName, e);
         }
         return value;        
     }
     
+    /**
+     * Returns the counter name for the given {@link POStore}
+     * 
+     * @param store the POStore
+     * @return the counter name 
+     */
     public static String getMultiStoreCounterName(POStore store) {
         return MULTI_STORE_RECORD_COUNTER +
                 new Path(store.getSFile().getFileName()).getName();
     }
+           
+    /**
+     * Starts collecting statistics for the given MR plan
+     * 
+     * @param pc the Pig context
+     * @param client the Hadoop job client
+     * @param jcc the job compiler
+     * @param plan the MR plan
+     */
+    public static void startCollection(PigContext pc, JobClient client, 
+            JobControlCompiler jcc, MROperPlan plan) {
+        PigStats ps = PigStats.start();
+        ps.start(pc, client, jcc, plan);
+    }
+     
+    /**
+     * Stops collecting statistics for a MR plan
+     * 
+     * @param display if true, log collected statistics in the Pig log 
+     *      file at INFO level 
+     */
+    public static void stopCollection(boolean display) {
+        PigStats ps = PigStats.get();
+        ps.stop();
+        if (!ps.isSuccessful()) {
+            LOG.error(ps.getNumberFailedJobs() + " map reduce job(s) failed!");
+            String errMsg = ps.getErrorMessage();
+            if (errMsg != null) {
+                LOG.error("Error message: " + errMsg);
+            }            
+        }
+        if (display) ps.display();
+    }
+    
+    /**
+     * Returns an empty PigStats object
+     * 
+     * @return an empty PigStats object
+     */
+    public static PigStats getEmptyPigStats() {
+        return PigStats.start();
+    }
+    
+    /**
+     * Sets the given return code on the current PigStats and returns it
+     * 
+     * @param code the return code
+     * @return the current PigStats with the return code set
+     */
+    public static PigStats getPigStats(int code) {
+        PigStats ps = PigStats.get();
+        ps.setReturnCode(code);
+        return ps;
+    }
+    
+    /**
+     * Logs the statistics in the Pig log file at INFO level
+     */
+    public static void displayStatistics() {
+        PigStats.get().display();
+    }
+    
+    /**
+     * Updates the {@link JobGraph} of the {@link PigStats}. The initial 
+     * {@link JobGraph} is created without job ids using {@link MROperPlan}, 
+     * before any job is submitted for execution. The {@link JobGraph} is
+     * then updated with job ids as jobs are executed. 
+     *  
+     * @param jobMroMap the map that maps {@link Job}s to {@link MapReduceOper}s
+     */
+    public static void updateJobMroMap(Map<Job, MapReduceOper> jobMroMap) {
+        PigStats ps = PigStats.get();
+        for (Map.Entry<Job, MapReduceOper> entry : jobMroMap.entrySet()) {
+            MapReduceOper mro = entry.getValue();
+            ps.mapMROperToJob(mro, entry.getKey());
+        }        
+    }
+    
+    /**
+     * Updates the statistics after a batch of jobs is done
+     * 
+     * @param jc the job control
+     */
+    public static void accumulateStats(JobControl jc) {
+        PigStats ps = PigStats.get();
+  
+        for (Job job : jc.getSuccessfulJobs()) {            
+            accumulateSuccessStatistics(ps, job);
+        }
+        
+        for (Job job : jc.getFailedJobs()) {                      
+            JobStats js = addFailedJobStats(ps, job);
+            if (js != null) {
+                js.setErrorMsg(job.getMessage());                    
+            } else {
+                LOG.warn("unable to add failed job stats: " + job);
+            }
+        }
+    }
+    
+    public static void setErrorMessage(String msg) {
+        PigStats.get().setErrorMessage(msg);
+    }
+    
+    public static void setErrorCode(int code) {
+        PigStats.get().setErrorCode(code);
+    }
+    
+    public static void setBackendException(Job job, Exception e) {
+        PigStats.get().setBackendException(job, e);
+    }
+    
+    private static JobStats addFailedJobStats(PigStats ps, Job job) {
+        JobStats js = ps.addJobStats(job);
+        if (js == null) {
+            LOG.warn("unable to add failed job stats");            
+        } else {       
+            js.setSuccessful(false);
+            js.addOutputStatistics();
+        }
+        return js;
+    }
+    
+    private static void accumulateSuccessStatistics(PigStats ps, Job job) {
+        JobStats js = ps.addJobStats(job);
+        if (js == null) {
+            LOG.warn("unable to add job stats");
+        } else {                
+            js.setSuccessful(true);
+                           
+            js.addMapReduceStatistics(ps.getJobClient());
+            
+            JobClient client = ps.getJobClient();
+            RunningJob rjob = null;
+            try {
+                rjob = client.getJob(job.getAssignedJobID());
+            } catch (IOException e) {
+                LOG.warn("Failed to get running job", e);
+            }
+            if (rjob == null) {
+                LOG.warn("Failed to get RunningJob for job " 
+                        + job.getAssignedJobID());           
+            } else {                        
+                js.addCounters(rjob); 
+            }
+            
+            js.addOutputStatistics();
+        }
+    }
+
 }
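
Similarly, a hedged sketch (not in this commit; class and method names are hypothetical) of the collection lifecycle these utility methods are meant to drive. The launcher-side objects (PigContext, JobClient, compiler, plan, JobControl and the job-to-operator map) are assumed to already exist; only the call order is illustrated:

    package org.apache.pig.tools.pigstats;

    import java.util.Map;

    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.jobcontrol.Job;
    import org.apache.hadoop.mapred.jobcontrol.JobControl;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
    import org.apache.pig.impl.PigContext;

    // Sketch of a launcher driving the PigStatsUtil lifecycle above.
    class StatsLifecycleSketch {

        static PigStats runBatch(PigContext pc, JobClient client,
                JobControlCompiler jcc, MROperPlan plan, JobControl jobControl,
                Map<Job, MapReduceOper> jobMroMap) {
            // 1. build the job DAG (still without job ids) before submission
            PigStatsUtil.startCollection(pc, client, jcc, plan);
            // 2. once jobs are submitted, tie Hadoop job ids to MR operators
            PigStatsUtil.updateJobMroMap(jobMroMap);
            // ... run the JobControl thread and wait for the batch here ...
            // 3. fold counters and output statistics of the finished batch
            PigStatsUtil.accumulateStats(jobControl);
            // 4. finalize the return code and log the report at INFO level
            PigStatsUtil.stopCollection(true);
            return PigStats.get();
        }
    }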

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Wed Jun 23 17:29:33 2010
@@ -22,9 +22,12 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -33,6 +36,7 @@ import java.util.jar.Attributes;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
 
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,17 +47,44 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOCross;
+import org.apache.pig.impl.logicalLayer.LODistinct;
+import org.apache.pig.impl.logicalLayer.LOFilter;
+import org.apache.pig.impl.logicalLayer.LOForEach;
+import org.apache.pig.impl.logicalLayer.LOJoin;
+import org.apache.pig.impl.logicalLayer.LOLimit;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOStream;
+import org.apache.pig.impl.logicalLayer.LOUnion;
+import org.apache.pig.impl.logicalLayer.LOVisitor;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.LOCogroup.GROUPTYPE;
+import org.apache.pig.impl.logicalLayer.LOJoin.JOINTYPE;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 
 /**
 * ScriptState encapsulates settings for a Pig script that runs on a Hadoop 
@@ -69,15 +100,17 @@ public class ScriptState {
      */
     private enum PIG_PROPERTY {
         SCRIPT_ID           ("pig.script.id"),
-        SCRIPT              ("pig.script"),
-        LAUNCHER_HOST       ("pig.launcher.host"),
+        SCRIPT              ("pig.script"),       
         COMMAND_LINE        ("pig.command.line"),
         HADOOP_VERSION      ("pig.hadoop.version"),
         VERSION             ("pig.version"),
         INPUT_DIRS          ("pig.input.dirs"),
         MAP_OUTPUT_DIRS     ("pig.map.output.dirs"),
         REDUCE_OUTPUT_DIRS  ("pig.reduce.output.dirs"),
-        FEATURE             ("pig.feature");
+        JOB_PARENTS         ("pig.parent.jobid"),
+        JOB_FEATURE         ("pig.job.feature"),
+        SCRIPT_FEATURES     ("pig.script.features"),
+        JOB_ALIAS           ("pig.alias");        
        
         private String displayStr;
         
@@ -92,16 +125,27 @@ public class ScriptState {
     /**
      * Features used in a Pig script
      */
-    private enum PIG_FEATURE {
+    static enum PIG_FEATURE {
+        UNKNOWN,
         MERGE_JION,
         REPLICATED_JOIN,
-        SKEWED_JION,
+        SKEWED_JOIN,
+        HASH_JOIN,
         COLLECTED_GROUP,
         MERGE_COGROUP,
+        COGROUP,
+        GROUP_BY,
         ORDER_BY,
         DISTINCT,
         STREAMING,
-        MAP_ONLY;
+        SAMPLING,
+        MULTI_QUERY,
+        FILTER,
+        MAP_ONLY,
+        CROSS,
+        LIMIT,
+        UNION,
+        COMBINER;
     };
     
     /**
@@ -110,6 +154,11 @@ public class ScriptState {
      */
     public static final String INSERT_ENABLED = "pig.script.info.enabled";
     
+    /**
+     * Restricts the size of the Pig script stored in the job xml 
+     */
+    public static final int MAX_SCRIPT_SIZE = 1024; 
+    
     private static final Log LOG = LogFactory.getLog(ScriptState.class);
 
     private static ThreadLocal<ScriptState> tss = new ThreadLocal<ScriptState>();
@@ -118,12 +167,15 @@ public class ScriptState {
     
     private String script;
     private String commandLine;
-    private String feature;
     
-    private String host;
     private String pigVersion;
     private String hodoopVersion;
-           
+    
+    private long scriptFeatures;
+    
+    private Map<MapReduceOper, String> featureMap = null;
+    private Map<MapReduceOper, String> aliasMap = null;
+    
     public static ScriptState start(String commandLine) {
         ScriptState ss = new ScriptState(UUID.randomUUID().toString());
         ss.setCommandLine(commandLine);
@@ -141,15 +193,14 @@ public class ScriptState {
             ScriptState.start("");
         }
         return tss.get();
-    }
-           
+    }           
+       
     public void addSettingsToConf(MapReduceOper mro, Configuration conf) {
-        LOG.info("Pig script settings is added to the job");
+        LOG.info("Pig script settings are added to the job");
         conf.set(PIG_PROPERTY.HADOOP_VERSION.toString(), getHadoopVersion());
         conf.set(PIG_PROPERTY.VERSION.toString(), getPigVersion());
         conf.set(PIG_PROPERTY.SCRIPT_ID.toString(), id);
-        conf.set(PIG_PROPERTY.SCRIPT.toString(), getScript());
-        conf.set(PIG_PROPERTY.LAUNCHER_HOST.toString(), getHostName());
+        conf.set(PIG_PROPERTY.SCRIPT.toString(), getScript());        
         conf.set(PIG_PROPERTY.COMMAND_LINE.toString(), getCommandLine());
         
         try {
@@ -188,6 +239,8 @@ public class ScriptState {
         }
 
         setPigFeature(mro, conf);
+        
+        setJobParents(mro, conf);
     }
  
     public void setScript(File file) {            
@@ -198,22 +251,40 @@ public class ScriptState {
         }
     }
 
-    public void setScript(String script) {            
-        this.script = script;
-    }
-
-    private String getScript() {
-        return (script == null) ? "" : script;
+    public void setScript(String script) {        
+        if (script == null) return;
+        
+        // restrict the size of the script to be stored in job conf
+        script = (script.length() > MAX_SCRIPT_SIZE) ? script.substring(0,
+                MAX_SCRIPT_SIZE) : script;
+        
+        // The XML parser can't handle certain characters, including
+        // control characters (e.g. &#1). Use Base64 encoding to
+        // get around this problem.        
+        this.script = new String(Base64.encodeBase64(script.getBytes()));
+    }
+   
+    public void setScriptFeatures(LogicalPlan plan) {
+        BitSet bs = new BitSet();
+        try {
+            new LogicalPlanFeatureVisitor(plan, bs).visit();
+        } catch (VisitorException e) {
+            LOG.warn("unable to get script feature", e);
+        }
+        scriptFeatures = bitSetToLong(bs);        
+        
+        LOG.info("Pig features used in the script: "
+                + featureLongToString(scriptFeatures));
     }
     
-    private String getHadoopVersion() {
+    public String getHadoopVersion() {
         if (hodoopVersion == null) {
             hodoopVersion = VersionInfo.getVersion();
         }
         return (hodoopVersion == null) ? "" : hodoopVersion;
     }
     
-    private String getPigVersion() {
+    public String getPigVersion() {
         if (pigVersion == null) {
             String findContainingJar = JarManager.findContainingJar(ScriptState.class);
             try { 
@@ -228,19 +299,9 @@ public class ScriptState {
         }
         return (pigVersion == null) ? "" : pigVersion;
     }
-    
-    private String getHostName() { 
-        if (host == null) {
-            try {
-                InetAddress addr = InetAddress.getLocalHost();
-                host = addr.getHostName(); 
-            } catch (UnknownHostException e) {
-                LOG.warn("unable to get host name", e); 
-            }         
-        }
-        return (host == null) ? "" : host;
-    }
-    
+        
+    String getId() { return id; }
+        
     private String getCommandLine() {
         return (commandLine == null) ? "" : commandLine;
     }
@@ -249,6 +310,10 @@ public class ScriptState {
         this.commandLine = commandLine;
     }
     
+    private String getScript() {
+        return (script == null) ? "" : script;
+    }
+    
     private void setScript(BufferedReader reader) {
         StringBuilder sb = new StringBuilder();
         try {
@@ -263,68 +328,381 @@ public class ScriptState {
         } catch (IOException e) {
             LOG.warn("unable to parse the script", e);
         }
-        this.script = sb.toString();
+        setScript(sb.toString());
     }
     
     private void setPigFeature(MapReduceOper mro, Configuration conf) {
-        feature = "";
-        if (mro.isSkewedJoin()) {
-            feature = PIG_FEATURE.SKEWED_JION.toString();
-        } else if (mro.isGlobalSort()) {
-            feature = PIG_FEATURE.ORDER_BY.toString();
-        } else {
+        conf.set(PIG_PROPERTY.JOB_FEATURE.toString(), getPigFeature(mro));
+        if (scriptFeatures != 0) {
+            conf.set(PIG_PROPERTY.SCRIPT_FEATURES.toString(), 
+                    String.valueOf(scriptFeatures));
+        }
+        conf.set(PIG_PROPERTY.JOB_ALIAS.toString(), getAlias(mro));    
+    }
+    
+    private void setJobParents(MapReduceOper mro, Configuration conf) {
+        // PigStats maintains a job DAG whose job ids are filled in as they
+        // become available. Therefore, before a job is submitted, the ids 
+        // of its parent jobs are already known. 
+        JobGraph jg = PigStats.get().getJobGraph();
+        JobStats js = null;
+        Iterator<JobStats> iter = jg.iterator();
+        while (iter.hasNext()) {
+            JobStats job = iter.next();
+            if (job.getName().equals(mro.getOperatorKey().toString())) {
+                js = job;
+                break;
+            }
+        }
+        if (js != null) {
+            try {
+                List<Operator> preds = jg.getPredecessors(js);
+                if (preds != null) {
+                    StringBuilder sb = new StringBuilder();
+                    for (Operator op : preds) {
+                        JobStats job = (JobStats)op;
+                        if (sb.length() > 0) sb.append(",");
+                        sb.append(job.getJobId());
+                    }
+                    conf.set(PIG_PROPERTY.JOB_PARENTS.toString(), sb.toString());
+                }
+            } catch (IOException e) {
+                LOG.warn("unable to get job predecessors for job " 
+                        + js.getJobId(), e);
+            }
+        }
+    }
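Downstream tools can rebuild the job DAG from these per-job parent lists. A self-contained sketch, with the property key assumed for illustration (the patch resolves it from PIG_PROPERTY.JOB_PARENTS):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;

    public class JobParentsReader {
        // Assumed key string for illustration; not taken from this patch.
        private static final String JOB_PARENTS_KEY = "pig.parent.jobids";

        // Returns the parent job ids recorded for this job, in the
        // comma-separated format built by setJobParents above.
        public static List<String> parentIds(Configuration conf) {
            List<String> ids = new ArrayList<String>();
            String parents = conf.get(JOB_PARENTS_KEY);
            if (parents != null && parents.length() > 0) {
                for (String id : parents.split(",")) {
                    ids.add(id);
                }
            }
            return ids;
        }
    }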
+    
+    String getScriptFeatures() {
+        return featureLongToString(scriptFeatures);
+    }
+    
+    String getAlias(MapReduceOper mro) {
+        if (aliasMap == null) {
+            aliasMap = new HashMap<MapReduceOper, String>();
+        }
+        String retStr = aliasMap.get(mro);
+        if (retStr == null) {
+            ArrayList<String> alias = new ArrayList<String>();
             try {
-                new FeatureVisitor(mro.mapPlan).visit();
+                new AliasVisitor(mro.mapPlan, alias).visit();
+                new AliasVisitor(mro.reducePlan, alias).visit();
+                if (!alias.isEmpty()) {
+                    Collections.sort(alias);
+                }              
+            } catch (VisitorException e) {
+                LOG.warn("unable to get alias", e);
+            }
+            retStr = LoadFunc.join(alias, ",");
+            aliasMap.put(mro, retStr);
+        }
+        return retStr;
+    }
+
+    String getPigFeature(MapReduceOper mro) {
+        if (featureMap == null) {
+            featureMap = new HashMap<MapReduceOper, String>();
+        }
+        
+        String retStr = featureMap.get(mro);
+        if (retStr == null) {       
+            BitSet feature = new BitSet();
+            if (mro.isSkewedJoin()) {
+                feature.set(PIG_FEATURE.SKEWED_JOIN.ordinal());
+            } 
+            if (mro.isGlobalSort()) {
+                feature.set(PIG_FEATURE.ORDER_BY.ordinal());
+            } 
+            if (mro.isSampling()) { 
+                feature.set(PIG_FEATURE.SAMPLING.ordinal());
+            } 
+            if (mro.isCogroup()) {
+                feature.set(PIG_FEATURE.COGROUP.ordinal());
+            } 
+            if (mro.isGroupBy()) {
+                feature.set(PIG_FEATURE.GROUP_BY.ordinal());
+            } 
+            if (mro.isRegularJoin()) {
+                feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+            }  
+            if (mro.needsDistinctCombiner()) {
+                feature.set(PIG_FEATURE.DISTINCT.ordinal());
+            }
+            if (!mro.combinePlan.isEmpty()) {
+                feature.set(PIG_FEATURE.COMBINER.ordinal());
+            }
+            try {
+                new FeatureVisitor(mro.mapPlan, feature).visit();
                 if (mro.reducePlan.isEmpty()) { 
-                    feature = feature.isEmpty() ? 
-                            PIG_FEATURE.MAP_ONLY.toString() : feature;                    
+                    feature.set(PIG_FEATURE.MAP_ONLY.ordinal());                    
                 } else {
-                    new FeatureVisitor(mro.reducePlan).visit();
+                    new FeatureVisitor(mro.reducePlan, feature).visit();
                 }
             } catch (VisitorException e) {
                 LOG.warn("Feature visitor failed", e);
             }
+            
+            StringBuilder sb = new StringBuilder();
+            for (int i=feature.nextSetBit(0); i>=0; i=feature.nextSetBit(i+1)) {
+                if (sb.length() > 0) sb.append(",");             
+                sb.append(PIG_FEATURE.values()[i].name());
+            }
+            retStr = sb.toString();
+            featureMap.put(mro, retStr);
+        }
+        return retStr;
+    }    
+    
+    private long bitSetToLong(BitSet bs) {
+        long ret = 0;
+        for (int i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i+1)) {
+            ret |= (1L << i);
+        }
+        return ret;
+    }
+    
+    private String featureLongToString(long l) {
+        if (l == 0) return PIG_FEATURE.UNKNOWN.name();
+        
+        StringBuilder sb = new StringBuilder();
+        for (int i=0; i<PIG_FEATURE.values().length; i++) {
+            if (((l >> i) & 1L) != 0) {
+                if (sb.length() > 0) sb.append(",");
+                sb.append(PIG_FEATURE.values()[i].name());
+            }
         }
-        conf.set(PIG_PROPERTY.FEATURE.toString(), feature);
+        return sb.toString();
     }
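The two helpers above assume PIG_FEATURE has at most 64 constants, since each ordinal maps to one bit of a long. A self-contained round-trip check with a stand-in enum (the real PIG_FEATURE is defined elsewhere in this patch):

    import java.util.BitSet;

    public class FeatureBitsDemo {
        // Stand-in for PIG_FEATURE; the ordinals play the same role.
        enum Feature { UNKNOWN, ORDER_BY, GROUP_BY, HASH_JOIN }

        static long bitSetToLong(BitSet bs) {
            long ret = 0;
            for (int i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i + 1)) {
                ret |= (1L << i);
            }
            return ret;
        }

        static String featureLongToString(long l) {
            if (l == 0) return Feature.UNKNOWN.name();
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < Feature.values().length; i++) {
                if (((l >> i) & 1L) != 0) {
                    if (sb.length() > 0) sb.append(",");
                    sb.append(Feature.values()[i].name());
                }
            }
            return sb.toString();
        }

        public static void main(String[] args) {
            BitSet bs = new BitSet();
            bs.set(Feature.ORDER_BY.ordinal());   // bit 1
            bs.set(Feature.HASH_JOIN.ordinal());  // bit 3
            long bits = bitSetToLong(bs);         // 0b1010 = 10
            System.out.println(featureLongToString(bits)); // ORDER_BY,HASH_JOIN
        }
    }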
     
-    private class FeatureVisitor extends PhyPlanVisitor {
+    private static class FeatureVisitor extends PhyPlanVisitor {
+        private BitSet feature;
         
-        public FeatureVisitor(PhysicalPlan plan) {
+        public FeatureVisitor(PhysicalPlan plan, BitSet feature) {
             super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
                     plan));
+            this.feature = feature;
         }
         
         @Override
         public void visitFRJoin(POFRJoin join) throws VisitorException {
-            feature = PIG_FEATURE.REPLICATED_JOIN.toString();
+            feature.set(PIG_FEATURE.REPLICATED_JOIN.ordinal());
         }
         
         @Override
         public void visitMergeJoin(POMergeJoin join) throws VisitorException {
-            feature = PIG_FEATURE.MERGE_JION.toString();
+            feature.set(PIG_FEATURE.MERGE_JION.ordinal());
         }
         
         @Override
         public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
                 throws VisitorException {
-            feature = PIG_FEATURE.MERGE_COGROUP.toString();
+            feature.set(PIG_FEATURE.MERGE_COGROUP.ordinal());
         }
         
         @Override
         public void visitCollectedGroup(POCollectedGroup mg)
                 throws VisitorException {           
-            feature = PIG_FEATURE.COLLECTED_GROUP.toString();
+            feature.set(PIG_FEATURE.COLLECTED_GROUP.ordinal());
         }
         
         @Override
         public void visitDistinct(PODistinct distinct) throws VisitorException {
-            feature = PIG_FEATURE.DISTINCT.toString();
+            feature.set(PIG_FEATURE.DISTINCT.ordinal());
         }
         
         @Override
         public void visitStream(POStream stream) throws VisitorException {
-            feature = PIG_FEATURE.STREAMING.toString();
+            feature.set(PIG_FEATURE.STREAMING.ordinal());
         }
+        
+        @Override
+        public void visitSplit(POSplit split) throws VisitorException {
+            feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());
+        }
+        
+        @Override
+        public void visitDemux(PODemux demux) throws VisitorException {
+            feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());         
+        }        
     }    
+    
+    public static class LogicalPlanFeatureVisitor extends LOVisitor {
+        
+        private BitSet feature;
+        
+        protected LogicalPlanFeatureVisitor(LogicalPlan plan, BitSet feature) {
+            super(plan, new DepthFirstWalker<LogicalOperator, 
+                    LogicalPlan>(plan));            
+            this.feature = feature;
+        }
+        
+        @Override
+        protected void visit(LOCogroup op) throws VisitorException {
+            if (op.getGroupType() == GROUPTYPE.COLLECTED) {
+                feature.set(PIG_FEATURE.COLLECTED_GROUP.ordinal());
+            } else if (op.getGroupType() == GROUPTYPE.MERGE) {
+                feature.set(PIG_FEATURE.MERGE_COGROUP.ordinal());                
+            } else if (op.getGroupType() == GROUPTYPE.REGULAR) {
+                if (op.getInputs().size() > 1) {
+                    feature.set(PIG_FEATURE.COGROUP.ordinal());
+                } else {
+                    feature.set(PIG_FEATURE.GROUP_BY.ordinal());
+                }
+            }
+        }
+        
+        @Override
+        protected void visit(LOCross op) throws VisitorException {
+            feature.set(PIG_FEATURE.CROSS.ordinal());
+        }
+        
+        @Override
+        protected void visit(LODistinct op) throws VisitorException {
+            feature.set(PIG_FEATURE.DISTINCT.ordinal());
+        }
+        
+        @Override
+        protected void visit(LOFilter op) throws VisitorException {
+            feature.set(PIG_FEATURE.FILTER.ordinal());
+        }
+        
+        @Override
+        protected void visit(LOForEach op) throws VisitorException {
+            // ForEach is not tracked as a script feature by this visitor.
+        }
+                
+        @Override
+        protected void visit(LOJoin op) throws VisitorException {
+            if (op.getJoinType() == JOINTYPE.HASH) {
+                feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+            } else if (op.getJoinType() == JOINTYPE.MERGE) {
+                feature.set(PIG_FEATURE.MERGE_JION.ordinal());
+            } else if (op.getJoinType() == JOINTYPE.REPLICATED) {
+                feature.set(PIG_FEATURE.REPLICATED_JOIN.ordinal());
+            } else if (op.getJoinType() == JOINTYPE.SKEWED) {
+                feature.set(PIG_FEATURE.SKEWED_JOIN.ordinal());
+            }
+        }
+        
+        @Override
+        protected void visit(LOLimit op) throws VisitorException {
+            feature.set(PIG_FEATURE.LIMIT.ordinal());
+        }
+        
+        @Override
+        protected void visit(LOSort op) throws VisitorException {
+            feature.set(PIG_FEATURE.ORDER_BY.ordinal());
+        }
+        
+        @Override
+        protected void visit(LOStream op) throws VisitorException {
+            feature.set(PIG_FEATURE.STREAMING.ordinal());
+        }
+        
+        @Override
+        protected void visit(LOSplit op) throws VisitorException {
+            // no-op: MULTI_QUERY is detected from POSplit/PODemux at the physical layer.
+        }
+        
+        @Override
+        protected void visit(LOUnion op) throws VisitorException {
+            feature.set(PIG_FEATURE.UNION.ordinal());
+        }
+    }
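Driving this visitor mirrors the call pattern at the top of this hunk: allocate a BitSet, run the visitor over the script's logical plan, then collapse the bits with bitSetToLong. A minimal sketch, assuming a LogicalPlan named lp and the surrounding class's LOG are in scope:

    BitSet bs = new BitSet();
    try {
        new LogicalPlanFeatureVisitor(lp, bs).visit();
    } catch (VisitorException e) {
        LOG.warn("unable to get script feature", e);
    }
    long scriptFeatures = bitSetToLong(bs);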
+    
+    private static class AliasVisitor extends PhyPlanVisitor {
+        
+        private HashSet<String> aliasSet;
+        
+        private List<String> alias;
+        
+        public AliasVisitor(PhysicalPlan plan, List<String> alias) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                    plan));
+            this.alias = alias;
+            aliasSet = new HashSet<String>();
+            if (!alias.isEmpty()) {
+                for (String s : alias) aliasSet.add(s);
+            }
+        }
+        
+        @Override
+        public void visitFRJoin(POFRJoin join) throws VisitorException {
+            setAlias(join);
+        }
+        
+        @Override
+        public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+            setAlias(join);
+        }
+        
+        @Override
+        public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
+                throws VisitorException {
+            setAlias(mergeCoGrp);
+        }
+        
+        @Override
+        public void visitCollectedGroup(POCollectedGroup mg)
+                throws VisitorException {           
+            setAlias(mg);
+        }
+        
+        @Override
+        public void visitDistinct(PODistinct distinct) throws VisitorException {
+            setAlias(distinct);
+        }
+        
+        @Override
+        public void visitStream(POStream stream) throws VisitorException {
+            setAlias(stream);
+        }
+        
+        @Override
+        public void visitFilter(POFilter fl) throws VisitorException {
+            setAlias(fl);
+        }
+         
+        @Override
+        public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException {
+            setAlias(lr);
+        }
+        
+        @Override
+        public void visitPOForEach(POForEach nfe) throws VisitorException {
+            setAlias(nfe);
+        }
+        
+        @Override
+        public void visitUnion(POUnion un) throws VisitorException {
+            setAlias(un);
+        }
+
+        @Override
+        public void visitSort(POSort sort) throws VisitorException {
+            setAlias(sort);
+        }
+ 
+        @Override
+        public void visitLimit(POLimit lim) throws VisitorException {
+            setAlias(lim);
+        }
+        
+        @Override
+        public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
+            setAlias(sk);
+        }
+        
+        private void setAlias(PhysicalOperator op) {
+            String s = op.getAlias();
+            if (s != null) {
+                if (!aliasSet.contains(s)) {
+                    alias.add(s);
+                    aliasSet.add(s);
+                }
+            }
+        }
+    }
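The aliasSet/alias pair in AliasVisitor deduplicates while preserving the order in which aliases are first seen; a LinkedHashSet collapses the same idea into one structure. An illustration with made-up alias names:

    import java.util.ArrayList;
    import java.util.LinkedHashSet;
    import java.util.List;

    public class AliasDedupDemo {
        public static void main(String[] args) {
            // Aliases as operators might report them, including a repeat.
            String[] seen = { "raw", "grouped", "raw", "counted" };
            LinkedHashSet<String> unique = new LinkedHashSet<String>();
            for (String s : seen) {
                unique.add(s); // add() is a no-op for duplicates
            }
            List<String> alias = new ArrayList<String>(unique);
            System.out.println(alias); // [raw, grouped, counted]
        }
    }

Since getAlias sorts the collected list afterwards, insertion order only needs to be deterministic here, not meaningful.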
+    
 }


