pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dvrya...@apache.org
Subject svn commit: r1370848 - in /pig/trunk: ./ conf/ src/docs/src/documentation/content/xdocs/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOpe...
Date Wed, 08 Aug 2012 17:27:30 GMT
Author: dvryaboy
Date: Wed Aug  8 17:27:29 2012
New Revision: 1370848

URL: http://svn.apache.org/viewvc?rev=1370848&view=rev
Log:
PIG-2855: Provide a method to measure time spent in UDFs

Added:
    pig/trunk/src/org/apache/pig/PigConfiguration.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/conf/pig.properties
    pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1370848&r1=1370847&r2=1370848&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Aug  8 17:27:29 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2855: Provide a method to measure time spent in UDFs (dvryaboy)
+
 PIG-2837: AvroStorage throws StackOverFlowError (cheolsoo via sms)
 
 PIG-2856: AvroStorage doesn't load files in the directories when a glob pattern matches both files and directories. (cheolsoo via sms)

Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1370848&r1=1370847&r2=1370848&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Wed Aug  8 17:27:29 2012
@@ -59,6 +59,11 @@
 #using more counters than hadoop configured limit
 #pig.disable.counter=true
 
+# Use this option to turn on UDF timers. This will cause two 
+# counters to be tracked for every UDF and LoadFunc in your script:
+# approx_microsecs measures approximate time spent inside a UDF
+# approx_invocations reports the approximate number of times the UDF was invoked
+# pig.udf.profile=false
 
 #When enabled, 'describe' prints a multi-line formatted schema
 #(similar to an indended json) rather than on a single line.

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml?rev=1370848&r1=1370847&r2=1370848&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml Wed Aug  8 17:27:29 2012
@@ -22,6 +22,11 @@
   </header>
   <body> 
   
+<section id="profiling">
+  <title>Timing your UDFs</title>
+  <p>The first step to improving performance and efficiency is measuring where the time is going. Pig provides a light-weight method for approximately measuring how much time is spent in different user-defined functions (UDFs) and Loaders. Simply set the pig.udf.profile property to true. This will cause new counters to be tracked for all Map-Reduce jobs generated by your script: approx_microsecs measures the approximate amount of time spent in a UDF, and approx_invocations measures the approximate number of times the UDF was invoked. Note that this may produce a large number of counters (two per UDF). Excessive amounts of counters can lead to poor JobTracker performance, so use this feature carefully, and preferably on a test cluster.</p>
+</section>
+
 <!-- ================================================================== -->
 <!-- COMBINER -->
 <section id="combiner">

