pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From billgra...@apache.org
Subject svn commit: r1332517 - in /pig/trunk: ./ src/org/apache/pig/scripting/ src/org/apache/pig/tools/pigstats/ test/org/apache/pig/test/
Date Tue, 01 May 2012 04:23:41 GMT
Author: billgraham
Date: Tue May  1 04:23:40 2012
New Revision: 1332517

URL: http://svn.apache.org/viewvc?rev=1332517&view=rev
Log:
PIG-2660: PPNL notified of plan before it gets executed (billgraham)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/scripting/SyncProgressNotificationAdaptor.java
    pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
    pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
    pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
    pig/trunk/test/org/apache/pig/test/TestPigRunner.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1332517&r1=1332516&r2=1332517&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue May  1 04:23:40 2012
@@ -176,6 +176,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2660: PPNL notified of plan before it gets executed (billgraham)
+
 PIG-2574: Make reducer estimator plugable (billgraham)
 
 PIG-2601: Additional document for 0.10 (daijy)

Modified: pig/trunk/src/org/apache/pig/scripting/SyncProgressNotificationAdaptor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/SyncProgressNotificationAdaptor.java?rev=1332517&r1=1332516&r2=1332517&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/SyncProgressNotificationAdaptor.java (original)
+++ pig/trunk/src/org/apache/pig/scripting/SyncProgressNotificationAdaptor.java Tue May  1 04:23:40 2012
@@ -20,12 +20,17 @@ package org.apache.pig.scripting;
 
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
 
 class SyncProgressNotificationAdaptor implements PigProgressNotificationListener {
 
+    private static final Log LOG = LogFactory.getLog(SyncProgressNotificationAdaptor.class);
+
     private List<PigProgressNotificationListener> listeners;
 
     public SyncProgressNotificationAdaptor(
@@ -80,6 +85,21 @@ class SyncProgressNotificationAdaptor im
     }
 
     @Override
+    public void initialPlanNotification(String scriptId, MROperPlan plan) {
+        synchronized (listeners) {
+            for (PigProgressNotificationListener listener : listeners) {
+                try {
+                    listener.initialPlanNotification(scriptId, plan);
+                } catch (NoSuchMethodError e) {
+                    LOG.warn("PigProgressNotificationListener implementation doesn't "
+                           + "implement initialPlanNotification(..) method: "
+                           + listener.getClass().getName(), e);
+                }
+            }
+        }
+    }
+
+    @Override
     public void launchStartedNotification(String scriptId, int numJobsToLaunch) {
         synchronized (listeners) {
             for (PigProgressNotificationListener listener : listeners) {

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java?rev=1332517&r1=1332516&r2=1332517&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java Tue May  1 04:23:40 2012
@@ -18,6 +18,7 @@
 
 package org.apache.pig.tools.pigstats;
 
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 
@@ -30,8 +31,16 @@ import org.apache.pig.PigRunner;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface PigProgressNotificationListener extends java.util.EventListener {
-    
-    /** 
+
+    /**
+     * Invoked before any MR jobs are run with the plan that is to be executed.
+     *
+     * @param scriptId the unique id of the script
+     * @param plan the MROperPlan that is to be executed
+     */
+    public void initialPlanNotification(String scriptId, MROperPlan plan);
+
+    /**
      * Invoked just before launching MR jobs spawned by the script.
      * @param scriptId the unique id of the script
      * @param numJobsToLaunch the total number of MR jobs spawned by the script

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1332517&r1=1332516&r2=1332517&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Tue May  1 04:23:40 2012
@@ -164,6 +164,7 @@ public abstract class PigStatsUtil {
         SimplePigStats ps = (SimplePigStats)PigStats.start();
         ps.start(pc, client, jcc, plan);
         
+        ScriptState.get().emitInitialPlanNotification(plan);
         ScriptState.get().emitLaunchStartedNotification(plan.size());
     }
      
@@ -187,7 +188,7 @@ public abstract class PigStatsUtil {
                 ps.getNumberSuccessfulJobs());
         if (display) ps.display();
     }
-    
+
     /**
      * Returns an empty PigStats object
      * 

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1332517&r1=1332516&r2=1332517&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Tue May  1 04:23:40 2012
@@ -44,6 +44,7 @@ import org.apache.hadoop.util.VersionInf
 import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -218,6 +219,18 @@ public class ScriptState {
         return listeners;
     }
         
+    public void emitInitialPlanNotification(MROperPlan plan) {
+        for (PigProgressNotificationListener listener: listeners) {
+            try {
+                listener.initialPlanNotification(id, plan);
+            } catch (NoSuchMethodError e) {
+                LOG.warn("PigProgressNotificationListener implementation doesn't "
+                       + "implement initialPlanNotification(..) method: "
+                       + listener.getClass().getName(), e);
+            }
+        }
+    }
+
     public void emitLaunchStartedNotification(int numJobsToLaunch) {
         for (PigProgressNotificationListener listener: listeners) {
             listener.launchStartedNotification(id, numJobsToLaunch);

Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1332517&r1=1332516&r2=1332517&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Tue May  1 04:23:40 2012
@@ -19,6 +19,7 @@ package org.apache.pig.test;
 
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -40,6 +41,7 @@ import org.apache.pig.ExecType;
 import org.apache.pig.PigRunner;
 import org.apache.pig.PigRunner.ReturnCode;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.newplan.Operator;
@@ -869,7 +871,13 @@ public class TestPigRunner {
         private static final int JobsSubmitted = 1;
         private static final int JobStarted = 2;
         private static final int JobFinished = 3;
-        
+
+        @Override
+        public void initialPlanNotification(String id, MROperPlan plan) {
+            System.out.println("id: " + id + " planNodes: " + plan.getKeys().size());
+            assertNotNull(plan);
+        }
+
         @Override
         public void launchStartedNotification(String id, int numJobsToLaunch) {            
             System.out.println("id: " + id + " numJobsToLaunch: " + numJobsToLaunch);  



Mime
View raw message