pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject svn commit: r1360558 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java test/org/apache/pig/test/TestPigRunner.java
Date Thu, 12 Jul 2012 07:35:45 GMT
Author: daijy
Date: Thu Jul 12 07:35:44 2012
New Revision: 1360558

URL: http://svn.apache.org/viewvc?rev=1360558&view=rev
Log:
PIG-2780: MapReduceLauncher should break early when one of the jobs throws an exception

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/trunk/test/org/apache/pig/test/TestPigRunner.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1360558&r1=1360557&r2=1360558&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jul 12 07:35:44 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2780: MapReduceLauncher should break early when one of the jobs throws an exception (jay23jack via daijy)
+
 PIG-2804: Remove "PIG" exec type (dvryaboy)
 
 PIG-2726: Handling legitimate NULL values in Cube operator (prasanth_j via dvryaboy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1360558&r1=1360557&r2=1360558&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Jul 12 07:35:44 2012
@@ -275,6 +275,9 @@ public class MapReduceLauncher extends L
             //All the setup done, now lets launch the jobs.
             jcThread.start();
             
+            // a flag whether to warn failure during the loop below, so users can notice
failure earlier.
+            boolean warn_failure = true;
+            
             // Now wait, till we are finished.
             while(!jc.allFinished()){
 
@@ -321,7 +324,16 @@ public class MapReduceLauncher extends L
             	
             	// collect job stats by frequently polling of completed jobs (PIG-1829)
             	PigStatsUtil.accumulateStats(jc);
-            	       	
+            	
+                // if stop_on_failure is enabled, we need to stop immediately when any job
has failed
+                checkStopOnFailure(stop_on_failure);
+                // otherwise, we just display a warning message if there's any failure
+                if (warn_failure && !jc.getFailedJobs().isEmpty()) {
+                    // we don't warn again for this group of jobs
+                    warn_failure = false;
+                    log.warn("Ooops! Some job has failed! Specify -stop_on_failure if you
"
+                            + "want Pig to stop immediately on failure.");
+                }
             }
             
             //check for the jobControlException first
@@ -345,21 +357,8 @@ public class MapReduceLauncher extends L
             }
             
             if (!jc.getFailedJobs().isEmpty() ) {
-                if (stop_on_failure){
-                    int errCode = 6017;
-                    StringBuilder msg = new StringBuilder();
-                    
-                    for (int i=0; i<jc.getFailedJobs().size(); i++) {
-                        Job j = jc.getFailedJobs().get(i);
-                        msg.append(j.getMessage());
-                        if (i!=jc.getFailedJobs().size()-1) {
-                            msg.append("\n");
-                        }
-                    }
-                    
-                    throw new ExecException(msg.toString(), errCode,
-                            PigException.REMOTE_ENVIRONMENT);
-                }
+                // stop if stop_on_failure is enabled
+                checkStopOnFailure(stop_on_failure);
                 
                 // If we only have one store and that job fail, then we sure 
                 // that the job completely fail, and we shall stop dependent jobs
@@ -470,6 +469,32 @@ public class MapReduceLauncher extends L
         return PigStatsUtil.getPigStats(ret);
     }
 
+    /**
+     * Throws if stop-on-failure is enabled and at least one job has failed.
+     * The exception message is the failed jobs' messages joined with "\n"
+     * (no trailing newline). Does nothing when stop_on_failure is false or
+     * when no job has failed.
+     * <p>
+     * {@code jc.getFailedJobs()} is read exactly once. This method is called
+     * while the JobControl thread is still running, and the accessor may return
+     * a fresh snapshot on every call (TODO confirm for the Hadoop version in
+     * use). Re-reading it for every index, as a size/get loop would, could
+     * therefore mix inconsistent views of the failed-job list.
+     *
+     * @param stop_on_failure whether stop-on-failure is enabled
+     * @throws ExecException with error code 6017 if stop_on_failure is enabled
+     *         and any job has failed
+     */
+    private void checkStopOnFailure(boolean stop_on_failure) throws ExecException {
+        // Cheap flag check first: the failed-job list is irrelevant otherwise.
+        if (!stop_on_failure) {
+            return;
+        }
+        StringBuilder msg = new StringBuilder();
+        boolean anyFailed = false;
+        // Single read of the failed-job list; join messages with '\n'.
+        for (Job j : jc.getFailedJobs()) {
+            if (anyFailed) {
+                msg.append("\n");
+            }
+            msg.append(j.getMessage());
+            anyFailed = true;
+        }
+        if (!anyFailed) {
+            return;
+        }
+        int errCode = 6017;
+        throw new ExecException(msg.toString(), errCode,
+                PigException.REMOTE_ENVIRONMENT);
+    }
+    
     private String getStackStraceStr(Throwable e) {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         PrintStream ps = new PrintStream(baos);

Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1360558&r1=1360557&r2=1360558&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Thu Jul 12 07:35:44 2012
@@ -863,6 +863,49 @@ public class TestPigRunner {
         }
     }
     
+    /**
+     * PIG-2780: the generated script makes Pig submit three independent jobs
+     * at the same time. The branch that loads 'nonexist' fails because that
+     * input does not exist. The script is run with the {@code -F} argument
+     * (stop-on-failure), so Pig should stop as soon as any of the three jobs
+     * fails. The overall run must be unsuccessful, with fewer than two jobs
+     * finishing successfully.
+     */
+    @Test
+    public void testStopOnFailure() throws Exception {
+        String[] scriptLines = {
+            "A1 = load '" + INPUT_FILE + "';",
+            "B1 = load 'nonexist';",
+            "C1 = load '" + INPUT_FILE + "';",
+            "A2 = distinct A1;",
+            "B2 = distinct B1;",
+            "C2 = distinct C1;",
+            "ret = union A2,B2,C2;",
+            "store ret into 'tmp/output';"
+        };
+        PrintWriter script = new PrintWriter(new FileWriter(PIG_FILE));
+        for (String line : scriptLines) {
+            script.println(line);
+        }
+        script.close();
+
+        try {
+            PigStats stats = PigRunner.run(new String[] { "-F", PIG_FILE },
+                    new TestNotificationListener());
+            assertTrue(!stats.isSuccessful());
+
+            // Count the jobs in the job graph that completed successfully.
+            int successfulJobs = 0;
+            for (Iterator<Operator> ops = stats.getJobGraph().getOperators(); ops.hasNext();) {
+                if (((JobStats) ops.next()).isSuccessful()) {
+                    successfulJobs++;
+                }
+            }
+            assertTrue("Should have less than 2 successful jobs", successfulJobs < 2);
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, OUTPUT_FILE);
+            Util.deleteFile(cluster, "tmp/output");
+        }
+    }
     public static class TestNotificationListener implements PigProgressNotificationListener
{
         
         private Map<String, int[]> numMap = new HashMap<String, int[]>();



Mime
View raw message