ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hapyles...@apache.org
Subject [ambari] 02/02: [AMBARI-22888] Cancel operation during package deployment causing repository manager to be broken (dgrinenko)
Date Thu, 01 Feb 2018 18:13:29 GMT
This is an automated email from the ASF dual-hosted git repository.

hapylestat pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit 9c346beeb5a8187d7c5ace1d99094eb3d84c2c61
Author: Reishin <hapy.lestat@gmail.com>
AuthorDate: Thu Feb 1 16:43:47 2018 +0200

    [AMBARI-22888] Cancel operation during package deployment causing repository manager to be broken (dgrinenko)
---
 .../src/test/python/ambari_agent/TestShell.py      |  50 +++----
 .../src/main/python/ambari_commons/shell.py        | 144 +++++++++++++++------
 2 files changed, 132 insertions(+), 62 deletions(-)

diff --git a/ambari-agent/src/test/python/ambari_agent/TestShell.py b/ambari-agent/src/test/python/ambari_agent/TestShell.py
index 6a0538a..0f72020 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestShell.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestShell.py
@@ -9,9 +9,7 @@ regarding copyright ownership.  The ASF licenses this file
 to you under the Apache License, Version 2.0 (the
 "License"); you may not use this file except in compliance
 with the License.  You may obtain a copy of the License at
-
     http://www.apache.org/licenses/LICENSE-2.0
-
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -28,7 +26,7 @@ from ambari_commons import OSCheck
 from StringIO import StringIO
 
 ROOT_PID = 10
