Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3756B200B25 for ; Wed, 8 Jun 2016 11:24:32 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 35E30160A2E; Wed, 8 Jun 2016 09:24:32 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0C853160A29 for ; Wed, 8 Jun 2016 11:24:30 +0200 (CEST) Received: (qmail 77346 invoked by uid 500); 8 Jun 2016 09:24:30 -0000 Mailing-List: contact commits-help@pig.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pig.apache.org Delivered-To: mailing list commits@pig.apache.org Received: (qmail 77330 invoked by uid 99); 8 Jun 2016 09:24:30 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Jun 2016 09:24:30 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id C557A180219 for ; Wed, 8 Jun 2016 09:24:29 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.374 X-Spam-Level: X-Spam-Status: No, score=0.374 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx2-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id 1BXi8MsMZ4p4 for ; Wed, 8 Jun 2016 09:24:24 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx2-lw-eu.apache.org (ASF Mail Server at mx2-lw-eu.apache.org) with ESMTP id E04895FB23 
for ; Wed, 8 Jun 2016 09:24:23 +0000 (UTC) Received: from svn01-us-west.apache.org (svn.apache.org [10.41.0.6]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id C4403E00E7 for ; Wed, 8 Jun 2016 09:24:22 +0000 (UTC) Received: from svn01-us-west.apache.org (localhost [127.0.0.1]) by svn01-us-west.apache.org (ASF Mail Server at svn01-us-west.apache.org) with ESMTP id 389733A0287 for ; Wed, 8 Jun 2016 09:24:22 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: svn commit: r1747366 - in /pig/trunk: ./ shims/src/hadoop23/org/apache/pig/backend/hadoop/ src/org/apache/pig/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executioneng... Date: Wed, 08 Jun 2016 09:24:21 -0000 To: commits@pig.apache.org From: rohini@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20160608092422.389733A0287@svn01-us-west.apache.org> archived-at: Wed, 08 Jun 2016 09:24:32 -0000 Author: rohini Date: Wed Jun 8 09:24:21 2016 New Revision: 1747366 URL: http://svn.apache.org/viewvc?rev=1747366&view=rev Log: PIG-4921: Kill running jobs on InterruptedException (rohini) Modified: pig/trunk/CHANGES.txt pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java pig/trunk/src/org/apache/pig/Main.java pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java pig/trunk/src/org/apache/pig/impl/PigImplConstants.java pig/trunk/src/org/apache/pig/impl/util/Utils.java Modified: pig/trunk/CHANGES.txt URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1747366&r1=1747365&r2=1747366&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Wed Jun 8 09:24:21 2016 @@ -30,6 +30,8 @@ OPTIMIZATIONS   BUG FIXES +PIG-4921: Kill running jobs on InterruptedException (rohini) + PIG-4916: Pig on Tez fail to remove temporary HDFS files in some cases (daijy) Release 0.16.0 - Unreleased Modified: pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java?rev=1747366&r1=1747365&r2=1747366&view=diff ============================================================================== --- pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java (original) +++ pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/PigATSClient.java Wed Jun 8 09:24:21 2016 @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.client.api import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.PigImplConstants; +import org.apache.pig.impl.util.Utils; import org.apache.pig.tools.pigstats.ScriptState; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -67,14 +68,14 @@ public class PigATSClient { timelineClient.init(yarnConf); timelineClient.start(); } - Runtime.getRuntime().addShutdownHook(new Thread() { + Utils.addShutdownHookWithPriority(new Runnable() { @Override public void run() { timelineClient.stop(); executor.shutdownNow(); executor = null; } - }); + }, PigImplConstants.SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY); log.info("Created ATS Hook"); } Modified: pig/trunk/src/org/apache/pig/Main.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1747366&r1=1747365&r2=1747366&view=diff ============================================================================== --- 
pig/trunk/src/org/apache/pig/Main.java (original) +++ pig/trunk/src/org/apache/pig/Main.java Wed Jun 8 09:24:21 2016 @@ -53,14 +53,13 @@ import org.antlr.runtime.RecognitionExce import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.hadoop.util.ShutdownHookManager; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; import org.apache.pig.PigRunner.ReturnCode; +import org.apache.pig.backend.BackendException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.classification.InterfaceAudience; @@ -102,13 +101,12 @@ import com.google.common.io.Closeables; public class Main { static { - - ShutdownHookManager.get().addShutdownHook(new Runnable() { + Utils.addShutdownHookWithPriority(new Runnable() { @Override public void run() { FileLocalizer.deleteTempResourceFiles(); } - }, FileSystem.SHUTDOWN_HOOK_PRIORITY + 1); + }, PigImplConstants.SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY); } private final static Log log = LogFactory.getLog(Main.class); @@ -662,6 +660,7 @@ public class Main { if(!gruntCalled) { LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched"); } + killRunningJobsIfInterrupted(e, pigContext); } catch (Throwable e) { rc = ReturnCode.THROWABLE_EXCEPTION; PigStatsUtil.setErrorMessage(e.getMessage()); @@ -670,6 +669,7 @@ public class Main { if(!gruntCalled) { LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched"); } + killRunningJobsIfInterrupted(e, pigContext); } finally { if (printScriptRunTime) { printScriptRunTime(startTime); @@ -696,6 +696,22 @@ public class Main { + " (" + duration.getMillis() + " ms)"); } + private 
static void killRunningJobsIfInterrupted(Throwable e, PigContext pigContext) { + Throwable cause = e.getCause(); + // Kill running job when we get InterruptedException + // Pig thread is interrupted by mapreduce when Oozie launcher job is killed + // Shutdown hook kills running jobs, but sometimes NodeManager can issue + // a SIGKILL after AM unregisters and before shutdown hook gets to execute + // causing orphaned jobs that continue to run. + if (e instanceof InterruptedException || (cause != null && cause instanceof InterruptedException)) { + try { + pigContext.getExecutionEngine().kill(); + } catch (BackendException be) { + log.error("Error while killing running jobs", be); + } + } + } + protected static PigProgressNotificationListener makeListener(Properties properties) { try { Modified: pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=1747366&r1=1747365&r2=1747366&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original) +++ pig/trunk/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Wed Jun 8 09:24:21 2016 @@ -183,6 +183,14 @@ public interface ExecutionEngine { public ExecutableManager getExecutableManager(); /** + * This method is called when user requests to kill all jobs + * associated with the execution engine + * + * @throws BackendException + */ + public void kill() throws BackendException; + + /** * This method is called when a user requests to kill a job associated with * the given job id. If it is not possible for a user to kill a job, throw a * exception. 
It is imperative for the job id's being displayed to be unique Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1747366&r1=1747365&r2=1747366&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Jun 8 09:24:21 2016 @@ -360,6 +360,13 @@ public abstract class HExecutionEngine i } @Override + public void kill() throws BackendException { + if (launcher != null) { + launcher.kill(); + } + } + + @Override public void killJob(String jobID) throws BackendException { if (launcher != null) { launcher.killJob(jobID, getJobConf()); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1747366&r1=1747365&r2=1747366&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Wed Jun 8 09:24:21 2016 @@ -76,7 +76,7 @@ public abstract class Launcher { protected Map failureMap; protected JobControl jc = null; - class HangingJobKiller extends Thread { + protected class HangingJobKiller extends Thread { public HangingJobKiller() {} @Override @@ -90,7 +90,6 @@ public abstract class Launcher { } protected Launcher() { - Runtime.getRuntime().addShutdownHook(new HangingJobKiller()); // handle the windows portion of \r if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) { newLine = "\r\n"; @@ -104,7 +103,6 @@ public abstract class Launcher { public void reset() 
{ failureMap = Maps.newHashMap(); totalHadoopTimeSpent = 0; - jc = null; } /** Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1747366&r1=1747365&r2=1747366&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Jun 8 09:24:21 2016 @@ -19,7 +19,9 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.io.PrintStream; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Calendar; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -65,6 +67,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.CompilationMessageCollector; @@ -86,7 +89,7 @@ import org.apache.pig.tools.pigstats.map * Main class that launches pig for Map Reduce * */ -public class MapReduceLauncher extends Launcher{ +public class MapReduceLauncher extends Launcher { public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; @@ -94,14 +97,23 @@ public class MapReduceLauncher extends L private boolean aggregateWarning = false; + public MapReduceLauncher() { + super(); + Utils.addShutdownHookWithPriority(new HangingJobKiller(), + PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY); + } + @Override public void kill() { try 
{ - log.debug("Receive kill signal"); + log.info("Received kill signal"); if (jc!=null) { for (Job job : jc.getRunningJobs()) { HadoopShims.killJob(job); log.info("Job " + job.getAssignedJobID() + " killed"); + String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + .format(Calendar.getInstance().getTime()); + System.err.println(timeStamp + " Job " + job.getAssignedJobID() + " killed"); } } } catch (Exception e) { @@ -301,8 +313,7 @@ public class MapReduceLauncher extends L // Now wait, till we are finished. while(!jc.allFinished()){ - try { jcThread.join(sleepTime); } - catch (InterruptedException e) {} + jcThread.join(sleepTime); List jobsAssignedIdInThisRun = new ArrayList(); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1747366&r1=1747365&r2=1747366&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Wed Jun 8 09:24:21 2016 @@ -18,7 +18,9 @@ package org.apache.pig.backend.hadoop.executionengine.tez; import java.io.IOException; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Calendar; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.pig.backend.hadoop.executionengine.tez.TezJob.TezJobConfig; import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.util.Utils; import org.apache.pig.tools.pigstats.tez.TezScriptState; import org.apache.tez.client.TezAppMasterStatus; @@ -46,13 +49,13 @@ public 
class TezSessionManager { private static final Log log = LogFactory.getLog(TezSessionManager.class); static { - Runtime.getRuntime().addShutdownHook(new Thread() { + Utils.addShutdownHookWithPriority(new Runnable() { @Override public void run() { TezSessionManager.shutdown(); } - }); + }, PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY); } private static ReentrantReadWriteLock sessionPoolLock = new ReentrantReadWriteLock(); @@ -270,6 +273,11 @@ public class TezSessionManager { synchronized (sessionInfo) { if (sessionInfo.session == session) { log.info("Stopping Tez session " + session); + String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + .format(Calendar.getInstance().getTime()); + System.err.println(timeStamp + " Shutting down Tez session " + + ", sessionName=" + session.getClientName() + + ", applicationId=" + session.getAppMasterApplicationId()); session.stop(); sessionToRemove = sessionInfo; break; @@ -296,19 +304,30 @@ public class TezSessionManager { shutdown = true; for (SessionInfo sessionInfo : sessionPool) { synchronized (sessionInfo) { + TezClient session = sessionInfo.session; try { - if (sessionInfo.session.getAppMasterStatus().equals( + String timeStamp = new SimpleDateFormat( + "yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()); + if (session.getAppMasterStatus().equals( TezAppMasterStatus.SHUTDOWN)) { log.info("Tez session is already shutdown " - + sessionInfo.session); + + session); + System.err.println(timeStamp + + " Tez session is already shutdown " + session + + ", sessionName=" + session.getClientName() + + ", applicationId=" + session.getAppMasterApplicationId()); continue; } - log.info("Shutting down Tez session " - + sessionInfo.session); - sessionInfo.session.stop(); + log.info("Shutting down Tez session " + session); + // Since hadoop calls org.apache.log4j.LogManager.shutdown(); + // the log.info message is not displayed with shutdown hook in Oozie + System.err.println(timeStamp + " Shutting down Tez session 
" + + ", sessionName=" + session.getClientName() + + ", applicationId=" + session.getAppMasterApplicationId()); + session.stop(); } catch (Exception e) { log.error("Error shutting down Tez session " - + sessionInfo.session, e); + + session, e); } } } Modified: pig/trunk/src/org/apache/pig/impl/PigImplConstants.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigImplConstants.java?rev=1747366&r1=1747365&r2=1747366&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/PigImplConstants.java (original) +++ pig/trunk/src/org/apache/pig/impl/PigImplConstants.java Wed Jun 8 09:24:21 2016 @@ -84,4 +84,9 @@ public class PigImplConstants { * A unique id for a Pig session used as callerId for underlining component */ public static final String PIG_AUDIT_ID = "pig.script.id"; + + // Kill the jobs before cleaning up tmp files + public static int SHUTDOWN_HOOK_JOB_KILL_PRIORITY = 3; + public static int SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY = 2; + public static int SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY = 1; } Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1747366&r1=1747365&r2=1747366&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original) +++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Wed Jun 8 09:24:21 2016 @@ -48,6 +48,7 @@ import org.apache.hadoop.io.compress.BZi import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.ShutdownHookManager; import org.apache.pig.FileInputLoadFunc; import org.apache.pig.FuncSpec; import org.apache.pig.LoadFunc; @@ -708,4 +709,15 @@ public class Utils { DateTimeZone.setDefault(DateTimeZone.forID(dtzStr)); } } + + /** + * Add shutdown hook that runs 
before the FileSystem cache shutdown happens. + * + * @param hook code to execute during shutdown + * @param priority offset added to FileSystem.SHUTDOWN_HOOK_PRIORITY; hooks with a higher resulting priority run earlier + */ + public static void addShutdownHookWithPriority(Runnable hook, int priority) { + ShutdownHookManager.get().addShutdownHook(hook, + FileSystem.SHUTDOWN_HOOK_PRIORITY + priority); + } }