ambari-commits mailing list archives

From echekans...@apache.org
Subject ambari git commit: AMBARI-20323. Commands timed-out on ambari host without any error logs (echekanskiy)
Date Thu, 09 Mar 2017 16:33:06 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-2.5 c6a9a3ca4 -> 17ef55594


AMBARI-20323. Commands timed-out on ambari host without any error logs (echekanskiy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/17ef5559
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/17ef5559
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/17ef5559

Branch: refs/heads/branch-2.5
Commit: 17ef555940758b73cd09ddcc9fc8a3461604c085
Parents: c6a9a3c
Author: Eugene Chekanskiy <echekanskiy@hortonworks.com>
Authored: Thu Mar 9 18:30:20 2017 +0200
Committer: Eugene Chekanskiy <echekanskiy@hortonworks.com>
Committed: Thu Mar 9 18:30:20 2017 +0200

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py |  52 +---
 .../src/main/python/ambari_agent/Controller.py  |  54 +---
 .../ambari_agent/StatusCommandsExecutor.py      | 307 +++++++++++++++----
 .../src/main/python/ambari_agent/main.py        |  12 +-
 .../test/python/ambari_agent/TestActionQueue.py |   4 +-
 .../test/python/ambari_agent/TestController.py  |   3 -
 .../src/test/python/ambari_agent/TestMain.py    |   9 +-
 7 files changed, 280 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 5300b52..15ae03d 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -76,10 +76,6 @@ class ActionQueue(threading.Thread):
   def __init__(self, config, controller):
     super(ActionQueue, self).__init__()
     self.commandQueue = Queue.Queue()
-    self.statusCommandQueue = None # the queue this field points to is re-created whenever
-                                   # a new StatusCommandExecutor child process is spawned
-                                   # by Controller
-    # multiprocessing.Queue()
     self.statusCommandResultQueue = multiprocessing.Queue() # this queue is filled by StatuCommandsExecutor.
     self.backgroundCommandQueue = Queue.Queue()
     self.commandStatuses = CommandStatusDict(callback_action =
@@ -102,25 +98,7 @@ class ActionQueue(threading.Thread):
     return self._stop.isSet()
 
   def put_status(self, commands):
-    if not self.statusCommandQueue.empty():
-      #Clear all status commands. Was supposed that we got all set of statuses, we don't need to keep old ones
-      statusCommandQueueSize = 0
-      try:
-        while not self.statusCommandQueue.empty():
-          self.statusCommandQueue.get(False)
-          statusCommandQueueSize = statusCommandQueueSize + 1
-      except Queue.Empty:
-        pass
-
-      logger.info("Number of status commands removed from queue : " + str(statusCommandQueueSize))
-
-    for command in commands:
-      logger.info("Adding " + command['commandType'] + " for component " + \
-                  command['componentName'] + " of service " + \
-                  command['serviceName'] + " of cluster " + \
-                  command['clusterName'] + " to the queue.")
-      self.statusCommandQueue.put(command)
-      logger.debug(pprint.pformat(command))
+    self.controller.statusCommandsExecutor.put_commands(commands)
 
   def put(self, commands):
     for command in commands:
@@ -167,8 +145,8 @@ class ActionQueue(threading.Thread):
   def run(self):
     try:
       while not self.stopped():
-        self.processBackgroundQueueSafeEmpty();
-        self.processStatusCommandResultQueueSafeEmpty();
+        self.processBackgroundQueueSafeEmpty()
+        self.process_status_command_results()
         try:
           if self.parallel_execution == 0:
             command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
@@ -212,23 +190,13 @@ class ActionQueue(threading.Thread):
       except Queue.Empty:
         pass
 
-  def processStatusCommandResultQueueSafeEmpty(self):
-    try:
-      while not self.statusCommandResultQueue.empty():
-        try:
-          result = self.statusCommandResultQueue.get(False)
-          self.process_status_command_result(result)
-        except Queue.Empty:
-          pass
-        except IOError:
-          # on race condition in multiprocessing.Queue if get/put and thread kill are executed at the same time.
-          # During queue.close IOError will be thrown (this prevents from permanently dead-locked get).
-          pass
-        except UnicodeDecodeError:
-          pass
-    except IOError:
-      # queue.empty() may also throw IOError
-      pass
+  def process_status_command_results(self):
+    self.controller.statusCommandsExecutor.process_logs()
+    for result in self.controller.statusCommandsExecutor.get_results():
+      try:
+        self.process_status_command_result(result)
+      except UnicodeDecodeError:
+        pass
 
   def createCommandHandle(self, command):
     if command.has_key('__handle'):

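For context, the new process_status_command_results() leans on a non-blocking drain of multiprocessing queues, swallowing the races that empty()/get() can expose on a queue whose consumer may be killed at any time. Below is a minimal, self-contained sketch of that drain pattern (Python 2, like the agent); the drain() helper and the sample payload are illustrative, not agent code.

import multiprocessing
import Queue
import time


def drain(mp_queue):
    """Return everything currently readable from mp_queue without blocking."""
    items = []
    try:
        while not mp_queue.empty():
            try:
                items.append(mp_queue.get(False))
            except Queue.Empty:
                pass          # empty() raced with another consumer
            except IOError:
                break         # queue was closed underneath us (executor being killed)
    except IOError:
        pass                  # empty() itself can raise once the queue is closed
    return items


q = multiprocessing.Queue()
q.put(('STATUS_COMMAND', {'exitcode': 0}, 'UNSECURED'))
time.sleep(0.2)               # let the queue's feeder thread flush the item
print(drain(q))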
http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 61a74e6..301ad43 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -40,7 +40,6 @@ import AmbariConfig
 from ambari_agent.Heartbeat import Heartbeat
 from ambari_agent.Register import Register
 from ambari_agent.ActionQueue import ActionQueue
-from ambari_agent.StatusCommandsExecutor import StatusCommandsExecutor
 from ambari_agent.FileCache import FileCache
 from ambari_agent.NetUtil import NetUtil
 from ambari_agent.LiveStatus import LiveStatus
@@ -49,6 +48,7 @@ from ambari_agent.ClusterConfiguration import  ClusterConfiguration
 from ambari_agent.RecoveryManager import  RecoveryManager
 from ambari_agent.HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers
 from ambari_agent.ExitHelper import ExitHelper
+from ambari_agent.StatusCommandsExecutor import StatusCommandsExecutor
 from resource_management.libraries.functions.version import compare_versions
 from ambari_commons.os_utils import get_used_ram
 
@@ -87,9 +87,6 @@ class Controller(threading.Thread):
     self.hasMappedComponents = True
     self.statusCommandsExecutor = None
 
-    # this lock is used control which thread spawns/kills the StatusCommandExecutor child process
-    self.spawnKillStatusCommandExecutorLock = threading.RLock()
-
     # Event is used for synchronizing heartbeat iterations (to make possible
     # manual wait() interruption between heartbeats )
     self.heartbeat_stop_callback = heartbeat_stop_callback
@@ -205,7 +202,7 @@ class Controller(threading.Thread):
 
         # Start StatusCommandExecutor child process or restart it if already running
         # in order to receive up to date agent config.
-        self.spawnStatusCommandsExecutorProcess()
+        self.statusCommandsExecutor.relaunch("REGISTER_WITH_SERVER")
 
         if 'statusCommands' in ret.keys():
           logger.debug("Got status commands on registration.")
@@ -476,51 +473,11 @@ class Controller(threading.Thread):
 
       logger.log(logging_level, "Wait for next heartbeat over")
 
-  def spawnStatusCommandsExecutorProcess(self):
-    '''
-    Starts a new StatusCommandExecutor child process. In case there is a running instance
-     already restarts it by simply killing it and starting new one.
-     This function is thread-safe.
-    '''
-    with self.getSpawnKillStatusCommandExecutorLock():
-      # if there is already an instance of StatusCommandExecutor kill it first
-      self.killStatusCommandsExecutorProcess()
-
-      # Re-create the status command queue as in case the consumer
-      # process is killed the queue may deadlock (see http://bugs.python.org/issue20527).
-      # The queue must be re-created by the producer process.
-      statusCommandQueue = self.actionQueue.statusCommandQueue
-      self.actionQueue.statusCommandQueue = multiprocessing.Queue()
-
-      if statusCommandQueue is not None:
-        statusCommandQueue.close()
-
-      logger.info("Spawning statusCommandsExecutor")
-      self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue)
-      self.statusCommandsExecutor.start()
-
-  def killStatusCommandsExecutorProcess(self):
-    '''
-    Kills the StatusExecutorChild process if exists. This function is thread-safe.
-    '''
-    with self.getSpawnKillStatusCommandExecutorLock():
-      if self.statusCommandsExecutor is not None and self.statusCommandsExecutor.is_alive():
-        logger.info("Terminating statusCommandsExecutor.")
-        self.statusCommandsExecutor.kill()
-
-  def getSpawnKillStatusCommandExecutorLock(self):
-    '''
-    Re-entrant lock to be used to synchronize the spawning or killing of
-    StatusCommandExecutor child process in multi-thread environment.
-    '''
-    return self.spawnKillStatusCommandExecutorLock;
-
-  def getStatusCommandsExecutor(self):
-    return self.statusCommandsExecutor
-
   def run(self):
     try:
       self.actionQueue = ActionQueue(self.config, controller=self)
+      self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue)
+      ExitHelper().register(self.statusCommandsExecutor.kill, "CLEANUP_KILLING")
       self.actionQueue.start()
       self.register = Register(self.config)
       self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())
@@ -613,6 +570,9 @@ class Controller(threading.Thread):
     logger.debug("LiveStatus.CLIENT_COMPONENTS" + str(LiveStatus.CLIENT_COMPONENTS))
     logger.debug("LiveStatus.COMPONENTS" + str(LiveStatus.COMPONENTS))
 
+  def get_status_commands_executor(self):
+    return self.statusCommandsExecutor
+
   def move_data_dir_mount_file(self):
     """
     In Ambari 2.1.2, we moved the dfs_data_dir_mount.hist to a static location

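The Controller change reduces the executor lifecycle to: create the object once in run(), register its kill() as a cleanup hook, and call relaunch() whenever the agent (re)registers so the child process picks up fresh configuration. A hedged sketch of that shape, using atexit in place of ambari_agent.ExitHelper and a DummyExecutor stand-in rather than the real class:

import atexit                  # stands in for ambari_agent.ExitHelper in this sketch
import multiprocessing


def _noop():
    pass                       # placeholder child workload


class DummyExecutor(object):
    def __init__(self):
        self.worker_process = None

    def relaunch(self, reason=None):
        self.kill(reason)                                   # restart == kill + start
        self.worker_process = multiprocessing.Process(target=_noop)
        self.worker_process.start()
        print("relaunched (reason=%s), pid=%s" % (reason, self.worker_process.pid))

    def kill(self, reason=None):
        if self.worker_process and self.worker_process.is_alive():
            self.worker_process.terminate()
            self.worker_process.join()


executor = DummyExecutor()                 # created once, before any registration
atexit.register(executor.kill)             # analogous to ExitHelper().register(kill, "CLEANUP_KILLING")
executor.relaunch("REGISTER_WITH_SERVER")  # restart child so it sees fresh agent config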
http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
index 2f15770..3f7ef4c 100644
--- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-'''
+"""
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
 distributed with this work for additional information
@@ -15,80 +15,279 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-'''
+"""
 
-import os
-import signal
-import threading
+import Queue
 import logging
 import multiprocessing
-from ambari_agent.PythonReflectiveExecutor import PythonReflectiveExecutor
+import os
+import pprint
+import threading
+
+import time
+
+import signal
 from ambari_agent.RemoteDebugUtils import bind_debug_signal_handlers
-from ambari_agent.ExitHelper import ExitHelper
 
 logger = logging.getLogger(__name__)
 
-class StatusCommandsExecutor(multiprocessing.Process):
-  """
-  A process which executes status/security status commands.
 
-  It dies and respawns itself on timeout of the command. Which is the most graceful way to end the currently running status command.
-  """
+class StatusCommandsExecutor(object):
   def __init__(self, config, actionQueue):
-    multiprocessing.Process.__init__(self)
-
     self.config = config
     self.actionQueue = actionQueue
 
-    self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5)) # in seconds
-    self.hasTimeoutedEvent = multiprocessing.Event()
-    ExitHelper().register(self.kill)
+    # used to prevent the queues from being used while new ones are created, so that threads
+    # do not mix old and new queues
+    self.usage_lock = threading.RLock()
 
-  def run(self):
-    try:
-      bind_debug_signal_handlers()
-      logger.info("StatusCommandsExecutor starting")
+    self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5))
+    self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
+
+    self.worker_process = None
+    self.mustDieEvent = multiprocessing.Event()
+    self.timedOutEvent = multiprocessing.Event()
+
+    # multiprocessing stuff that need to be cleaned every time
+    self.mp_result_queue = multiprocessing.Queue()
+    self.mp_result_logs = multiprocessing.Queue()
+    self.mp_task_queue = multiprocessing.Queue()
+
+  def _log_message(self, level, message, exception=None):
+    """
+    Put a log message onto the logging queue. Must be used only for logging from the child process (in _worker_process_target).
+
+    :param level:
+    :param message:
+    :param exception:
+    :return:
+    """
+    result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message
+    self.mp_result_logs.put((level, result_message, exception))
+
+  def get_log_messages(self):
+    """
+    Returns list of (level, message, exception) log messages.
+
+    :return: list of (level, message, exception)
+    """
+    results = []
+    with self.usage_lock:
+      try:
+        while not self.mp_result_logs.empty():
+          try:
+            results.append(self.mp_result_logs.get(False))
+          except Queue.Empty:
+            pass
+          except IOError:
+            pass
+          except UnicodeDecodeError:
+            pass
+      except IOError:
+        pass
+    return results
+
+  def process_logs(self):
+    """
+    Get all logs available at this moment and print them to the logger.
+    """
+    for level, message, exception in self.get_log_messages():
+      if level == logging.ERROR:
+        logger.debug(message, exc_info=exception)
+      if level == logging.WARN:
+        logger.warn(message)
+      if level == logging.INFO:
+        logger.info(message)
+
+  def _worker_process_target(self):
+    """
+    Internal method that runs in a separate process.
+    """
+    bind_debug_signal_handlers()
+    self._log_message(logging.INFO, "StatusCommandsExecutor process started")
+
+    # region StatusCommandsExecutor process internals
+    internal_in_queue = Queue.Queue()
+    internal_out_queue = Queue.Queue()
+
+    def _internal_worker():
+      """
+      thread that actually executes status commands
+      """
       while True:
-        command = self.actionQueue.statusCommandQueue.get(True) # blocks until status status command appears
-        logger.debug("Running status command for {0}".format(command['componentName']))
-        
-        timeout_timer = threading.Timer( self.status_command_timeout, self.respawn, [command])
-        timeout_timer.start()
-
-        self.process_status_command(command)
-
-        timeout_timer.cancel()
-        logger.debug("Completed status command for {0}".format(command['componentName']))
-    except:
-      logger.exception("StatusCommandsExecutor process failed with exception:")
-      raise
+        _cmd = internal_in_queue.get()
+        component_status_result = self.customServiceOrchestrator.requestComponentStatus(_cmd)
+        component_security_status_result = self.customServiceOrchestrator.requestComponentSecurityState(_cmd)
+        internal_out_queue.put((_cmd, component_status_result, component_security_status_result))
 
-    logger.warn("StatusCommandsExecutor process has finished")
+    worker = threading.Thread(target=_internal_worker)
+    worker.daemon = True
+    worker.start()
 
-  def process_status_command(self, command):
-    component_status_result = self.actionQueue.customServiceOrchestrator.requestComponentStatus(command)
-    component_security_status_result = self.actionQueue.customServiceOrchestrator.requestComponentSecurityState(command)
-    result = (command, component_status_result, component_security_status_result)
+    def _internal_process_command(_command):
+      internal_in_queue.put(_command)
+      start_time = time.time()
+      result = None
+      while not self.mustDieEvent.is_set() and not result and time.time() - start_time < self.status_command_timeout:
+        try:
+          result = internal_out_queue.get(timeout=1)
+        except Queue.Empty:
+          pass
 
-    self.actionQueue.statusCommandResultQueue.put(result)
+      if result:
+        self.mp_result_queue.put(result)
+        return True
+      else:
+        # do not set timed out event twice
+        if not self.timedOutEvent.is_set():
+          self._set_timed_out(_command)
+        return False
+
+    # endregion
 
-  def respawn(self, command):
     try:
-      if hasattr(PythonReflectiveExecutor, "last_context"):
-        # Force context to reset to normal. By context we mean sys.path, imports, etc. They are set by specific status command, and are not relevant to ambari-agent.
-        PythonReflectiveExecutor.last_context.revert()
+      while not self.mustDieEvent.is_set():
+        try:
+          command = self.mp_task_queue.get(False)
+        except Queue.Empty:
+          # no command, lets try in other loop iteration
+          time.sleep(.1)
+          continue
+
+        self._log_message(logging.DEBUG, "Running status command for {0}".format(command['componentName']))
 
-      logger.warn("Command {0} for {1} is running for more than {2} seconds. Terminating it due to timeout.".format(command['commandType'], command['componentName'], self.status_command_timeout))
+        if _internal_process_command(command):
+          self._log_message(logging.DEBUG, "Completed status command for {0}".format(command['componentName']))
 
-      self.hasTimeoutedEvent.set()
-    except:
-      logger.exception("StatusCommandsExecutor.finish thread failed with exception:")
+    except Exception as e:
+      self._log_message(logging.ERROR, "StatusCommandsExecutor process failed with exception:", e)
       raise
 
-  def kill(self):
-    os.kill(self.pid, signal.SIGKILL)
+    self._log_message(logging.WARN, "StatusCommandsExecutor subprocess finished")
+
+  def _set_timed_out(self, command):
+    """
+    Set the timeout event and add a log entry for the given command.
+
+    :param command:
+    :return:
+    """
+    msg = "Command {0} for {1} is running for more than {2} seconds. Terminating it due to timeout.".format(
+        command['commandType'],
+        command['componentName'],
+        self.status_command_timeout
+    )
+    self._log_message(logging.WARN, msg)
+    self.timedOutEvent.set()
+
+  def put_commands(self, commands):
+    """
+    Put given commands to command executor.
+
+    :param commands: status commands to execute
+    :return:
+    """
+    with self.usage_lock:
+      if not self.mp_task_queue.empty():
+        status_command_queue_size = 0
+        try:
+          while not self.mp_task_queue.empty():
+            self.mp_task_queue.get(False)
+            status_command_queue_size += 1
+        except Queue.Empty:
+          pass
+
+        logger.info("Number of status commands removed from queue : " + str(status_command_queue_size))
+      for command in commands:
+        logger.info("Adding " + command['commandType'] + " for component " + \
+                    command['componentName'] + " of service " + \
+                    command['serviceName'] + " of cluster " + \
+                    command['clusterName'] + " to the queue.")
+        self.mp_task_queue.put(command)
+        logger.debug(pprint.pformat(command))
+
+  def get_results(self):
+    """
+    Get all available results for status commands.
+
+    :return: list of results
+    """
+    results = []
+    with self.usage_lock:
+      try:
+        while not self.mp_result_queue.empty():
+          try:
+            results.append(self.mp_result_queue.get(False))
+          except Queue.Empty:
+            pass
+          except IOError:
+            pass
+          except UnicodeDecodeError:
+            pass
+      except IOError:
+        pass
+    return results
+
+  @property
+  def need_relaunch(self):
+    """
+    Indicates whether the process needs to be relaunched because it timed out, died, or was never created.
+    """
+    return self.timedOutEvent.is_set() or not self.worker_process or not self.worker_process.is_alive()
+
+  def relaunch(self, reason=None):
+    """
+    Restart status command executor internal process.
+
+    :param reason: reason of restart
+    :return:
+    """
+    self.kill(reason)
+    self.worker_process = multiprocessing.Process(target=self._worker_process_target)
+    self.worker_process.start()
+    logger.info("Started process with pid {0}".format(self.worker_process.pid))
+
+  def kill(self, reason=None):
+    """
+    Tries to stop the command executor's internal process for a short time, otherwise kills it. Closes all queues to
+    unblock threads that may be blocked on queue read or write operations. Must be called from threads other than
+    the threads that call the read or write methods (get_log_messages, get_results, put_commands).
+
+    :param reason: reason of killing
+    :return:
+    """
+    # try graceful stop, otherwise hard-kill
+    if self.worker_process and self.worker_process.is_alive():
+      logger.info("Killing child process reason:" + str(reason))
+      self.mustDieEvent.set()
+      self.worker_process.join(timeout=3)
+      if self.worker_process.is_alive():
+        os.kill(self.worker_process.pid, signal.SIGKILL)
+        logger.info("Child process killed by -9")
+      else:
+        # get log messages only if the child died gracefully, otherwise we could block here forever; in most cases
+        # this call will do nothing, as all logs will be processed in the ActionQueue loop
+        self.process_logs()
+        logger.info("Child process died gracefully")
+    else:
+      logger.info("Child process already dead")
+
+    # close queues and acquire usage lock
+    # closing both sides of pipes here, we need this hack in case of blocking on recv() call
+    self.mp_result_queue.close()
+    self.mp_result_queue._writer.close()
+    self.mp_result_logs.close()
+    self.mp_result_logs._writer.close()
+    self.mp_task_queue.close()
+    self.mp_task_queue._writer.close()
 
-    # prevent queue from ending up with non-freed semaphores, locks during put. Which would result in dead-lock in process executing get.
-    self.actionQueue.statusCommandResultQueue.close()
-    self.actionQueue.statusCommandResultQueue.join_thread()
-    self.actionQueue.statusCommandResultQueue = multiprocessing.Queue()
+    with self.usage_lock:
+      self.mp_result_queue.join_thread()
+      self.mp_result_queue = multiprocessing.Queue()
+      self.mp_task_queue.join_thread()
+      self.mp_task_queue = multiprocessing.Queue()
+      self.mp_result_logs.join_thread()
+      self.mp_result_logs = multiprocessing.Queue()
+      self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
+      self.mustDieEvent.clear()
+      self.timedOutEvent.clear()

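The heart of the rewritten StatusCommandsExecutor is that the actual status command runs on a daemon thread inside the child process, feeding an internal queue, while the loop waits on that queue with a deadline; when the deadline passes the command is simply abandoned, a timed-out event is set, and the parent relaunches the whole child rather than killing a command mid-flight. A minimal sketch of that timeout technique (illustrative names, not the agent's own code):

import Queue
import threading
import time


def run_with_timeout(func, timeout_seconds):
    out_queue = Queue.Queue()

    def _worker():
        out_queue.put(func())              # may block forever if func hangs

    worker = threading.Thread(target=_worker)
    worker.daemon = True                   # a stuck command must not keep the process alive
    worker.start()

    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        try:
            return out_queue.get(timeout=1)  # poll, so a stop/kill flag could also be checked here
        except Queue.Empty:
            pass
    return None                            # caller treats None as "timed out, relaunch me"


print(run_with_timeout(lambda: "ok", 5))             # -> ok
print(run_with_timeout(lambda: time.sleep(60), 2))   # -> None after roughly 2 seconds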
http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 3f333c4..3c5d8f1 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -311,18 +311,14 @@ def run_threads(server_hostname, heartbeat_stop_callback):
   # Launch Controller communication
   controller = Controller(config, server_hostname, heartbeat_stop_callback)
   controller.start()
+  time.sleep(2) # in order to get controller.statusCommandsExecutor initialized
   while controller.is_alive():
     time.sleep(0.1)
 
-    with controller.getSpawnKillStatusCommandExecutorLock():
-      # We need to lock as Controller.py may try to spawn StatusCommandExecutor child in parallel as well
-      if controller.getStatusCommandsExecutor() is not None \
-          and (not controller.getStatusCommandsExecutor().is_alive()
-              or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()):
-        controller.spawnStatusCommandsExecutorProcess()
+    if controller.get_status_commands_executor().need_relaunch:
+      controller.get_status_commands_executor().relaunch("COMMAND_TIMEOUT_OR_KILLED")
 
-
-  controller.killStatusCommandsExecutorProcess()
+  controller.get_status_commands_executor().kill()
 
# event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
 # we need this for windows os, where no sigterm available

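With the diff above, the supervision logic in main.py no longer needs the spawn/kill lock: the main loop simply polls need_relaunch and restarts the executor when it reports a timeout or a dead child. A small sketch of that polling loop, with WatchedExecutor as a hypothetical stand-in for the real executor:

import time


class WatchedExecutor(object):
    def __init__(self):
        self.timed_out = False
        self.alive = False

    @property
    def need_relaunch(self):
        # timed out, dead, or never started -> relaunch
        return self.timed_out or not self.alive

    def relaunch(self, reason=None):
        print("relaunch: %s" % reason)
        self.timed_out = False
        self.alive = True

    def kill(self, reason=None):
        self.alive = False


executor = WatchedExecutor()
for _ in range(3):                         # stands in for "while controller.is_alive()"
    time.sleep(0.1)
    if executor.need_relaunch:
        executor.relaunch("COMMAND_TIMEOUT_OR_KILLED")
executor.kill()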
http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 9fefefb..8701a24 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -1329,7 +1329,7 @@ class TestActionQueue(TestCase):
     execute_command = copy.deepcopy(self.background_command)
     actionQueue.put([execute_command])
     actionQueue.processBackgroundQueueSafeEmpty();
-    actionQueue.processStatusCommandResultQueueSafeEmpty();
+    actionQueue.process_status_command_results();
     
     #assert that python execturor start
     self.assertTrue(runCommand_mock.called)
@@ -1373,7 +1373,7 @@ class TestActionQueue(TestCase):
                                                                  None, command_complete_w)
     actionQueue.put([self.background_command])
     actionQueue.processBackgroundQueueSafeEmpty();
-    actionQueue.processStatusCommandResultQueueSafeEmpty();
+    actionQueue.process_status_command_results();
     
     with lock:
       complete_done.wait(0.1)

http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/test/python/ambari_agent/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py
index 663e215..7f5d451 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestController.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestController.py
@@ -44,7 +44,6 @@ import ambari_commons
 
 @not_for_platform(PLATFORM_WINDOWS)
 @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-@patch.object(Controller.Controller, "spawnStatusCommandsExecutorProcess", new = MagicMock())
 class TestController(unittest.TestCase):
 
   logger = logging.getLogger()
@@ -119,10 +118,8 @@ class TestController(unittest.TestCase):
     self.assertEqual({"responseId":1}, self.controller.registerWithServer())
 
     self.controller.sendRequest.return_value = {"responseId":1, "statusCommands": "commands", "log":"", "exitstatus":"0"}
-    self.controller.addToStatusQueue = MagicMock(name="addToStatusQueue")
     self.controller.isRegistered = False
     self.assertEqual({'exitstatus': '0', 'responseId': 1, 'log': '', 'statusCommands': 'commands'}, self.controller.registerWithServer())
-    self.controller.addToStatusQueue.assert_called_with("commands")
 
     calls = []
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/test/python/ambari_agent/TestMain.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestMain.py b/ambari-agent/src/test/python/ambari_agent/TestMain.py
index 97c448b..af2f68b 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestMain.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestMain.py
@@ -324,8 +324,6 @@ class TestMain(unittest.TestCase):
   @patch.object(Controller, "__init__")
   @patch.object(Controller, "is_alive")
   @patch.object(Controller, "start")
-  @patch.object(Controller, "getStatusCommandsExecutor")
-  @patch.object(Controller, "killStatusCommandsExecutorProcess")
   @patch("optparse.OptionParser.parse_args")
   @patch.object(DataCleaner,"start")
   @patch.object(DataCleaner,"__init__")
@@ -333,9 +331,10 @@ class TestMain(unittest.TestCase):
   @patch.object(PingPortListener,"__init__")
   @patch.object(ExitHelper,"execute_cleanup")
   @patch.object(ExitHelper, "exit")
-  def test_main(self, exithelper_exit_mock, cleanup_mock, ping_port_init_mock, ping_port_start_mock, data_clean_init_mock,data_clean_start_mock,
-                parse_args_mock, start_mock, Controller_killStatusCommandsExecutorProcess,
-                Controller_getStatusCommandsExecutor, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock,
+  @patch.object(Controller, "get_status_commands_executor")
+  def test_main(self, get_status_commands_executor_mock, exithelper_exit_mock, cleanup_mock, ping_port_init_mock,
+                ping_port_start_mock, data_clean_init_mock,data_clean_start_mock,
+                parse_args_mock, start_mock, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock,
                 update_log_level_mock, daemonize_mock, perform_prestart_checks_mock,
                 ambari_config_mock,
                 stop_mock, bind_signal_handlers_mock,

