pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From roh...@apache.org
Subject svn commit: r1747366 - in /pig/trunk: ./ shims/src/hadoop23/org/apache/pig/backend/hadoop/ src/org/apache/pig/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executioneng...
Date Wed, 08 Jun 2016 09:24:21 GMT
Author: rohini
Date: Wed Jun  8 09:24:21 2016
New Revision: 1747366

URL: http://svn.apache.org/viewvc?rev=1747366&view=rev
Log:
PIG-4921: Kill running jobs on InterruptedException (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java
    pig/trunk/src/org/apache/pig/Main.java
    pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
    pig/trunk/src/org/apache/pig/impl/PigImplConstants.java
    pig/trunk/src/org/apache/pig/impl/util/Utils.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1747366&r1=1747365&r2=1747366&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Jun  8 09:24:21 2016
@@ -30,6 +30,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4921: Kill running jobs on InterruptedException (rohini)
+
 PIG-4916: Pig on Tez fail to remove temporary HDFS files in some cases (daijy)
 
 Release 0.16.0 - Unreleased

Modified: pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java?rev=1747366&r1=1747365&r2=1747366&view=diff
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java (original)
+++ pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java Wed Jun  8 09:24:21 2016
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.client.api
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.ScriptState;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -67,14 +68,14 @@ public class PigATSClient {
             timelineClient.init(yarnConf);
             timelineClient.start();
         }
-        Runtime.getRuntime().addShutdownHook(new Thread() {
+        Utils.addShutdownHookWithPriority(new Runnable() {
             @Override
             public void run() {
                 timelineClient.stop();
                 executor.shutdownNow();
                 executor = null;
             }
-        });
+        }, PigImplConstants.SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY);
         log.info("Created ATS Hook");
     }
 

Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1747366&r1=1747365&r2=1747366&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Wed Jun  8 09:24:21 2016
@@ -53,14 +53,13 @@ import org.antlr.runtime.RecognitionExce
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.classification.InterfaceAudience;
@@ -102,13 +101,12 @@ import com.google.common.io.Closeables;
 public class Main {
 
     static {
-
-        ShutdownHookManager.get().addShutdownHook(new Runnable() {
+        Utils.addShutdownHookWithPriority(new Runnable() {
             @Override
             public void run() {
                 FileLocalizer.deleteTempResourceFiles();
             }
-        }, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1);
+        }, PigImplConstants.SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY);
     }
 
     private final static Log log = LogFactory.getLog(Main.class);
@@ -662,6 +660,7 @@ public class Main {
             if(!gruntCalled) {
                 LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
             }
+            killRunningJobsIfInterrupted(e, pigContext);
         } catch (Throwable e) {
             rc = ReturnCode.THROWABLE_EXCEPTION;
             PigStatsUtil.setErrorMessage(e.getMessage());
@@ -670,6 +669,7 @@ public class Main {
             if(!gruntCalled) {
                 LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
             }
+            killRunningJobsIfInterrupted(e, pigContext);
         } finally {
             if (printScriptRunTime) {
                 printScriptRunTime(startTime);
@@ -696,6 +696,22 @@ public class Main {
                 + " (" + duration.getMillis() + " ms)");
     }
 
+    private static void killRunningJobsIfInterrupted(Throwable e, PigContext pigContext) {
+        Throwable cause = e.getCause();
+        // Kill running job when we get InterruptedException
+        // Pig thread is interrupted by mapreduce when Oozie launcher job is killed
+        // Shutdown hook kills running jobs, but sometimes NodeManager can issue
+        // a SIGKILL after AM unregisters and before shutdown hook gets to execute
+        // causing orphaned jobs that continue to run.
+        if (e instanceof InterruptedException || (cause != null && cause instanceof InterruptedException)) {
+            try {
+                pigContext.getExecutionEngine().kill();
+            } catch (BackendException be) {
+                log.error("Error while killing running jobs", be);
+            }
+        }
+    }
+
     protected static PigProgressNotificationListener makeListener(Properties properties) {
 
         try {

Modified: pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=1747366&r1=1747365&r2=1747366&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Wed Jun  8 09:24:21 2016
@@ -183,6 +183,14 @@ public interface ExecutionEngine {
     public ExecutableManager getExecutableManager();
 
     /**
+     * This method is called when user requests to kill all jobs
+     * associated with the execution engine
+     *
+     * @throws BackendException
+     */
+    public void kill() throws BackendException;
+
+    /**
      * This method is called when a user requests to kill a job associated with
      * the given job id. If it is not possible for a user to kill a job, throw a
      * exception. It is imperative for the job id's being displayed to be unique

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1747366&r1=1747365&r2=1747366&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Jun  8 09:24:21 2016
@@ -360,6 +360,13 @@ public abstract class HExecutionEngine i
     }
 
     @Override
+    public void kill() throws BackendException {
+        if (launcher != null) {
+            launcher.kill();
+        }
+    }
+
+    @Override
     public void killJob(String jobID) throws BackendException {
         if (launcher != null) {
             launcher.killJob(jobID, getJobConf());

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1747366&r1=1747365&r2=1747366&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Wed Jun  8 09:24:21 2016
@@ -76,7 +76,7 @@ public abstract class Launcher {
     protected Map<FileSpec, Exception> failureMap;
     protected JobControl jc = null;
 
-    class HangingJobKiller extends Thread {
+    protected class HangingJobKiller extends Thread {
         public HangingJobKiller() {}
 
         @Override
@@ -90,7 +90,6 @@ public abstract class Launcher {
     }
 
     protected Launcher() {
-        Runtime.getRuntime().addShutdownHook(new HangingJobKiller());
         // handle the windows portion of \r
         if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
             newLine = "\r\n";
@@ -104,7 +103,6 @@ public abstract class Launcher {
     public void reset() {
         failureMap = Maps.newHashMap();
         totalHadoopTimeSpent = 0;
-        jc = null;
     }
 
     /**

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1747366&r1=1747365&r2=1747366&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Jun  8 09:24:21 2016
@@ -19,7 +19,9 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.io.PrintStream;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -65,6 +67,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -86,7 +89,7 @@ import org.apache.pig.tools.pigstats.map
  * Main class that launches pig for Map Reduce
  *
  */
-public class MapReduceLauncher extends Launcher{
+public class MapReduceLauncher extends Launcher {
 
     public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
 
@@ -94,14 +97,23 @@ public class MapReduceLauncher extends L
 
     private boolean aggregateWarning = false;
 
+    public MapReduceLauncher() {
+        super();
+        Utils.addShutdownHookWithPriority(new HangingJobKiller(),
+                PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
+    }
+
     @Override
     public void kill() {
         try {
-            log.debug("Receive kill signal");
+            log.info("Received kill signal");
             if (jc!=null) {
                 for (Job job : jc.getRunningJobs()) {
                     HadoopShims.killJob(job);
                     log.info("Job " + job.getAssignedJobID() + " killed");
+                    String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+                            .format(Calendar.getInstance().getTime());
+                    System.err.println(timeStamp + " Job " + job.getAssignedJobID() + " killed");
                 }
             }
         } catch (Exception e) {
@@ -301,8 +313,7 @@ public class MapReduceLauncher extends L
                 // Now wait, till we are finished.
                 while(!jc.allFinished()){
 
-                    try { jcThread.join(sleepTime); }
-                    catch (InterruptedException e) {}
+                    jcThread.join(sleepTime);
 
                     List<Job> jobsAssignedIdInThisRun = new ArrayList<Job>();
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1747366&r1=1747365&r2=1747366&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Wed Jun  8 09:24:21 2016
@@ -18,7 +18,9 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.IOException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.pig.backend.hadoop.executionengine.tez.TezJob.TezJobConfig;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
 import org.apache.tez.client.TezAppMasterStatus;
@@ -46,13 +49,13 @@ public class TezSessionManager {
     private static final Log log = LogFactory.getLog(TezSessionManager.class);
 
     static {
-        Runtime.getRuntime().addShutdownHook(new Thread() {
+        Utils.addShutdownHookWithPriority(new Runnable() {
 
             @Override
             public void run() {
                 TezSessionManager.shutdown();
             }
-        });
+        }, PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
     }
 
     private static ReentrantReadWriteLock sessionPoolLock = new ReentrantReadWriteLock();
@@ -270,6 +273,11 @@ public class TezSessionManager {
                 synchronized (sessionInfo) {
                     if (sessionInfo.session == session) {
                         log.info("Stopping Tez session " + session);
+                        String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+                                    .format(Calendar.getInstance().getTime());
+                        System.err.println(timeStamp + " Shutting down Tez session "
+                                + ", sessionName=" + session.getClientName()
+                                + ", applicationId=" + session.getAppMasterApplicationId());
                         session.stop();
                         sessionToRemove = sessionInfo;
                         break;
@@ -296,19 +304,30 @@ public class TezSessionManager {
             shutdown = true;
             for (SessionInfo sessionInfo : sessionPool) {
                 synchronized (sessionInfo) {
+                    TezClient session = sessionInfo.session;
                     try {
-                        if (sessionInfo.session.getAppMasterStatus().equals(
+                        String timeStamp = new SimpleDateFormat(
+                                "yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
+                        if (session.getAppMasterStatus().equals(
                                 TezAppMasterStatus.SHUTDOWN)) {
                             log.info("Tez session is already shutdown "
-                                    + sessionInfo.session);
+                                    + session);
+                            System.err.println(timeStamp
+                                    + " Tez session is already shutdown " + session
+                                    + ", sessionName=" + session.getClientName()
+                                    + ", applicationId=" + session.getAppMasterApplicationId());
                             continue;
                         }
-                        log.info("Shutting down Tez session "
-                                + sessionInfo.session);
-                        sessionInfo.session.stop();
+                        log.info("Shutting down Tez session " + session);
+                        // Since hadoop calls org.apache.log4j.LogManager.shutdown();
+                        // the log.info message is not displayed with shutdown hook in Oozie
+                        System.err.println(timeStamp + " Shutting down Tez session "
+                                + ", sessionName=" + session.getClientName()
+                                + ", applicationId=" + session.getAppMasterApplicationId());
+                        session.stop();
                     } catch (Exception e) {
                         log.error("Error shutting down Tez session "
-                                + sessionInfo.session, e);
+                                + session, e);
                     }
                 }
             }

Modified: pig/trunk/src/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigImplConstants.java?rev=1747366&r1=1747365&r2=1747366&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigImplConstants.java Wed Jun  8 09:24:21 2016
@@ -84,4 +84,9 @@ public class PigImplConstants {
      * A unique id for a Pig session used as callerId for underlining component
      */
     public static final String PIG_AUDIT_ID = "pig.script.id";
+
+    // Kill the jobs before cleaning up tmp files
+    public static int SHUTDOWN_HOOK_JOB_KILL_PRIORITY = 3;
+    public static int SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY = 2;
+    public static int SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY = 1;
 }

Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1747366&r1=1747365&r2=1747366&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Wed Jun  8 09:24:21 2016
@@ -48,6 +48,7 @@ import org.apache.hadoop.io.compress.BZi
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.pig.FileInputLoadFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
@@ -708,4 +709,15 @@ public class Utils {
             DateTimeZone.setDefault(DateTimeZone.forID(dtzStr));
         }
     }
+
+    /**
+     * Add shutdown hook that runs before the FileSystem cache shutdown happens.
+     *
+     * @param hook code to execute during shutdown
+     * @param priority Priority over the  FileSystem.SHUTDOWN_HOOK_PRIORITY
+     */
+    public static void addShutdownHookWithPriority(Runnable hook, int priority) {
+        ShutdownHookManager.get().addShutdownHook(hook,
+                FileSystem.SHUTDOWN_HOOK_PRIORITY + priority);
+    }
 }



Mime
View raw message