Repository: incubator-airflow
Updated Branches:
refs/heads/master 8f9f5084b -> 9bdfb8c70
[AIRFLOW-1109] Use kill signal to kill processes and log results
The comments in the kill_process_tree function state that it uses
SIGKILL, when it actually uses SIGTERM. We should update the function
so that it really sends SIGKILL as documented, and also log the
results of each kill attempt.
Closes #2241 from saguziel/aguziel-kill-processes
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9bdfb8c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9bdfb8c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9bdfb8c7
Branch: refs/heads/master
Commit: 9bdfb8c700a2622aad26422a34d24e252be52bff
Parents: 8f9f508
Author: Alex Guziel <alex.guziel@airbnb.com>
Authored: Thu Apr 13 18:52:43 2017 -0700
Committer: Alex Guziel <alex.guziel@airbnb.com>
Committed: Thu Apr 13 18:52:43 2017 -0700
----------------------------------------------------------------------
airflow/utils/helpers.py | 140 +++++++++++++-----------------------------
tests/utils/helpers.py | 84 +++++++++++++++++++++++++
2 files changed, 126 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9bdfb8c7/airflow/utils/helpers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index dee0657..9a94125 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -35,7 +35,7 @@ from airflow.exceptions import AirflowException
# When killing processes, time to wait after issuing a SIGTERM before issuing a
# SIGKILL.
-TIME_TO_WAIT_AFTER_SIGTERM = 5
+DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = 5
def validate_key(k, max_length=250):
@@ -182,20 +182,32 @@ def pprinttable(rows):
return s
-def kill_using_shell(pid, signal=signal.SIGTERM):
- process = psutil.Process(pid)
- # Use sudo only when necessary - consider SubDagOperator and SequentialExecutor case.
- if process.username() != getpass.getuser():
- args = ["sudo", "kill", "-{}".format(int(signal)), str(pid)]
- else:
- args = ["kill", "-{}".format(int(signal)), str(pid)]
- # PID may not exist and return a non-zero error code
- subprocess.call(args)
+def kill_using_shell(logger, pid, signal=signal.SIGTERM):
+ try:
+ process = psutil.Process(pid)
+ # Use sudo only when necessary - consider SubDagOperator and SequentialExecutor case.
+ if process.username() != getpass.getuser():
+ args = ["sudo", "kill", "-{}".format(int(signal)), str(pid)]
+ else:
+ args = ["kill", "-{}".format(int(signal)), str(pid)]
+ # PID may not exist and return a non-zero error code
+ logger.error(subprocess.check_output(args))
+ logger.info("Killed process {} with signal {}".format(pid, signal))
+ return True
+ except psutil.NoSuchProcess as e:
+ logger.warning("Process {} no longer exists".format(pid))
+ return False
+ except subprocess.CalledProcessError as e:
+ logger.warning("Failed to kill process {} with signal {}. Output: {}"
+ .format(pid, signal, e.output))
+ return False
-def kill_process_tree(logger, pid):
+def kill_process_tree(logger, pid, timeout=DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM):
"""
- Kills the process and all of the descendants. Kills using the `kill`
+ TODO(saguziel): also kill the root process after killing descendants
+
+ Kills the process's descendants. Kills using the `kill`
shell command so that it can change users. Note: killing via PIDs
has the potential to the wrong process if the process dies and the
PID gets recycled in a narrow time window.
@@ -215,23 +227,21 @@ def kill_process_tree(logger, pid):
if x.is_running()]
if len(descendant_processes) != 0:
- logger.warning("Terminating descendant processes of {} PID: {}"
- .format(root_process.cmdline(),
- root_process.pid))
+ logger.info("Terminating descendant processes of {} PID: {}"
+ .format(root_process.cmdline(),
+ root_process.pid))
temp_processes = descendant_processes[:]
for descendant in temp_processes:
- logger.warning("Terminating descendant process {} PID: {}"
- .format(descendant.cmdline(), descendant.pid))
- try:
- kill_using_shell(descendant.pid, signal.SIGTERM)
- except psutil.NoSuchProcess:
+ logger.info("Terminating descendant process {} PID: {}"
+ .format(descendant.cmdline(), descendant.pid))
+ if not kill_using_shell(logger, descendant.pid, signal.SIGTERM):
descendant_processes.remove(descendant)
- logger.warning("Waiting up to {}s for processes to exit..."
- .format(TIME_TO_WAIT_AFTER_SIGTERM))
+ logger.info("Waiting up to {}s for processes to exit..."
+ .format(timeout))
try:
- psutil.wait_procs(descendant_processes, TIME_TO_WAIT_AFTER_SIGTERM)
- logger.warning("Done waiting")
+ psutil.wait_procs(descendant_processes, timeout)
+ logger.info("Done waiting")
except psutil.TimeoutExpired:
logger.warning("Ran out of time while waiting for "
"processes to exit")
@@ -242,85 +252,19 @@ def kill_process_tree(logger, pid):
if len(descendant_processes) > 0:
temp_processes = descendant_processes[:]
for descendant in temp_processes:
- logger.warning("Killing descendant process {} PID: {}"
- .format(descendant.cmdline(), descendant.pid))
- try:
- kill_using_shell(descendant.pid, signal.SIGTERM)
- descendant.wait()
- except psutil.NoSuchProcess:
+ logger.info("Killing descendant process {} PID: {}"
+ .format(descendant.cmdline(), descendant.pid))
+ if not kill_using_shell(logger, descendant.pid, signal.SIGKILL):
descendant_processes.remove(descendant)
- logger.warning("Killed all descendant processes of {} PID: {}"
- .format(root_process.cmdline(),
- root_process.pid))
+ else:
+ descendant.wait()
+ logger.info("Killed all descendant processes of {} PID: {}"
+ .format(root_process.cmdline(),
+ root_process.pid))
else:
logger.debug("There are no descendant processes to kill")
-def kill_descendant_processes(logger, pids_to_kill=None):
- """
- Kills all descendant processes of this process.
-
- :param logger: logger
- :type logger: logging.Logger
- :param pids_to_kill: if specified, kill only these PIDs
- :type pids_to_kill: list[int]
- """
- # First try SIGTERM
- this_process = psutil.Process(os.getpid())
-
- # Only check child processes to ensure that we don't have a case
- # where a child process died but the PID got reused.
- descendant_processes = [x for x in this_process.children(recursive=True)
- if x.is_running()]
- if pids_to_kill:
- descendant_processes = [x for x in descendant_processes
- if x.pid in pids_to_kill]
-
- if len(descendant_processes) == 0:
- logger.debug("There are no descendant processes that can be killed")
- return
- logger.warning("Terminating descendant processes of {} PID: {}"
- .format(this_process.cmdline(),
- this_process.pid))
-
- temp_processes = descendant_processes[:]
- for descendant in temp_processes:
- try:
- logger.warning("Terminating descendant process {} PID: {}"
- .format(descendant.cmdline(), descendant.pid))
- descendant.terminate()
- except psutil.NoSuchProcess:
- descendant_processes.remove(descendant)
-
- logger.warning("Waiting up to {}s for processes to exit..."
- .format(TIME_TO_WAIT_AFTER_SIGTERM))
- try:
- psutil.wait_procs(descendant_processes, TIME_TO_WAIT_AFTER_SIGTERM)
- logger.warning("Done waiting")
- except psutil.TimeoutExpired:
- logger.warning("Ran out of time while waiting for "
- "processes to exit")
- # Then SIGKILL
- descendant_processes = [x for x in this_process.children(recursive=True)
- if x.is_running()]
- if pids_to_kill:
- descendant_processes = [x for x in descendant_processes
- if x.pid in pids_to_kill]
-
- if len(descendant_processes) > 0:
- for descendant in descendant_processes:
- logger.warning("Killing descendant process {} PID: {}"
- .format(descendant.cmdline(), descendant.pid))
- try:
- descendant.kill()
- descendant.wait()
- except psutil.NoSuchProcess:
- pass
- logger.warning("Killed all descendant processes of {} PID: {}"
- .format(this_process.cmdline(),
- this_process.pid))
-
-
class AirflowImporter(object):
"""
Importer that dynamically loads a class and module from its parent. This
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9bdfb8c7/tests/utils/helpers.py
----------------------------------------------------------------------
diff --git a/tests/utils/helpers.py b/tests/utils/helpers.py
new file mode 100644
index 0000000..3ef43f8
--- /dev/null
+++ b/tests/utils/helpers.py
@@ -0,0 +1,84 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import logging
+import multiprocessing
+import unittest
+import psutil
+import signal
+import time
+
+from airflow.utils import helpers
+
+
+class TestHelpers(unittest.TestCase):
+ @staticmethod
+ def _ignores_sigterm(child_pid, setup_done):
+ def signal_handler(signum, frame):
+ pass
+ signal.signal(signal.SIGTERM, signal_handler)
+ child_pid.value = os.getpid()
+ setup_done.release()
+ while True:
+ time.sleep(1)
+
+ @staticmethod
+ def _parent_of_ignores_sigterm(child_process_killed, child_pid,
+ process_done, setup_done):
+ child = multiprocessing.Process(target=TestHelpers._ignores_sigterm,
+ args=[child_pid, setup_done])
+ child.start()
+ if setup_done.acquire(timeout=1.0):
+ helpers.kill_process_tree(logging.getLogger(), os.getpid(), timeout=1.0)
+ # Process.is_alive doesnt work with SIGKILL
+ if not psutil.pid_exists(child_pid.value):
+ child_process_killed.value = 1
+ process_done.release()
+
+ def test_kill_process_tree(self):
+ """ Spin up a process that can't be killed by SIGTERM and make sure it gets killed anyway. """
+ child_process_killed = multiprocessing.Value('i', 0)
+ process_done = multiprocessing.Semaphore(0)
+ child_pid = multiprocessing.Value('i', 0)
+ setup_done = multiprocessing.Semaphore(0)
+ args = [child_process_killed, child_pid, process_done, setup_done]
+ child = multiprocessing.Process(target=TestHelpers._parent_of_ignores_sigterm, args=args)
+ try:
+ child.start()
+ self.assertTrue(process_done.acquire(timeout=5.0))
+ self.assertEqual(1, child_process_killed.value)
+ finally:
+ try:
+ os.kill(child_pid.value, signal.SIGKILL) # terminate doesnt work here
+ except OSError:
+ pass
+ child.terminate()
+
+ def test_kill_using_shell(self):
+ """ Test when no process exists. """
+ child_pid = multiprocessing.Value('i', 0)
+ setup_done = multiprocessing.Semaphore(0)
+ args = [child_pid, setup_done]
+ child = multiprocessing.Process(target=TestHelpers._ignores_sigterm, args=args)
+ child.start()
+
+ self.assertTrue(setup_done.acquire(timeout=1.0))
+ pid_to_kill = child_pid.value
+ self.assertTrue(helpers.kill_using_shell(logging.getLogger(), pid_to_kill,
+ signal=signal.SIGKILL))
+ child.join() # remove orphan process
+ self.assertFalse(helpers.kill_using_shell(logging.getLogger(), pid_to_kill,
+ signal=signal.SIGKILL))
+
|