pig-commits mailing list archives

From rd...@apache.org
Subject svn commit: r962715 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/tools/pigstats/ test/org/apache/pig/test/
Date Fri, 09 Jul 2010 22:45:59 GMT
Author: rding
Date: Fri Jul  9 22:45:58 2010
New Revision: 962715

URL: http://svn.apache.org/viewvc?rev=962715&view=rev
Log:
PIG-1389: Implement Pig counter to track number of rows for each input file

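The counters are published under a dedicated Hadoop counter group. Going by the constants added to PigStatsUtil below (MULTI_INPUTS_COUNTER_GROUP = "MultiInputCounters" and MULTI_INPUTS_RECORD_COUNTER = "Input records from "), the counter listing for a two-input join would look roughly like this (file names and values illustrative, not from this commit):

    MultiInputCounters
        Input records from multi-input1.txt    100
        Input records from multi-input2.txt     47
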
Added:
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=962715&r1=962714&r2=962715&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Jul  9 22:45:58 2010
@@ -24,6 +24,9 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1389: Implement Pig counter to track number of rows for each input file
+(rding)
+
 PIG-1454: Consider clean up backend code (rding)
 
 PIG-1333: API interface to Pig (rding)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=962715&r1=962714&r2=962715&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Jul  9 22:45:58 2010
@@ -1000,12 +1000,12 @@ public class MRCompiler extends PhyPlanV
             nonBlocking(op);
             phyToMROpMap.put(op, curMROp);
             if (op.getPackageType() == PackageType.JOIN) {
-                curMROp.setRegularJoin(true);
+                curMROp.markRegularJoin();
             } else if (op.getPackageType() == PackageType.GROUP) {
                 if (op.getNumInps() == 1) {
-                    curMROp.setGroupBy(true);
+                    curMROp.markGroupBy();
                 } else if (op.getNumInps() > 1) {
-                    curMROp.setCogroup(true);
+                    curMROp.markCogroup();
                 }
             }
         }catch(Exception e){
@@ -1493,6 +1493,7 @@ public class MRCompiler extends PhyPlanV
                 throw new PlanException(msg, errCode, PigException.BUG);
             }
             if(rightMROpr != null) {
+                rightMROpr.markIndexer();
                 // We want to ensure indexing job runs prior to actual join job. So, connect them in order.
                 MRPlan.connect(rightMROpr, curMROp);
             }
@@ -2311,7 +2312,7 @@ public class MRCompiler extends PhyPlanV
         
         mro.setReduceDone(true);
         mro.requestedParallelism = 1;
-        mro.setSampling(true);
+        mro.markSampler();
         return new Pair<MapReduceOper, Integer>(mro, parallelismForSort);
     }
 

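The markIndexer call added here tags the separate index-building job that merge-join compilation creates for its right-hand input (the "indexing job" of the comment above); that tag is what later lets the statistics layer report the job's input as INPUT_TYPE.indexer (see InputStats below).
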
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=962715&r1=962714&r2=962715&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Fri Jul  9 22:45:58 2010
@@ -141,7 +141,9 @@ public class MapReduceOper extends Opera
 	private static enum OPER_FEATURE {
 	    NONE,
 	    // Indicate if this job is a sampling job
-	    SAMPLER,	    
+	    SAMPLER,
+	    // Indicate if this job is a merge indexer
+	    INDEXER,
 	    // Indicate if this job is a group by job
 	    GROUPBY,	    
 	    // Indicate if this job is a cogroup job
@@ -313,11 +315,19 @@ public class MapReduceOper extends Opera
         this.limitOnly = limitOnly;
     }
 
-    public boolean isSampling() {
+    public boolean isIndexer() {
+        return (feature == OPER_FEATURE.INDEXER);
+    }
+    
+    public void markIndexer() {
+        feature = OPER_FEATURE.INDEXER;
+    }
+    
+    public boolean isSampler() {
         return (feature == OPER_FEATURE.SAMPLER);
     }
     
-    public void setSampling(boolean sampling) {
+    public void markSampler() {
         feature = OPER_FEATURE.SAMPLER;
     }
     
@@ -325,7 +335,7 @@ public class MapReduceOper extends Opera
         return (feature == OPER_FEATURE.GROUPBY);
     }
     
-    public void setGroupBy(boolean groupBy) {
+    public void markGroupBy() {
         feature = OPER_FEATURE.GROUPBY;
     }
     
@@ -333,7 +343,7 @@ public class MapReduceOper extends Opera
         return (feature == OPER_FEATURE.COGROUP);
     }
     
-    public void setCogroup(boolean cogroup) {
+    public void markCogroup() {
         feature = OPER_FEATURE.COGROUP;
     }
     
@@ -341,7 +351,7 @@ public class MapReduceOper extends Opera
         return (feature == OPER_FEATURE.HASHJOIN);
     }
     
-    public void setRegularJoin(boolean hashJoin) {
+    public void markRegularJoin() {
         feature = OPER_FEATURE.HASHJOIN;
     }
     

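The switch from setX(boolean) to markX() reflects that OPER_FEATURE is a single-valued field: the old setters ignored their boolean argument and always overwrote the feature, so a call like setSampling(false) could never actually clear it. A minimal sketch of the resulting mark/is idiom (enum abbreviated to the members in this diff; the field initialization to NONE is assumed, not shown here):

    // Single-valued feature flag: marking one feature overwrites any other.
    private static enum OPER_FEATURE {
        NONE, SAMPLER, INDEXER, GROUPBY, COGROUP, HASHJOIN
    }
    private OPER_FEATURE feature = OPER_FEATURE.NONE; // assumed initialization

    public boolean isIndexer() { return feature == OPER_FEATURE.INDEXER; }
    public void markIndexer()  { feature = OPER_FEATURE.INDEXER; }
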
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=962715&r1=962714&r2=962715&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Fri Jul  9 22:45:58 2010
@@ -114,9 +114,8 @@ public class MapReducePOStoreImpl extend
     }
     
     public Counter createRecordCounter(POStore store) {
-        Counter outputRecordCounter = reporter.getCounter(
-                PigStatsUtil.MULTI_STORE_COUNTER_GROUP, PigStatsUtil
-                        .getMultiStoreCounterName(store));
-        return outputRecordCounter; 
+        String name = PigStatsUtil.getMultiStoreCounterName(store);
+        return (name == null) ? null : reporter.getCounter(
+                PigStatsUtil.MULTI_STORE_COUNTER_GROUP, name); 
     }
 }

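This rewrite makes createRecordCounter null-safe: getMultiStoreCounterName can now return null (see the PigStatsUtil changes below) when the store location is a URI with no recoverable short name, and asking the reporter for a counter with a null name would be an error. Callers presumably have to null-check the returned Counter before incrementing it.
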
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=962715&r1=962714&r2=962715&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Fri Jul  9 22:45:58 2010
@@ -274,8 +274,12 @@ public class PigInputFormat extends Inpu
         // passing total # of splits to each split so that it can be retrieved 
         // in the RecordReader method when called by mapreduce framework later. 
         int n = splits.size();
+        // also passing the multi-input flag to the back-end so that 
+        // the multi-input record counters can be created 
+        int m = inputs.size();        
         for (InputSplit split : splits) {
             ((PigSplit) split).setTotalSplits(n);
+            if (m > 1) ((PigSplit) split).setMultiInputs(true);
         }
         
         return splits;

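Note that the single-input case leaves the flag false: when a job reads only one input, JobStats.addInputStatistics (below) falls back to the framework's standard map-input-records counter instead, so no custom per-input counter is needed.
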
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=962715&r1=962714&r2=962715&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Fri Jul  9 22:45:58 2010
@@ -179,6 +179,8 @@ public abstract class PigMapBase extends
             roots = targetOpsAsList.toArray(new PhysicalOperator[1]);
             leaf = mp.getLeaves().get(0);               
         }
+        
+        PigStatusReporter.setContext(context);
  
     }
     
@@ -211,9 +213,9 @@ public abstract class PigMapBase extends
             boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
 
             PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
-            pigHadoopLogger.setAggregate(aggregateWarning);
-            PigStatusReporter.setContext(context);
+            pigHadoopLogger.setAggregate(aggregateWarning);           
             pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
+
             PhysicalOperator.setPigLogger(pigHadoopLogger);
         }
         

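Moving PigStatusReporter.setContext(context) out of the warning-logger block and into the unconditional setup path appears deliberate: the new input record counters in PigRecordReader (below) fetch the reporter via PigStatusReporter.getInstance(), so the context has to be set even when aggregate warnings are disabled.
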
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=962715&r1=962714&r2=962715&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Fri Jul  9 22:45:58 2010
@@ -18,18 +18,24 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.io.IOException;
+import java.util.ArrayList;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 /**
  * A wrapper around the actual RecordReader and loadfunc - this is needed for
@@ -45,6 +51,7 @@ import org.apache.pig.impl.PigContext;
  */
 public class PigRecordReader extends RecordReader<Text, Tuple> {
 
+    private static final Log LOG = LogFactory.getLog(PigRecordReader.class);
     /**
      * the current Tuple value as returned by underlying
      * {@link LoadFunc#getNext()}
@@ -58,6 +65,12 @@ public class PigRecordReader extends Rec
     // the loader object
     private LoadFunc loadfunc;
     
+    // the Hadoop counter for multi-input jobs 
+    transient private Counter inputRecordCounter = null;
+    
+    // the Hadoop counter name
+    transient private String counterName = null;
+    
     /**
      * the Configuration object with data specific to the input the underlying
      * RecordReader will process (this is obtained after a 
@@ -77,17 +90,11 @@ public class PigRecordReader extends Rec
         this.inputSpecificConf = conf;
     }
     
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.mapreduce.RecordReader#close()
-     */
     @Override
     public void close() throws IOException {
         wrappedReader.close();        
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
-     */
     @Override
     public Text getCurrentKey() throws IOException, InterruptedException {
         // In pig we don't really use the key in the input to the map - so send
@@ -95,25 +102,32 @@ public class PigRecordReader extends Rec
         return null;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
-     */
     @Override
-    public Tuple getCurrentValue() throws IOException, InterruptedException {
+    public Tuple getCurrentValue() throws IOException, InterruptedException {    
+        if (inputRecordCounter == null && counterName != null) {
+            PigStatusReporter reporter = PigStatusReporter.getInstance();
+            if (reporter != null) {
+                inputRecordCounter = reporter.getCounter(
+                        PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP,
+                        counterName);
+                LOG.info("Created input record counter: " + counterName);
+            } else {
+                LOG.warn("Got null reporter for " + counterName);
+            }
+        }
+        // Increment the multi-input record counter
+        if (inputRecordCounter != null && curValue != null) {
+            inputRecordCounter.increment(1);            
+        }
+       
         return curValue;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
-     */
     @Override
     public float getProgress() throws IOException, InterruptedException {
         return wrappedReader.getProgress();
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.mapreduce.RecordReader#initialize(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
-     */
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context)
             throws IOException, InterruptedException {
@@ -132,15 +146,25 @@ public class PigRecordReader extends Rec
         // the "adjusted" conf
         wrappedReader.initialize(pigSplit.getWrappedSplit(), context);
         loadfunc.prepareToRead(wrappedReader, pigSplit);
+                
+        if (pigSplit.isMultiInputs()) { 
+            counterName = getMultiInputsCounterName(pigSplit, inputSpecificConf);
+        }
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.mapreduce.RecordReader#nextKeyValue()
-     */
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
         curValue = loadfunc.getNext();
         return curValue != null;
     }
 
+    @SuppressWarnings("unchecked")
+    private static String getMultiInputsCounterName(PigSplit pigSplit,
+            Configuration conf) throws IOException {
+        ArrayList<FileSpec> inputs = 
+            (ArrayList<FileSpec>) ObjectSerializer.deserialize(
+                    conf.get(PigInputFormat.PIG_INPUTS));
+        String fname = inputs.get(pigSplit.getInputIndex()).getFileName();
+        return PigStatsUtil.getMultiInputsCounterName(fname);
+    }
 }

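The counter is bound lazily on the first getCurrentValue call rather than in initialize, which tolerates a PigStatusReporter that is not ready yet (it is wired up in PigMapBase.setup, above) and simply retries on the next record. A self-contained sketch of the same lazy-binding pattern, with hypothetical stand-ins for the Hadoop Counter and PigStatusReporter types:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-ins, just to make the pattern runnable in isolation.
    class SimpleCounter {
        private long value;
        void increment(long n) { value += n; }
        long getValue() { return value; }
    }

    class SimpleReporter {
        static volatile SimpleReporter instance; // null until task setup runs
        private final Map<String, SimpleCounter> counters = new HashMap<>();
        SimpleCounter getCounter(String group, String name) {
            return counters.computeIfAbsent(group + "::" + name,
                    k -> new SimpleCounter());
        }
    }

    class LazyCountingReader {
        private final String counterName; // chosen once, at initialize() time
        private SimpleCounter counter;    // bound on first record, may stay null

        LazyCountingReader(String counterName) { this.counterName = counterName; }

        // Called per record: bind the counter once the reporter has appeared,
        // then count; records served before binding are simply not counted.
        void onRecord() {
            if (counter == null && counterName != null
                    && SimpleReporter.instance != null) {
                counter = SimpleReporter.instance
                        .getCounter("MultiInputCounters", counterName);
            }
            if (counter != null) counter.increment(1);
        }
    }
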
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=962715&r1=962714&r2=962715&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Fri Jul  9 22:45:58 2010
@@ -73,6 +73,11 @@ public class PigSplit extends InputSplit
     // index
     private int splitIndex;
     
+    // the flag indicates this split belongs to a multi-input map (e.g. a join)
+    // so that custom Hadoop counters will be created in the 
+    // back-end to track the number of records for each input.
+    private boolean isMultiInputs = false;
+    
     /**
      * the job Configuration
      */
@@ -115,9 +120,6 @@ public class PigSplit extends InputSplit
             return wrappedSplit.getLocations();
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.mapreduce.InputSplit#getLength()
-     */
     @Override
     public long getLength() throws IOException, InterruptedException {
         return wrappedSplit.getLength();
@@ -125,6 +127,7 @@ public class PigSplit extends InputSplit
     
     @SuppressWarnings("unchecked")
     public void readFields(DataInput is) throws IOException {
+        isMultiInputs = is.readBoolean();
         totalSplits = is.readInt();
         splitIndex = is.readInt();
         inputIndex = is.readInt();
@@ -146,6 +149,7 @@ public class PigSplit extends InputSplit
 
     @SuppressWarnings("unchecked")
     public void write(DataOutput os) throws IOException {
+        os.writeBoolean(isMultiInputs);
         os.writeInt(totalSplits);
         os.writeInt(splitIndex);
         os.writeInt(inputIndex);
@@ -191,10 +195,23 @@ public class PigSplit extends InputSplit
         return splitIndex;
     }
 
-
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.conf.Configurable#getConf()
+    /**
+     * Indicates this map has multiple inputs (such as the result of
+     * a join operation).
+     * @param b true if the map has multiple inputs
+     */
+    public void setMultiInputs(boolean b) {
+        isMultiInputs = b;
+    }
+    
+    /**
+     * Returns true if the map has multiple inputs, else false
+     * @return true if the map has multiple inputs, else false
      */
+    public boolean isMultiInputs() {
+        return isMultiInputs;
+    }
+    
     @Override
     public Configuration getConf() {
         return conf;

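Because Writable serialization is purely positional, readFields and write must handle the new boolean at the same position on both sides; here it goes first in each method. A quick round-trip check one could write, assuming the no-arg PigSplit constructor that Hadoop's reflective deserialization relies on:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    // Serialize the split to memory and read it back; mismatched field order
    // between write() and readFields() would garble every field after it.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    split.write(new DataOutputStream(bos));
    PigSplit copy = new PigSplit(); // assumed no-arg constructor
    copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bos.toByteArray())));
    assert copy.isMultiInputs() == split.isMultiInputs();
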
Added: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java?rev=962715&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/InputStats.java Fri Jul  9 22:45:58 2010
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * This class encapsulates the runtime statistics of a user specified input.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class InputStats {
+
+    private String name;
+    private String location;
+    private long bytes;
+    private long records;
+
+    private boolean success;
+    
+    public static enum INPUT_TYPE { regular, sampler, indexer, side };
+ 
+    private INPUT_TYPE type = INPUT_TYPE.regular;
+    
+    private Configuration conf;
+
+    InputStats(String location, long bytes, long records, boolean success) {
+        this.location = location;
+        this.bytes = bytes;
+        this.records = records;        
+        this.success = success;
+        try {
+            this.name = new Path(location).getName();
+        } catch (Exception e) {
+            // location is a malformed URL
+            this.name = location;
+        }
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getLocation() {
+        return location;
+    }
+
+    public long getBytes() {
+        return bytes;
+    }
+
+    public long getNumberRecords() {
+        return records;
+    }
+
+    public boolean isSuccessful() {
+        return success;
+    }
+
+    public Configuration getConf() {
+        return conf;
+    }
+    
+    public INPUT_TYPE getInputType() {
+        return type;
+    }
+    
+    String getDisplayString() {
+        StringBuilder sb = new StringBuilder();
+        if (success) {
+            sb.append("Successfully ");
+            if (type == INPUT_TYPE.sampler) {
+                sb.append("sampled ");
+            } else if (type == INPUT_TYPE.indexer) {
+                sb.append("indexed ");
+            } else {
+                sb.append("read ");
+            }
+            sb.append(records).append(" records ");
+            if (bytes > 0) {
+                sb.append("(").append(bytes).append(" bytes) ");
+            }
+            sb.append("from: \"").append(location).append("\"");
+            if (type == INPUT_TYPE.side) {
+                sb.append(" as side file");
+            }
+            sb.append("\n");
+        } else {
+            sb.append("Failed to read data from \"").append(location)
+                    .append("\"\n");
+        }
+        return sb.toString();
+    }
+    
+    void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+    
+    void markSampleInput() {
+        type = INPUT_TYPE.sampler;
+    }
+    
+    void markIndexerInput() {
+        type = INPUT_TYPE.indexer;
+    }
+    
+    void markSideFileInput() {
+        type = INPUT_TYPE.side;
+    }
+}

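Since the InputStats constructor and setters are package-private, callers reach these objects through the new PigStats.getInputStats() (added below). A minimal usage sketch, obtaining the stats the same way the TestCounters changes do:

    import org.apache.pig.backend.executionengine.ExecJob;
    import org.apache.pig.tools.pigstats.InputStats;
    import org.apache.pig.tools.pigstats.PigStats;

    // After pigServer.store(...) returns an ExecJob:
    PigStats stats = job.getStatistics();
    for (InputStats is : stats.getInputStats()) {
        System.out.println(is.getName() + ": " + is.getNumberRecords()
                + " records, " + is.getBytes() + " bytes, type="
                + is.getInputType());
    }
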
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=962715&r1=962714&r2=962715&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Fri Jul  9 22:45:58 2010
@@ -20,6 +20,7 @@ package org.apache.pig.tools.pigstats;
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -47,6 +48,7 @@ import org.apache.pig.classification.Int
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.experimental.plan.Operator;
 import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
@@ -55,7 +57,6 @@ import org.apache.pig.tools.pigstats.Pig
  * This class encapsulates the runtime statistics of a MapReduce job. 
  * Job statistics is collected when job is completed.
  */
-
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public final class JobStats extends Operator {
@@ -75,7 +76,11 @@ public final class JobStats extends Oper
     
     private List<POStore> reduceStores = null;
     
+    private List<FileSpec> loads = null;
+    
     private ArrayList<OutputStats> outputs;
+    
+    private ArrayList<InputStats> inputs;
        
     private String errorMsg;
     
@@ -99,18 +104,23 @@ public final class JobStats extends Oper
     private long reduceInputRecords = 0;
     private long reduceOutputRecords = 0;
     private long hdfsBytesWritten = 0;
+    private long hdfsBytesRead = 0;
     private long spillCount = 0;
     private long activeSpillCount = 0;
     
     private HashMap<String, Long> multiStoreCounters 
             = new HashMap<String, Long>();
     
+    private HashMap<String, Long> multiInputCounters 
+            = new HashMap<String, Long>();
+        
     @SuppressWarnings("deprecation")
     private Counters counters = null;
-        
+    
     JobStats(String name, JobGraph plan) {
         super(name, plan);
         outputs = new ArrayList<OutputStats>();
+        inputs = new ArrayList<InputStats>();
     }
 
     public String getJobId() { 
@@ -161,6 +171,10 @@ public final class JobStats extends Oper
     public List<OutputStats> getOutputs() {
         return Collections.unmodifiableList(outputs);
     }
+    
+    public List<InputStats> getInputs() {
+        return Collections.unmodifiableList(inputs);
+    }
 
     public Map<String, Long> getMultiStoreCounters() {
         return Collections.unmodifiableMap(multiStoreCounters);
@@ -214,6 +228,7 @@ public final class JobStats extends Oper
         return name.equalsIgnoreCase(operator.getName());
     }    
  
+
     @SuppressWarnings("deprecation")
     void setId(JobID jobId) {
         this.jobId = jobId;
@@ -239,7 +254,9 @@ public final class JobStats extends Oper
             mapStores = (List<POStore>) ObjectSerializer.deserialize(conf
                     .get(JobControlCompiler.PIG_MAP_STORES));
             reduceStores = (List<POStore>) ObjectSerializer.deserialize(conf
-                    .get(JobControlCompiler.PIG_REDUCE_STORES));
+                    .get(JobControlCompiler.PIG_REDUCE_STORES));           
+            loads = (ArrayList<FileSpec>) ObjectSerializer.deserialize(conf
+                    .get("pig.inputs"));
         } catch (IOException e) {
             LOG.warn("Failed to deserialize the store list", e);
         }                    
@@ -288,7 +305,8 @@ public final class JobStats extends Oper
     }
 
     @SuppressWarnings("deprecation")
-    void addCounters(RunningJob rjob) {        
+    void addCounters(RunningJob rjob) {
+        Counters counters = null;
         if (rjob != null) {
             try {
                 counters = rjob.getCounters();
@@ -303,7 +321,9 @@ public final class JobStats extends Oper
                     .getGroup(PigStatsUtil.FS_COUNTER_GROUP);
             Counters.Group multistoregroup = counters
                     .getGroup(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-            
+            Counters.Group multiloadgroup = counters
+                    .getGroup(PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP);
+
             mapInputRecords = taskgroup.getCounterForName(
                     PigStatsUtil.MAP_INPUT_RECORDS).getCounter();
             mapOutputRecords = taskgroup.getCounterForName(
@@ -312,6 +332,8 @@ public final class JobStats extends Oper
                     PigStatsUtil.REDUCE_INPUT_RECORDS).getCounter();
             reduceOutputRecords = taskgroup.getCounterForName(
                     PigStatsUtil.REDUCE_OUTPUT_RECORDS).getCounter();
+            hdfsBytesRead = hdfsgroup.getCounterForName(
+                    PigStatsUtil.HDFS_BYTES_READ).getCounter();      
             hdfsBytesWritten = hdfsgroup.getCounterForName(
                     PigStatsUtil.HDFS_BYTES_WRITTEN).getCounter();            
             spillCount = counters.findCounter(
@@ -324,7 +346,14 @@ public final class JobStats extends Oper
             while (iter.hasNext()) {
                 Counter cter = iter.next();
                 multiStoreCounters.put(cter.getName(), cter.getValue());
-            }            
+            }     
+            
+            Iterator<Counter> iter2 = multiloadgroup.iterator();
+            while (iter2.hasNext()) {
+                Counter cter = iter2.next();
+                multiInputCounters.put(cter.getName(), cter.getValue());
+            } 
+            
         }              
     }
     
@@ -414,12 +443,19 @@ public final class JobStats extends Oper
         } else {
             records = mapOutputRecords;
         }
-        String location = sto.getSFile().getFileName();
-        Path p = new Path(location);
-        URI uri = p.toUri();
+        String location = sto.getSFile().getFileName();        
+        URI uri = null;
+        try {
+            uri = new URI(location);
+        } catch (URISyntaxException e1) {
+            LOG.warn("invalid syntax for output location: " + location, e1);
+        }
         long bytes = -1;
-        if (uri.getScheme() == null || uri.getScheme().equalsIgnoreCase("hdfs")) {
+        if (uri != null
+                && (uri.getScheme() == null || uri.getScheme()
+                        .equalsIgnoreCase("hdfs"))) {
             try {
+                Path p = new Path(location);
                 FileSystem fs = p.getFileSystem(conf);
                 FileStatus[] lst = fs.listStatus(p);
                 if (lst != null) {
@@ -438,4 +474,64 @@ public final class JobStats extends Oper
         outputs.add(ds);
     }
        
+    void addInputStatistics() {
+        if (loads == null)  {
+            LOG.warn("unable to get inputs of the job");
+            return;
+        }
+        
+        if (loads.size() == 1) {
+            FileSpec fsp = loads.get(0); 
+            if (!PigStatsUtil.isTempFile(fsp.getFileName())) {
+                long records = mapInputRecords;       
+                InputStats is = new InputStats(fsp.getFileName(),
+                        hdfsBytesRead, records, (state == JobState.SUCCESS));
+                is.setConf(conf);
+                if (isSampler()) is.markSampleInput();
+                if (isIndexer()) is.markIndexerInput();
+                inputs.add(is);                
+            }
+        } else {
+            // check for self-join (duplicated input file names)
+            HashMap<String, Integer> dupmap = new HashMap<String, Integer>();
+            for (FileSpec fsp : loads) {
+                String name = PigStatsUtil.getMultiInputsCounterName(fsp.getFileName());
+                if (name == null) continue;
+                if (dupmap.containsKey(name)) {
+                    int n = dupmap.get(name);
+                    dupmap.put(name, (n+1));
+                } else {
+                    dupmap.put(name, 1);
+                }                
+            }
+            for (FileSpec fsp : loads) {
+                if (PigStatsUtil.isTempFile(fsp.getFileName())) continue;
+                addOneInputStats(fsp.getFileName(), dupmap);
+            }
+        }            
+    }
+    
+    private void addOneInputStats(String fileName, Map<String, Integer> dupmap) {
+        long records = -1;
+        Long n = multiInputCounters.get(
+                PigStatsUtil.getMultiInputsCounterName(fileName));
+        if (n != null) {
+            Integer m = dupmap.get(PigStatsUtil.getMultiInputsCounterName(fileName));
+            records = (m != null && m > 0) ? (n / m) : n;
+        } else {
+            LOG.warn("unable to get input counter for " + fileName);
+        }
+        InputStats is = new InputStats(fileName, -1, records, (state == JobState.SUCCESS));
+        is.setConf(conf);
+        inputs.add(is);
+    }
+    
+    private boolean isSampler() {
+        return getFeature().contains(ScriptState.PIG_FEATURE.SAMPLER.name());
+    }
+    
+    private boolean isIndexer() {
+        return getFeature().contains(ScriptState.PIG_FEATURE.INDEXER.name());
+    }
+    
 }

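A worked example of the self-join correction in addOneInputStats: if a script joins file A with itself, both entries in loads resolve to the same counter name, so every record read from A is counted twice and dupmap records m = 2 for that name. With a raw counter value of n = 200, each of the two InputStats then reports n / m = 100 records, which is what the new self-join test in TestCounters expects.
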
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=962715&r1=962714&r2=962715&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Fri Jul  9 22:45:58 2010
@@ -438,6 +438,17 @@ public final class PigStats {
         return Collections.unmodifiableList(outputs);       
     }
     
+    public List<InputStats> getInputStats() {
+        List<InputStats> inputs = new ArrayList<InputStats>();
+        Iterator<JobStats> iter = jobPlan.iterator();
+        while (iter.hasNext()) {
+            for (InputStats is : iter.next().getInputs()) {
+                inputs.add(is);
+            }
+        }        
+        return Collections.unmodifiableList(inputs);       
+    }
+    
     private PigStats() {        
         jobMroMap = new HashMap<String, MapReduceOper>(); 
         jobPlan = new JobGraph();
@@ -565,7 +576,12 @@ public final class PigStats {
             }
             sb.append("\n");
         }
-        sb.append("Outputs:\n");
+        sb.append("Input(s):\n");
+        for (InputStats is : getInputStats()) {
+            sb.append(is.getDisplayString());
+        }
+        sb.append("\n");
+        sb.append("Output(s):\n");
         for (OutputStats ds : getOutputStats()) {
             sb.append(ds.getDisplayString());
         }

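With the InputStats.getDisplayString format above, the run summary gains a section shaped like this (location and numbers illustrative):

    Input(s):
    Successfully read 100 records (1024 bytes) from: "hdfs://localhost:9000/user/pig/multi-input1.txt"
    Successfully sampled 200 records from: "hdfs://localhost:9000/user/pig/multi-input2.txt"
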
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=962715&r1=962714&r2=962715&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Fri Jul  9 22:45:58 2010
@@ -19,15 +19,17 @@
 package org.apache.pig.tools.pigstats;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
@@ -61,6 +63,12 @@ public abstract class PigStatsUtil {
             = "REDUCE_OUTPUT_RECORDS";
     public static final String HDFS_BYTES_WRITTEN 
             = "HDFS_BYTES_WRITTEN";
+    public static final String HDFS_BYTES_READ 
+            = "HDFS_BYTES_READ";
+    public static final String MULTI_INPUTS_RECORD_COUNTER 
+            = "Input records from ";
+    public static final String MULTI_INPUTS_COUNTER_GROUP 
+            = "MultiInputCounters";
     
     private static final Log LOG = LogFactory.getLog(PigStatsUtil.class);
    
@@ -97,8 +105,53 @@ public abstract class PigStatsUtil {
      * @return the counter name 
      */
     public static String getMultiStoreCounterName(POStore store) {
-        return MULTI_STORE_RECORD_COUNTER +
-                new Path(store.getSFile().getFileName()).getName();
+        String shortName = null;
+        try {
+            shortName = getShortName(new URI(store.getSFile().getFileName()));
+        } catch (URISyntaxException e) {
+            LOG.warn("Invalid syntax for output location", e);
+        }
+        return (shortName == null) ? null 
+                : MULTI_STORE_RECORD_COUNTER + shortName;
+    }
+    
+    /**
+     * Returns the counter name for the given input file name
+     * 
+     * @param fname the input file name
+     * @return the counter name
+     */
+    public static String getMultiInputsCounterName(String fname) {
+        String shortName = null;
+        try {
+            shortName = getShortName(new URI(fname));            
+        } catch (URISyntaxException e) {
+            LOG.warn("Invalid syntax for input location", e);
+        }
+        return (shortName == null) ? null 
+                : MULTI_INPUTS_RECORD_COUNTER + shortName;
+    }
+    
+    private static final String SEPARATOR = "/";
+    private static final String SEMICOLON = ";";
+    
+    private static String getShortName(URI uri) {  
+        String path = uri.getPath();
+        if (path != null) {
+            int slash = path.lastIndexOf(SEPARATOR);
+            return path.substring(slash+1);
+        } 
+        // for cases such as
+        // "jdbc:hsqldb:file:/tmp/batchtest;hsqldb.default_table_type=cached;hsqldb.cache_rows=100"
+        path = uri.getSchemeSpecificPart();
+        if (path != null) {
+            int slash = path.lastIndexOf(SEPARATOR);
+            int scolon = path.indexOf(SEMICOLON);
+            if (slash < scolon) {
+                return path.substring(slash+1, scolon);
+            }
+        }
+        return null;       
     }
            
     /**
@@ -213,6 +266,13 @@ public abstract class PigStatsUtil {
         PigStats.get().setBackendException(job, e);
     }
     
+    private static Pattern pattern = Pattern.compile("tmp(-)?[\\d]{1,10}$");
+    
+    public static boolean isTempFile(String fileName) {
+        Matcher result = pattern.matcher(fileName);
+        return result.find();
+    }
+    
     private static JobStats addFailedJobStats(PigStats ps, Job job) {
         JobStats js = ps.addJobStats(job);
         if (js == null) {
@@ -220,6 +280,7 @@ public abstract class PigStatsUtil {
         } else {       
             js.setSuccessful(false);
             js.addOutputStatistics();
+            js.addInputStatistics();
         }
         return js;
     }
@@ -248,6 +309,8 @@ public abstract class PigStatsUtil {
             }
             
             js.addOutputStatistics();
+            
+            js.addInputStatistics();
         }
     }
 

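Tracing getShortName on the jdbc example from the comment: for an opaque URI like jdbc:hsqldb:file:/tmp/batchtest;hsqldb.default_table_type=cached;... URI.getPath() returns null, so the scheme-specific part is used instead; the last '/' sits just before "batchtest" and the first ';' just after it, so the substring between them, "batchtest", becomes the short name, exactly what the new testCounterName assertion below checks. The isTempFile pattern, for its part, matches names ending in "tmp" plus an optional minus sign and one to ten digits (for example .../tmp-1456742965), the shape testIsTempFile below expects from FileLocalizer.getTemporaryPath.
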
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=962715&r1=962714&r2=962715&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Fri Jul  9 22:45:58 2010
@@ -138,7 +138,8 @@ public class ScriptState {
         ORDER_BY,
         DISTINCT,
         STREAMING,
-        SAMPLING,
+        SAMPLER,
+        INDEXER,
         MULTI_QUERY,
         FILTER,
         MAP_ONLY,
@@ -367,7 +368,7 @@ public class ScriptState {
                     conf.set(PIG_PROPERTY.JOB_PARENTS.toString(), sb.toString());
                 }
             } catch (IOException e) {
-                LOG.warn("unable to get job predecessors for job " 
+                LOG.warn("unable to get job predecessors for job "
                         + js.getJobId(), e);
             }
         }
@@ -414,9 +415,12 @@ public class ScriptState {
             if (mro.isGlobalSort()) {
                 feature.set(PIG_FEATURE.ORDER_BY.ordinal());
             } 
-            if (mro.isSampling()) { 
-                feature.set(PIG_FEATURE.SAMPLING.ordinal());
+            if (mro.isSampler()) { 
+                feature.set(PIG_FEATURE.SAMPLER.ordinal());
             } 
+            if (mro.isIndexer()) { 
+                feature.set(PIG_FEATURE.INDEXER.ordinal());
+            }
             if (mro.isCogroup()) {
                 feature.set(PIG_FEATURE.COGROUP.ordinal());
             } 

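The SAMPLING-to-SAMPLER rename keeps the script-feature names aligned with the new MapReduceOper accessors; this matters because JobStats.isSampler() and isIndexer() (above) match on PIG_FEATURE.SAMPLER.name() and PIG_FEATURE.INDEXER.name() by string containment of the feature list.
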
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java?rev=962715&r1=962714&r2=962715&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java Fri Jul  9 22:45:58 2010
@@ -35,6 +35,7 @@ import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
@@ -592,4 +593,90 @@ public class TestCounters {
 //
 //    }
 
+    @Test
+    public void testJoinInputCounters() throws Exception {        
+        testInputCounters("join");
+    }
+    
+    @Test
+    public void testCogroupInputCounters() throws Exception {        
+        testInputCounters("cogroup");
+    }
+    
+    @Test
+    public void testSkewedInputCounters() throws Exception {        
+        testInputCounters("skewed");
+    }
+    
+    @Test
+    public void testSelfJoinInputCounters() throws Exception {        
+        testInputCounters("self-join");
+    }
+    
+    private static boolean multiInputCreated = false;
+    
+    private static int count = 0;
+            
+    private void testInputCounters(String keyword) throws Exception {  
+        String file1 = "multi-input1.txt";
+        String file2 = "multi-input2.txt";
+        
+        String output = keyword;
+        
+        if (keyword.equals("self-join")) {
+            file2 = file1;
+            keyword = "join";
+        }
+         
+        final int MAX_NUM_RECORDS = 100; 
+        if (!multiInputCreated) {
+            PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file1));
+            for (int i = 0; i < MAX_NUM_RECORDS; i++) {
+                int t = r.nextInt(100);
+                pw.println(t);
+            }
+            pw.close();
+                        
+            PrintWriter pw2 = new PrintWriter(Util.createInputFile(cluster, file2));
+            for (int i = 0; i < MAX_NUM_RECORDS; i++) {
+                int t = r.nextInt(100);
+                if (t > 50) {
+                    count ++;
+                    pw2.println(t);
+                }
+            }
+            pw2.close();
+            multiInputCreated = true;
+        }
+        
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, 
+                cluster.getProperties());
+        pigServer.setBatchOn();
+        pigServer.registerQuery("a = load '" + file1 + "';");
+        pigServer.registerQuery("b = load '" + file2 + "';");
+        if (keyword.equals("join") || keyword.endsWith("cogroup")) {
+            pigServer.registerQuery("c = " + keyword + " a by $0, b by $0;");
+        } else if (keyword.equals("skewed")) {
+            pigServer.registerQuery("c = join a by $0, b by $0 using 'skewed';");
+        }
+        ExecJob job = pigServer.store("c", output + "_output");
+        
+        PigStats stats = job.getStatistics();
+        assertTrue(stats.isSuccessful());
+        List<InputStats> inputs = stats.getInputStats();
+        if (keyword.equals("join") || keyword.endsWith("cogroup")) {
+            assertEquals(2, inputs.size());
+        } else if (keyword.equals("skewed")) {
+            assertEquals(3, inputs.size());
+        }
+        for (InputStats input : inputs) {
+            if (file1.equals(input.getName()) && input.getInputType() == InputStats.INPUT_TYPE.regular) {
+                assertEquals(MAX_NUM_RECORDS, input.getNumberRecords());
+            } else if (file2.equals(input.getName())){
+                assertEquals(count, input.getNumberRecords());
+            } else {
+                assertTrue(input.getInputType() == InputStats.INPUT_TYPE.sampler);
+            }
+        }
+    }
 }

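The expected input counts differ by join type: a regular join or cogroup reads exactly the two user inputs, while the skewed join compiles in an extra sampling job over one input, so three InputStats come back and the loop asserts the extra one is INPUT_TYPE.sampler.
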
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=962715&r1=962714&r2=962715&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java Fri Jul  9 22:45:58 2010
@@ -24,13 +24,19 @@ import static org.junit.Assert.assertTru
 import java.io.File;
 import java.io.FileWriter;
 import java.io.PrintWriter;
+import java.util.Properties;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.pig.ExecType;
 import org.apache.pig.PigRunner;
 import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.backend.hadoop.datastorage.HPath;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -251,8 +257,7 @@ public class TestPigRunner {
         
         try {
             String[] args = { PIG_FILE };
-            PigStats stats = PigRunner.run(args); 
-            System.out.println("++++ code: " + stats.getReturnCode());
+            PigStats stats = PigRunner.run(args);             
             assertTrue(!stats.isSuccessful());            
             assertTrue(stats.getReturnCode() == ReturnCode.PARTIAL_FAILURE);
             assertTrue(stats.getJobGraph().size() == 2);
@@ -274,4 +279,21 @@ public class TestPigRunner {
             Util.deleteFile(cluster, OUTPUT_FILE_2);
         }
     }
+    
+    @Test
+    public void testIsTempFile() throws Exception {
+        PigContext context = new PigContext(ExecType.LOCAL, new Properties());
+        context.connect();
+        for (int i=0; i<100; i++) {
+            String file = FileLocalizer.getTemporaryPath(context).toString();
+            assertTrue("not a temp file: " + file, PigStatsUtil.isTempFile(file));
+        }
+    }
+    
+    @Test
+    public void testCounterName() throws Exception {
+        String s = "jdbc:hsqldb:file:/tmp/batchtest;hsqldb.default_table_type=cached;hsqldb.cache_rows=100";
+        String name = PigStatsUtil.getMultiInputsCounterName(s);
+        assertEquals(PigStatsUtil.MULTI_INPUTS_RECORD_COUNTER + "batchtest", name);
+    }
 }


