From commits-return-51351-archive-asf-public=cust-asf.ponee.io@ambari.apache.org Thu Feb 1 19:13:31 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 96D7A18066D for ; Thu, 1 Feb 2018 19:13:31 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 86B9A160C44; Thu, 1 Feb 2018 18:13:31 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 81DA9160C35 for ; Thu, 1 Feb 2018 19:13:30 +0100 (CET) Received: (qmail 39330 invoked by uid 500); 1 Feb 2018 18:13:29 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 39321 invoked by uid 99); 1 Feb 2018 18:13:29 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Feb 2018 18:13:29 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id DD642858A8; Thu, 1 Feb 2018 18:13:27 +0000 (UTC) Date: Thu, 01 Feb 2018 18:13:29 +0000 To: "commits@ambari.apache.org" Subject: [ambari] 02/02: [AMBARI-22888] Cancel operation during package deployment causing repository manager to be broken (dgrinenko) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: hapylestat@apache.org In-Reply-To: <151750880773.31426.118608827182709313@gitbox.apache.org> References: <151750880773.31426.118608827182709313@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: ambari X-Git-Refname: refs/heads/branch-2.6 X-Git-Reftype: branch X-Git-Rev: 9c346beeb5a8187d7c5ace1d99094eb3d84c2c61 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20180201181327.DD642858A8@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. hapylestat pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/ambari.git commit 9c346beeb5a8187d7c5ace1d99094eb3d84c2c61 Author: Reishin AuthorDate: Thu Feb 1 16:43:47 2018 +0200 [AMBARI-22888] Cancel operation during package deployment causing repository manager to be broken (dgrinenko) --- .../src/test/python/ambari_agent/TestShell.py | 50 +++---- .../src/main/python/ambari_commons/shell.py | 144 +++++++++++++++------ 2 files changed, 132 insertions(+), 62 deletions(-) diff --git a/ambari-agent/src/test/python/ambari_agent/TestShell.py b/ambari-agent/src/test/python/ambari_agent/TestShell.py index 6a0538a..0f72020 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestShell.py +++ b/ambari-agent/src/test/python/ambari_agent/TestShell.py @@ -9,9 +9,7 @@ regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -28,7 +26,7 @@ from ambari_commons import OSCheck from StringIO import StringIO ROOT_PID = 10 -ROOT_PID_CHILDRENS = [10, 11, 12, 13] +ROOT_PID_CHILDREN = [10, 11, 12, 13] shell.logger = MagicMock() # suppress any log output __proc_fs = { @@ -59,6 +57,16 @@ __proc_fs_yum = { "/proc/11/cmdline": "yum install something" } +# Remove any wait delay, no need for tests +__old_waiter = shell.wait_for_process_list_kill + + +def __wait_for_process_list_kill(pids, timeout=5, check_step_time=0.1): + return __old_waiter(pids, 0, check_step_time) + + +shell.wait_for_process_list_kill = __wait_for_process_list_kill + class FakeSignals(object): SIGTERM = signal.SIG_IGN @@ -86,41 +94,39 @@ class TestShell(unittest.TestCase): @patch("__builtin__.open", new=MagicMock(side_effect=_open_mock)) def test_get_all_children(self): - pid_list = [item[0] for item in shell.get_all_childrens(ROOT_PID)] + pid_list = [item[0] for item in shell.get_all_children(ROOT_PID)] - self.assertEquals(len(ROOT_PID_CHILDRENS), len(pid_list)) + self.assertEquals(len(ROOT_PID_CHILDREN), len(pid_list)) self.assertEquals(ROOT_PID, pid_list[0]) - for i in ROOT_PID_CHILDRENS: + for i in ROOT_PID_CHILDREN: self.assertEquals(True, i in pid_list) @patch("__builtin__.open", new=MagicMock(side_effect=_open_mock)) @patch.object(OSCheck, "get_os_family", new=MagicMock(return_value="redhat")) @patch.object(shell, "signal", new_callable=FakeSignals) - @patch.object(shell, "is_pid_life") + @patch("os.listdir") @patch("os.kill") - def test_kill_process_with_children(self, os_kill_mock, is_pid_life_mock, fake_signals): - pid_list = [item[0] for item in shell.get_all_childrens(ROOT_PID)] + def test_kill_process_with_children(self, os_kill_mock, os_list_dir_mock, fake_signals): + pid_list = [item[0] for item in shell.get_all_children(ROOT_PID)] + pid_list_str = [str(i) for i in ROOT_PID_CHILDREN] reverse_pid_list = sorted(pid_list, reverse=True) - shell.gracefull_kill_delay = 0.1 - is_pid_life_clean_kill = [True] * len(pid_list) + [False] * len(pid_list) - is_pid_life_not_clean_kill = [True] * (len(pid_list) * 2) + os_list_dir_mock.side_effect = [pid_list_str, [], [], []] - is_pid_life_mock.side_effect = is_pid_life_clean_kill shell.kill_process_with_children(ROOT_PID) - # test clean pid by SIGTERM + # test pid kill by SIGTERM os_kill_pids = [item[0][0] for item in os_kill_mock.call_args_list] self.assertEquals(len(os_kill_pids), len(pid_list)) self.assertEquals(reverse_pid_list, os_kill_pids) os_kill_mock.reset_mock() - is_pid_life_mock.reset_mock() + os_list_dir_mock.reset_mock() - is_pid_life_mock.side_effect = is_pid_life_not_clean_kill + os_list_dir_mock.side_effect = [pid_list_str, pid_list_str, pid_list_str, pid_list_str, [], []] shell.kill_process_with_children(ROOT_PID) - # test clean pid by SIGKILL + # test pid kill by SIGKILL os_kill_pids = [item[0][0] for item in os_kill_mock.call_args_list] self.assertEquals(len(os_kill_pids), len(pid_list)*2) self.assertEquals(reverse_pid_list + reverse_pid_list, os_kill_pids) @@ -128,17 +134,13 @@ class TestShell(unittest.TestCase): @patch("__builtin__.open", new=MagicMock(side_effect=_open_mock_yum)) @patch.object(OSCheck, "get_os_family", new=MagicMock(return_value="redhat")) @patch.object(shell, "signal", new_callable=FakeSignals) - @patch.object(shell, "is_pid_life") + @patch("os.listdir") @patch("os.kill") - def test_kill_process_with_children_except_yum(self, os_kill_mock, is_pid_life_mock, fake_signals): - shell.gracefull_kill_delay = 0.1 - is_pid_life_clean_kill = [True, False, True, False] # used here only first pair - - is_pid_life_mock.side_effect = is_pid_life_clean_kill + def test_kill_process_with_children_except_yum(self, os_kill_mock, os_list_dir_mock, fake_signals): + os_list_dir_mock.side_effect = [["10", "12", "20"], [], [], []] shell.kill_process_with_children(ROOT_PID) # test clean pid by SIGTERM os_kill_pids = [item[0][0] for item in os_kill_mock.call_args_list] self.assertEquals(len(os_kill_pids), 1) self.assertEquals([10], os_kill_pids) - diff --git a/ambari-common/src/main/python/ambari_commons/shell.py b/ambari-common/src/main/python/ambari_commons/shell.py index a4b3263..d84fe99 100644 --- a/ambari-common/src/main/python/ambari_commons/shell.py +++ b/ambari-common/src/main/python/ambari_commons/shell.py @@ -23,16 +23,15 @@ import subprocess import os import signal import threading -import time from contextlib import contextmanager from ambari_commons import OSConst from ambari_commons.os_family_impl import OsFamilyImpl, OsFamilyFuncImpl +from resource_management.core import sudo logger = logging.getLogger() threadLocal = threading.local() -gracefull_kill_delay = 5 # seconds between SIGTERM and SIGKILL # default timeout for async invoked processes TIMEOUT_SECONDS = 300 @@ -231,10 +230,11 @@ class shellRunnerWindows(shellRunner): return _dict_to_object({'exitCode': code, 'output': out, 'error': err}) -def get_all_childrens(base_pid): +def get_all_children(base_pid): """ - Return all child's pids of base_pid process - + Return all child PIDs of base_pid process + :param base_pid starting PID to scan for children + :return tuple of the following: pid, binary name, command line incl. binary :type base_pid int :rtype list[(int, str, str)] """ @@ -242,7 +242,7 @@ def get_all_childrens(base_pid): comm_path_pattern = "/proc/{0}/comm" cmdline_path_pattern = "/proc/{0}/cmdline" - def read_childrens(pid): + def read_children(pid): try: with open(parent_pid_path_pattern.format(pid), "r") as f: return [int(item) for item in f.readline().strip().split(" ")] @@ -263,60 +263,128 @@ def get_all_childrens(base_pid): except IOError: return "" - done = [] - pending = [int(base_pid)] + pids = [] + scan_pending = [int(base_pid)] - while pending: - mypid = pending.pop(0) - children = read_childrens(mypid) + while scan_pending: + curr_pid = scan_pending.pop(0) + children = read_children(curr_pid) - done.append((mypid, read_command(mypid), read_cmdline(mypid))) - pending.extend(children) + pids.append((curr_pid, read_command(curr_pid), read_cmdline(curr_pid))) + scan_pending.extend(children) - return done + return pids -def is_pid_life(pid): +def is_pid_exists(pid): """ - check if process with pid still exists (not counting it real state) - + Check if process with PID still exist (not counting it real state) :type pid int + :rtype bool """ pid_path = "/proc/{0}" try: return os.path.exists(pid_path.format(pid)) - except Exception: - logger.debug("Failed to check pid state") + except (OSError, IOError): + logger.debug("Failed to check PID existence") return False -# linux specific code +def get_existing_pids(pids): + """ + Check if process with pid still exists (not counting it real state). + Optimized to check PID list at once. + :param pids list of PIDs to filter + :return list of still existing PID + :type pids list[int] + :rtype list[int] + """ + + existing_pid_list = [] + + try: + all_existing_pid_list = [int(item) for item in os.listdir("/proc") if item.isdigit()] + except (OSError, IOError): + logger.debug("Failed to check PIDs existence") + return existing_pid_list + + for pid_item in pids: + if pid_item in all_existing_pid_list: + existing_pid_list.append(pid_item) + + return existing_pid_list + + +def wait_for_process_list_kill(pids, timeout=5, check_step_time=0.1): + """ + Process tree waiter + :type pids list[int] + :type timeout int|float + :type check_step_time int|float + :param pids list of PIDs to watch + :param timeout how long wait till giving up, seconds. Set 0 for nowait or None for infinite time + :param check_step_time how often scan for existing PIDs, seconds + """ + from threading import Thread, Event + import time + + stop_waiting = Event() + + def _wait_loop(): + while not stop_waiting.is_set() and get_existing_pids(pids): + time.sleep(check_step_time) + + if timeout == 0: # no need for loop if no timeout is set + return + + th = Thread(target=_wait_loop) + stop_waiting.clear() + + th.start() + th.join(timeout=timeout) + stop_waiting.set() + + th.join() + + @OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT) -def kill_process_with_children(parent_pid): +def kill_process_with_children(base_pid): + """ + Process tree killer + :type base_pid int + """ exception_list = ["apt-get", "apt", "yum", "zypper", "zypp"] - signals_to_post = [signal.SIGTERM, signal.SIGKILL] - all_chield_pids = [item[0] for item in get_all_childrens(parent_pid) if item[1].lower() not in exception_list and item[0] != os.getpid()] - clean_kill = True - last_error = "" + signals_to_post = { + "SIGTERM": signal.SIGTERM, + "SIGKILL": signal.SIGKILL + } + full_child_pids = get_all_children(base_pid) + all_child_pids = [item[0] for item in full_child_pids if item[1].lower() not in exception_list and item[0] != os.getpid()] + error_log = [] - for sig in signals_to_post: + for sig_name, sig in signals_to_post.items(): # we need to kill processes from the bottom of the tree - pids_to_kill = sorted(all_chield_pids, reverse=True) + pids_to_kill = sorted(get_existing_pids(all_child_pids), reverse=True) for pid in pids_to_kill: try: - if is_pid_life(pid): - os.kill(pid, sig) - except Exception as e: - clean_kill = False - last_error = repr(e) + sudo.kill(pid, sig) + except OSError as e: + error_log.append((sig_name, pid, repr(e))) if pids_to_kill: - time.sleep(gracefull_kill_delay) - - logger.info("Killed process tree starting from main pid {0}: {1}".format(parent_pid, ", ".join([str(i) for i in all_chield_pids]))) - if not clean_kill: - logger.warn("Failed to kill some child of PID {0} tree".format(parent_pid)) - logger.warn("Reported error: " + last_error) + wait_for_process_list_kill(pids_to_kill) + still_existing_pids = get_existing_pids(pids_to_kill) + if still_existing_pids: + logger.warn("These PIDs {0} did not respond to {1} signal. Detailed commands list:\n {2}".format( + ", ".join([str(i) for i in still_existing_pids]), + sig_name, + "\n".join([i[2] for i in full_child_pids if i[0] in still_existing_pids]) + )) + + if get_existing_pids(all_child_pids) and error_log: # we're unable to kill all requested PIDs + logger.warn("Process termination error log:\n") + for error_item in error_log: + logger.warn("PID: {0}, Process: {1}, Exception message: {2}".format(*error_item)) def _changeUid(): -- To stop receiving notification emails like this one, please contact hapylestat@apache.org.