-ROOT_PID_CHILDRENS = [10, 11, 12, 13]
+ROOT_PID_CHILDREN = [10, 11, 12, 13]
 shell.logger = MagicMock()  # suppress any log output
 
 __proc_fs = {
@@ -59,6 +57,16 @@ __proc_fs_yum = {
   "/proc/11/cmdline": "yum install something"
 }
 
+# Force timeout=0 so the real waiter returns immediately: tests need no kill delay
+__old_waiter = shell.wait_for_process_list_kill
+
+
+def __wait_for_process_list_kill(pids, timeout=5, check_step_time=0.1):
+  return __old_waiter(pids, 0, check_step_time)
+
+
+shell.wait_for_process_list_kill = __wait_for_process_list_kill
+
 
 class FakeSignals(object):
   SIGTERM = signal.SIG_IGN
@@ -86,41 +94,39 @@ class TestShell(unittest.TestCase):
   @patch("__builtin__.open", new=MagicMock(side_effect=_open_mock))
   def test_get_all_children(self):
 
-    pid_list = [item[0] for item in shell.get_all_childrens(ROOT_PID)]
+    pid_list = [item[0] for item in shell.get_all_children(ROOT_PID)]
 
-    self.assertEquals(len(ROOT_PID_CHILDRENS), len(pid_list))
+    self.assertEquals(len(ROOT_PID_CHILDREN), len(pid_list))
     self.assertEquals(ROOT_PID, pid_list[0])
 
-    for i in ROOT_PID_CHILDRENS:
+    for i in ROOT_PID_CHILDREN:
       self.assertEquals(True, i in pid_list)
 
   @patch("__builtin__.open", new=MagicMock(side_effect=_open_mock))
   @patch.object(OSCheck, "get_os_family", new=MagicMock(return_value="redhat"))
   @patch.object(shell, "signal", new_callable=FakeSignals)
-  @patch.object(shell, "is_pid_life")
+  @patch("os.listdir")
   @patch("os.kill")
-  def test_kill_process_with_children(self, os_kill_mock, is_pid_life_mock, fake_signals):
-    pid_list = [item[0] for item in shell.get_all_childrens(ROOT_PID)]
+  def test_kill_process_with_children(self, os_kill_mock, os_list_dir_mock, fake_signals):
+    pid_list = [item[0] for item in shell.get_all_children(ROOT_PID)]
+    pid_list_str = [str(i) for i in ROOT_PID_CHILDREN]
     reverse_pid_list = sorted(pid_list, reverse=True)
-    shell.gracefull_kill_delay = 0.1
-    is_pid_life_clean_kill = [True] * len(pid_list) + [False] * len(pid_list)
-    is_pid_life_not_clean_kill = [True] * (len(pid_list) * 2)
+    os_list_dir_mock.side_effect = [pid_list_str, [], [], []]
 
-    is_pid_life_mock.side_effect = is_pid_life_clean_kill
     shell.kill_process_with_children(ROOT_PID)
 
-    # test clean pid by SIGTERM
+    # test pid kill by SIGTERM
     os_kill_pids = [item[0][0] for item in os_kill_mock.call_args_list]
     self.assertEquals(len(os_kill_pids), len(pid_list))
     self.assertEquals(reverse_pid_list, os_kill_pids)
 
     os_kill_mock.reset_mock()
-    is_pid_life_mock.reset_mock()
+    os_list_dir_mock.reset_mock()
 
-    is_pid_life_mock.side_effect = is_pid_life_not_clean_kill
+    os_list_dir_mock.side_effect = [pid_list_str, pid_list_str, pid_list_str, pid_list_str, [], []]
     shell.kill_process_with_children(ROOT_PID)
 
-    # test clean pid by SIGKILL
+    # test pid kill by SIGKILL
     os_kill_pids = [item[0][0] for item in os_kill_mock.call_args_list]
     self.assertEquals(len(os_kill_pids), len(pid_list)*2)
     self.assertEquals(reverse_pid_list + reverse_pid_list, os_kill_pids)
@@ -128,17 +134,13 @@ class TestShell(unittest.TestCase):
   @patch("__builtin__.open", new=MagicMock(side_effect=_open_mock_yum))
   @patch.object(OSCheck, "get_os_family", new=MagicMock(return_value="redhat"))
   @patch.object(shell, "signal", new_callable=FakeSignals)
-  @patch.object(shell, "is_pid_life")
+  @patch("os.listdir")
   @patch("os.kill")
-  def test_kill_process_with_children_except_yum(self, os_kill_mock, is_pid_life_mock, fake_signals):
-    shell.gracefull_kill_delay = 0.1
-    is_pid_life_clean_kill = [True, False, True, False]  # used here only first pair
-
-    is_pid_life_mock.side_effect = is_pid_life_clean_kill
+  def test_kill_process_with_children_except_yum(self, os_kill_mock, os_list_dir_mock, fake_signals):
+    os_list_dir_mock.side_effect = [["10", "12", "20"], [], [], []]
     shell.kill_process_with_children(ROOT_PID)
 
     # test clean pid by SIGTERM
     os_kill_pids = [item[0][0] for item in os_kill_mock.call_args_list]
     self.assertEquals(len(os_kill_pids), 1)
     self.assertEquals([10], os_kill_pids)
-
diff --git a/ambari-common/src/main/python/ambari_commons/shell.py b/ambari-common/src/main/python/ambari_commons/shell.py
index a4b3263..d84fe99 100644
--- a/ambari-common/src/main/python/ambari_commons/shell.py
+++ b/ambari-common/src/main/python/ambari_commons/shell.py
@@ -23,16 +23,15 @@ import subprocess
 import os
 import signal
 import threading
-import time
 from contextlib import contextmanager
 
 from ambari_commons import OSConst
 from ambari_commons.os_family_impl import OsFamilyImpl, OsFamilyFuncImpl
+from resource_management.core import sudo
 
 logger = logging.getLogger()
 
 threadLocal = threading.local()
-gracefull_kill_delay = 5  # seconds between SIGTERM and SIGKILL
 
 # default timeout for async invoked processes
 TIMEOUT_SECONDS = 300
@@ -231,10 +230,11 @@ class shellRunnerWindows(shellRunner):
     return _dict_to_object({'exitCode': code, 'output': out, 'error': err})
 
 
-def get_all_childrens(base_pid):
+def get_all_children(base_pid):
   """
-  Return all child's pids of base_pid process
-
+  Return base_pid followed by all of its descendant processes, scanned breadth-first
+  :param base_pid starting PID to scan for children
+  :return list of (pid, binary name, command line incl. binary) tuples
   :type base_pid int
   :rtype list[(int, str, str)]
   """
@@ -242,7 +242,7 @@ def get_all_childrens(base_pid):
   comm_path_pattern = "/proc/{0}/comm"
   cmdline_path_pattern = "/proc/{0}/cmdline"
 
-  def read_childrens(pid):
+  def read_children(pid):
     try:
       with open(parent_pid_path_pattern.format(pid), "r") as f:
         return [int(item) for item in f.readline().strip().split(" ")]
@@ -263,60 +263,128 @@ def get_all_childrens(base_pid):
     except IOError:
       return ""
 
-  done = []
-  pending = [int(base_pid)]
+  pids = []
+  scan_pending = [int(base_pid)]
 
-  while pending:
-    mypid = pending.pop(0)
-    children = read_childrens(mypid)
+  while scan_pending:
+    curr_pid = scan_pending.pop(0)
+    children = read_children(curr_pid)
 
-    done.append((mypid, read_command(mypid), read_cmdline(mypid)))
-    pending.extend(children)
+    pids.append((curr_pid, read_command(curr_pid), read_cmdline(curr_pid)))
+    scan_pending.extend(children)
 
-  return done
+  return pids
 
 
-def is_pid_life(pid):
+def is_pid_exists(pid):
   """
-  check if process with pid still exists (not counting it real state)
-
+  Check if a process with this PID still exists (regardless of its actual state)
   :type pid int
+  :rtype bool
   """
   pid_path = "/proc/{0}"
   try:
     return os.path.exists(pid_path.format(pid))
-  except Exception:
-    logger.debug("Failed to check pid state")
+  except (OSError, IOError):
+    logger.debug("Failed to check PID existence")
     return False
 
 
-# linux specific code
+def get_existing_pids(pids):
+  """
+  Filter the given PIDs down to those that still exist (regardless of process state).
+  Lists /proc once instead of checking every PID separately.
+  :param pids list of PIDs to filter
+  :return list of still existing PIDs in input order; empty list if /proc cannot be listed
+  :type pids list[int]
+  :rtype list[int]
+  """
+
+  existing_pid_list = []
+
+  try:
+    all_existing_pids = set(int(item) for item in os.listdir("/proc") if item.isdigit())
+  except (OSError, IOError):
+    logger.debug("Failed to check PIDs existence")
+    return existing_pid_list
+
+  for pid_item in pids:
+    if pid_item in all_existing_pids:  # set lookup keeps the filter linear in len(pids)
+      existing_pid_list.append(pid_item)
+
+  return existing_pid_list
+
+
+def wait_for_process_list_kill(pids, timeout=5, check_step_time=0.1):
+  """
+  Wait until get_existing_pids() reports none of the given PIDs alive, or until the timeout expires
+  :type pids list[int]
+  :type timeout int|float
+  :type check_step_time int|float
+  :param pids list of PIDs to watch
+  :param timeout how long to wait before giving up, in seconds. Set 0 to not wait at all or None to wait indefinitely
+  :param check_step_time how often to rescan for existing PIDs, in seconds
+  """
+  from threading import Thread, Event
+  import time
+
+  stop_waiting = Event()
+
+  def _wait_loop():
+    while not stop_waiting.is_set() and get_existing_pids(pids):
+      time.sleep(check_step_time)
+
+  if timeout == 0:  # 0 means "do not wait"; None (wait forever) is handled by join() below
+    return
+
+  th = Thread(target=_wait_loop)
+  stop_waiting.clear()
+
+  th.start()
+  th.join(timeout=timeout)
+  stop_waiting.set()  # tell the polling thread to stop if join() timed out
+
+  th.join()
+
+
 @OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
-def kill_process_with_children(parent_pid):
+def kill_process_with_children(base_pid):
+  """
+  Kill base_pid and its children in descending PID order: SIGTERM first, then SIGKILL for survivors (exception_list names and this process are skipped)
+  :type base_pid int
+  """
   exception_list = ["apt-get", "apt", "yum", "zypper", "zypp"]
-  signals_to_post = [signal.SIGTERM, signal.SIGKILL]
-  all_chield_pids = [item[0] for item in get_all_childrens(parent_pid) if item[1].lower() not in exception_list and item[0] != os.getpid()]
-  clean_kill = True
-  last_error = ""
+  signals_to_post = (  # ordered pairs: a plain dict gives no iteration-order guarantee on Python 2
+    ("SIGTERM", signal.SIGTERM),
+    ("SIGKILL", signal.SIGKILL)
+  )
+  full_child_pids = get_all_children(base_pid)
+  all_child_pids = [item[0] for item in full_child_pids if item[1].lower() not in exception_list and item[0] != os.getpid()]
+  error_log = []
 
-  for sig in signals_to_post:
+  for sig_name, sig in signals_to_post:
     # we need to kill processes from the bottom of the tree
-    pids_to_kill = sorted(all_chield_pids, reverse=True)
+    pids_to_kill = sorted(get_existing_pids(all_child_pids), reverse=True)
     for pid in pids_to_kill:
       try:
-        if is_pid_life(pid):
-          os.kill(pid, sig)
-      except Exception as e:
-        clean_kill = False
-        last_error = repr(e)
+        sudo.kill(pid, sig)
+      except OSError as e:
+        error_log.append((sig_name, pid, repr(e)))
 
     if pids_to_kill:
-      time.sleep(gracefull_kill_delay)
-
-  logger.info("Killed process tree starting from main pid {0}: {1}".format(parent_pid, ", ".join([str(i) for i in all_chield_pids])))
-  if not clean_kill:
-    logger.warn("Failed to kill some child of PID {0} tree".format(parent_pid))
-    logger.warn("Reported error: " + last_error)
+      wait_for_process_list_kill(pids_to_kill)
+      still_existing_pids = get_existing_pids(pids_to_kill)
+      if still_existing_pids:
+        logger.warn("These PIDs {0} did not respond to {1} signal. Detailed commands list:\n {2}".format(
+          ", ".join([str(i) for i in still_existing_pids]),
+          sig_name,
+          "\n".join([i[2] for i in full_child_pids if i[0] in still_existing_pids])
+        ))
+
+  if get_existing_pids(all_child_pids) and error_log:  # we're unable to kill all requested PIDs
+    logger.warn("Process termination error log:\n")
+    for error_item in error_log:
+      logger.warn("Signal: {0}, PID: {1}, Exception message: {2}".format(*error_item))
 
 
 def _changeUid():

-- 
To stop receiving notification emails like this one, please contact
hapylestat@apache.org.

Mime
View raw message