airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From saguz...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1109] Use kill signal to kill processes and log results
Date Fri, 14 Apr 2017 01:52:49 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 8f9f5084b -> 9bdfb8c70


[AIRFLOW-1109] Use kill signal to kill processes and log results

The kill_process_tree function's comments state that
it uses SIGKILL, but it actually uses SIGTERM. Update
the code to send SIGKILL as documented, and log the
results.

Closes #2241 from saguziel/aguziel-kill-processes


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9bdfb8c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9bdfb8c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9bdfb8c7

Branch: refs/heads/master
Commit: 9bdfb8c700a2622aad26422a34d24e252be52bff
Parents: 8f9f508
Author: Alex Guziel <alex.guziel@airbnb.com>
Authored: Thu Apr 13 18:52:43 2017 -0700
Committer: Alex Guziel <alex.guziel@airbnb.com>
Committed: Thu Apr 13 18:52:43 2017 -0700

----------------------------------------------------------------------
 airflow/utils/helpers.py | 140 +++++++++++++-----------------------------
 tests/utils/helpers.py   |  84 +++++++++++++++++++++++++
 2 files changed, 126 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9bdfb8c7/airflow/utils/helpers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index dee0657..9a94125 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -35,7 +35,7 @@ from airflow.exceptions import AirflowException
 
 # When killing processes, time to wait after issuing a SIGTERM before issuing a
 # SIGKILL.
-TIME_TO_WAIT_AFTER_SIGTERM = 5
+DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = 5
 
 
 def validate_key(k, max_length=250):
@@ -182,20 +182,32 @@ def pprinttable(rows):
     return s
 
 