Added: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1370848&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (added)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Wed Aug  8 17:27:29 2012
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig;
+
+/**
+ * Container for static configuration strings, defaults, etc.
+ */
+public class PigConfiguration {
+
+    /**
+     * Controls whether execution time of Pig UDFs should be tracked.
+     * This feature uses counters; use judiciously.
+     */
+    public static final String TIME_UDFS_PROP = "pig.udf.profile";
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1370848&r1=1370847&r2=1370848&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Wed Aug  8 17:27:29 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
+import static org.apache.pig.PigConfiguration.TIME_UDFS_PROP;
+
 import java.io.IOException;
 import java.util.ArrayList;
 
@@ -52,6 +54,13 @@ import org.apache.pig.tools.pigstats.Pig
 public class PigRecordReader extends RecordReader<Text, Tuple> {
 
     private static final Log LOG = LogFactory.getLog(PigRecordReader.class);
+
+    private final static String TIMING_COUNTER = "approx_microsecs";
+    private final static int TIMING_FREQ = 100;
+
+    transient private String counterGroup = "";
+    private boolean doTiming = false;
+
     /**
      * the current Tuple value as returned by underlying
      * {@link LoadFunc#getNext()}
@@ -112,6 +121,8 @@ public class PigRecordReader extends Rec
         idx = 0;
         this.limit = limit;
         initNextRecordReader();
+        counterGroup = loadFunc.toString();
+        doTiming = context.getConfiguration().getBoolean(TIME_UDFS_PROP, false);
     }
     
     @Override
@@ -189,13 +200,23 @@ public class PigRecordReader extends Rec
 
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
+
         if (limit != -1 && recordCount >= limit)
             return false;
+        boolean timeThis = doTiming && ( (recordCount + 1) % TIMING_FREQ == 0);
+        long startNanos = 0;
+        if (timeThis) {
+            startNanos = System.nanoTime();
+        }
         while ((curReader == null) || (curValue = loadfunc.getNext()) == null) {
             if (!initNextRecordReader()) {
               return false;
             }
         }
+        if (timeThis) {
+            PigStatusReporter.getInstance().getCounter(counterGroup, TIMING_COUNTER).increment(
+                    ( Math.round((System.nanoTime() - startNanos) / 1000)) * TIMING_FREQ);
+        }
         recordCount++;
         return true;
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1370848&r1=1370847&r2=1370848&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Wed Aug  8 17:27:29 2012
@@ -18,6 +18,8 @@
 
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
 
+import static org.apache.pig.PigConfiguration.TIME_UDFS_PROP;
+
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.lang.reflect.Type;
@@ -27,6 +29,7 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
@@ -54,10 +57,15 @@ import org.apache.pig.impl.plan.NodeIdGe
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 public class POUserFunc extends ExpressionOperator {
     private static final Log LOG = LogFactory.getLog(POUserFunc.class);
+    private final static String TIMING_COUNTER = "approx_microsecs";
+    private final static String INVOCATION_COUNTER = "approx_invocations";
+    private final static int TIMING_FREQ = 100;
 
+    private transient String counterGroup;
     /**
      *
      */
@@ -78,6 +86,9 @@ public class POUserFunc extends Expressi
     private String signature;
     private boolean haveCheckedIfTerminatingAccumulator;
 
+    private long numInvocations = 0L;
+    private boolean doTiming = false;
+
     public PhysicalOperator getReferencedOperator() {
         return referencedOperator;
     }
@@ -119,6 +130,7 @@ public class POUserFunc extends Expressi
         this.setSignature(signature);
         Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
     	Schema tmpS=(Schema)props.get("pig.evalfunc.inputschema."+signature);
+
     	if(tmpS!=null)
     		this.func.setInputSchema(tmpS);
         if (func.getClass().isAnnotationPresent(MonitoredUDF.class)) {
@@ -148,7 +160,13 @@ public class POUserFunc extends Expressi
         if(!initialized) {
             func.setReporter(reporter);
             func.setPigLogger(pigLogger);
-
+            Configuration jobConf = UDFContext.getUDFContext().getJobConf();
+            if (jobConf != null) {
+                doTiming = "true".equalsIgnoreCase(jobConf.get(TIME_UDFS_PROP, "false"));
+                counterGroup = funcSpec.toString();
+            } else {
+                LOG.warn("jobConf not available. Not tracking UDF timing regardless of user preference.");
+            }
             // We initialize here instead of instantiateFunc because this is called
             // when actual processing has begun, whereas a function can be instantiated
             // on the frontend potentially (mainly for optimization)
@@ -261,6 +279,13 @@ public class POUserFunc extends Expressi
     private Result getNext() throws ExecException {
         Result result = processInput();
         String errMsg = "";
+        long startNanos = 0;
+        boolean timeThis = doTiming && (numInvocations++ % TIMING_FREQ == 0);
+        if (timeThis) {
+            startNanos = System.nanoTime();
+            PigStatusReporter.getInstance().getCounter(counterGroup, INVOCATION_COUNTER).increment(TIMING_FREQ);
+
+        }
         try {
             if(result.returnStatus == POStatus.STATUS_OK) {
                 if (isAccumulative()) {
@@ -310,9 +335,11 @@ public class POUserFunc extends Expressi
                     result.result = func.exec((Tuple) result.result);
                     }
                 }
-                return result;
             }
-
+            if (timeThis) {
+                PigStatusReporter.getInstance().getCounter(counterGroup, TIMING_COUNTER).increment(
+                        ( Math.round((System.nanoTime() - startNanos) / 1000)) * TIMING_FREQ);
+            }
             return result;
         } catch (ExecException ee) {
             throw ee;



Mime
View raw message