-def kill_using_shell(pid, signal=signal.SIGTERM):
-    process = psutil.Process(pid)
-    # Use sudo only when necessary - consider SubDagOperator and SequentialExecutor case.
-    if process.username() != getpass.getuser():
-        args = ["sudo", "kill", "-{}".format(int(signal)), str(pid)]
-    else:
-        args = ["kill", "-{}".format(int(signal)), str(pid)]
-    # PID may not exist and return a non-zero error code
-    subprocess.call(args)
+def kill_using_shell(logger, pid, signal=signal.SIGTERM):
+    try:
+        process = psutil.Process(pid)
+        # Use sudo only when necessary - consider SubDagOperator and SequentialExecutor case.
+        if process.username() != getpass.getuser():
+            args = ["sudo", "kill", "-{}".format(int(signal)), str(pid)]
+        else:
+            args = ["kill", "-{}".format(int(signal)), str(pid)]
+        # PID may not exist and return a non-zero error code
+        logger.error(subprocess.check_output(args))
+        logger.info("Killed process {} with signal {}".format(pid, signal))
+        return True
+    except psutil.NoSuchProcess as e:
+        logger.warning("Process {} no longer exists".format(pid))
+        return False
+    except subprocess.CalledProcessError as e:
+        logger.warning("Failed to kill process {} with signal {}. Output: {}"
+                       .format(pid, signal, e.output))
+        return False
 
 
-def kill_process_tree(logger, pid):
+def kill_process_tree(logger, pid, timeout=DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM):
     """
-    Kills the process and all of the descendants. Kills using the `kill`
+    TODO(saguziel): also kill the root process after killing descendants
+  
+    Kills the process's descendants. Kills using the `kill`
     shell command so that it can change users. Note: killing via PIDs
     has the potential to the wrong process if the process dies and the
     PID gets recycled in a narrow time window.
@@ -215,23 +227,21 @@ def kill_process_tree(logger, pid):
                             if x.is_running()]
 
     if len(descendant_processes) != 0:
-        logger.warning("Terminating descendant processes of {} PID: {}"
-                       .format(root_process.cmdline(),
-                               root_process.pid))
+        logger.info("Terminating descendant processes of {} PID: {}"
+                    .format(root_process.cmdline(),
+                            root_process.pid))
         temp_processes = descendant_processes[:]
         for descendant in temp_processes:
-            logger.warning("Terminating descendant process {} PID: {}"
-                           .format(descendant.cmdline(), descendant.pid))
-            try:
-                kill_using_shell(descendant.pid, signal.SIGTERM)
-            except psutil.NoSuchProcess:
+            logger.info("Terminating descendant process {} PID: {}"
+                        .format(descendant.cmdline(), descendant.pid))
+            if not kill_using_shell(logger, descendant.pid, signal.SIGTERM):
                 descendant_processes.remove(descendant)
 
-        logger.warning("Waiting up to {}s for processes to exit..."
-                       .format(TIME_TO_WAIT_AFTER_SIGTERM))
+        logger.info("Waiting up to {}s for processes to exit..."
+                    .format(timeout))
         try:
-            psutil.wait_procs(descendant_processes, TIME_TO_WAIT_AFTER_SIGTERM)
-            logger.warning("Done waiting")
+            psutil.wait_procs(descendant_processes, timeout)
+            logger.info("Done waiting")
         except psutil.TimeoutExpired:
             logger.warning("Ran out of time while waiting for "
                            "processes to exit")
@@ -242,85 +252,19 @@ def kill_process_tree(logger, pid):
         if len(descendant_processes) > 0:
             temp_processes = descendant_processes[:]
             for descendant in temp_processes:
-                logger.warning("Killing descendant process {} PID: {}"
-                               .format(descendant.cmdline(), descendant.pid))
-                try:
-                    kill_using_shell(descendant.pid, signal.SIGTERM)
-                    descendant.wait()
-                except psutil.NoSuchProcess:
+                logger.info("Killing descendant process {} PID: {}"
+                            .format(descendant.cmdline(), descendant.pid))
+                if not kill_using_shell(logger, descendant.pid, signal.SIGKILL):
                     descendant_processes.remove(descendant)
-            logger.warning("Killed all descendant processes of {} PID: {}"
-                           .format(root_process.cmdline(),
-                                   root_process.pid))
+                else:
+                    descendant.wait()
+            logger.info("Killed all descendant processes of {} PID: {}"
+                        .format(root_process.cmdline(),
+                                root_process.pid))
     else:
         logger.debug("There are no descendant processes to kill")
 
 
-def kill_descendant_processes(logger, pids_to_kill=None):
-    """
-    Kills all descendant processes of this process.
-
-    :param logger: logger
-    :type logger: logging.Logger
-    :param pids_to_kill: if specified, kill only these PIDs
-    :type pids_to_kill: list[int]
-    """
-    # First try SIGTERM
-    this_process = psutil.Process(os.getpid())
-
-    # Only check child processes to ensure that we don't have a case
-    # where a child process died but the PID got reused.
-    descendant_processes = [x for x in this_process.children(recursive=True)
-                            if x.is_running()]
-    if pids_to_kill:
-        descendant_processes = [x for x in descendant_processes
-                                if x.pid in pids_to_kill]
-
-    if len(descendant_processes) == 0:
-        logger.debug("There are no descendant processes that can be killed")
-        return
-    logger.warning("Terminating descendant processes of {} PID: {}"
-                   .format(this_process.cmdline(),
-                           this_process.pid))
-
-    temp_processes = descendant_processes[:]
-    for descendant in temp_processes:
-        try:
-            logger.warning("Terminating descendant process {} PID: {}"
-                           .format(descendant.cmdline(), descendant.pid))
-            descendant.terminate()
-        except psutil.NoSuchProcess:
-            descendant_processes.remove(descendant)
-
-    logger.warning("Waiting up to {}s for processes to exit..."
-                   .format(TIME_TO_WAIT_AFTER_SIGTERM))
-    try:
-        psutil.wait_procs(descendant_processes, TIME_TO_WAIT_AFTER_SIGTERM)
-        logger.warning("Done waiting")
-    except psutil.TimeoutExpired:
-        logger.warning("Ran out of time while waiting for "
-                       "processes to exit")
-    # Then SIGKILL
-    descendant_processes = [x for x in this_process.children(recursive=True)
-                            if x.is_running()]
-    if pids_to_kill:
-        descendant_processes = [x for x in descendant_processes
-                                if x.pid in pids_to_kill]
-
-    if len(descendant_processes) > 0:
-        for descendant in descendant_processes:
-            logger.warning("Killing descendant process {} PID: {}"
-                           .format(descendant.cmdline(), descendant.pid))
-            try:
-                descendant.kill()
-                descendant.wait()
-            except psutil.NoSuchProcess:
-                pass
-        logger.warning("Killed all descendant processes of {} PID: {}"
-                       .format(this_process.cmdline(),
-                               this_process.pid))
-
-
 class AirflowImporter(object):
     """
     Importer that dynamically loads a class and module from its parent. This

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9bdfb8c7/tests/utils/helpers.py
----------------------------------------------------------------------
diff --git a/tests/utils/helpers.py b/tests/utils/helpers.py
new file mode 100644
index 0000000..3ef43f8
--- /dev/null
+++ b/tests/utils/helpers.py
@@ -0,0 +1,84 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import logging
+import multiprocessing
+import unittest
+import psutil
+import signal
+import time
+
+from airflow.utils import helpers
+
+
+class TestHelpers(unittest.TestCase):
+    @staticmethod
+    def _ignores_sigterm(child_pid, setup_done):
+        def signal_handler(signum, frame):
+            pass
+        signal.signal(signal.SIGTERM, signal_handler)
+        child_pid.value = os.getpid()
+        setup_done.release()
+        while True:
+            time.sleep(1)
+
+    @staticmethod
+    def _parent_of_ignores_sigterm(child_process_killed, child_pid,
+                                   process_done, setup_done):
+        child = multiprocessing.Process(target=TestHelpers._ignores_sigterm,
+                                        args=[child_pid, setup_done])
+        child.start()
+        if setup_done.acquire(timeout=1.0):
+            helpers.kill_process_tree(logging.getLogger(), os.getpid(), timeout=1.0)
+            # Process.is_alive doesnt work with SIGKILL
+            if not psutil.pid_exists(child_pid.value):
+                child_process_killed.value = 1
+        process_done.release()
+
+    def test_kill_process_tree(self):
+        """ Spin up a process that can't be killed by SIGTERM and make sure it gets killed anyway. """
+        child_process_killed = multiprocessing.Value('i', 0)
+        process_done = multiprocessing.Semaphore(0)
+        child_pid = multiprocessing.Value('i', 0)
+        setup_done = multiprocessing.Semaphore(0)
+        args = [child_process_killed, child_pid, process_done, setup_done]
+        child = multiprocessing.Process(target=TestHelpers._parent_of_ignores_sigterm, args=args)
+        try:
+            child.start()
+            self.assertTrue(process_done.acquire(timeout=5.0))
+            self.assertEqual(1, child_process_killed.value)
+        finally:
+            try:
+                os.kill(child_pid.value, signal.SIGKILL) # terminate doesnt work here
+            except OSError:
+                pass
+            child.terminate()
+
+    def test_kill_using_shell(self):
+        """ Test when no process exists. """
+        child_pid = multiprocessing.Value('i', 0)
+        setup_done = multiprocessing.Semaphore(0)
+        args = [child_pid, setup_done]
+        child = multiprocessing.Process(target=TestHelpers._ignores_sigterm, args=args)
+        child.start()
+
+        self.assertTrue(setup_done.acquire(timeout=1.0))
+        pid_to_kill = child_pid.value
+        self.assertTrue(helpers.kill_using_shell(logging.getLogger(), pid_to_kill,
+                                                 signal=signal.SIGKILL))
+        child.join() # remove orphan process
+        self.assertFalse(helpers.kill_using_shell(logging.getLogger(), pid_to_kill,
+                                                  signal=signal.SIGKILL))
+    


Mime
View raw message