ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aonis...@apache.org
Subject ambari git commit: Revert "AMBARI-18505. Ambari Status commands should enforce a timeout < heartbeat interval (aonishuk)"
Date Thu, 03 Nov 2016 08:36:23 GMT
Repository: ambari
Updated Branches:
  refs/heads/branch-2.5 8f47dcd88 -> b9e7df0ed


Revert "AMBARI-18505. Ambari Status commands should enforce a timeout < heartbeat interval (aonishuk)"

This reverts commit 6489987328e3f2f87e31b40dc80e39c12a7ce1bd.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b9e7df0e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b9e7df0e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b9e7df0e

Branch: refs/heads/branch-2.5
Commit: b9e7df0edf9401fe5ecdbf4f254c021ee6862ea4
Parents: 8f47dcd
Author: Andrew Onishuk <aonishuk@hortonworks.com>
Authored: Thu Nov 3 10:34:07 2016 +0200
Committer: Andrew Onishuk <aonishuk@hortonworks.com>
Committed: Thu Nov 3 10:34:07 2016 +0200

----------------------------------------------------------------------
 ambari-agent/conf/unix/ambari-agent.ini         |  1 -
 .../src/main/python/ambari_agent/ActionQueue.py | 34 ++++----
 .../src/main/python/ambari_agent/Controller.py  | 10 ---
 .../ambari_agent/PythonReflectiveExecutor.py    | 20 ++---
 .../ambari_agent/StatusCommandsExecutor.py      | 90 --------------------
 .../src/main/python/ambari_agent/main.py        | 21 +----
 .../test/python/ambari_agent/TestActionQueue.py | 76 ++++++++++++-----
 .../test/python/ambari_agent/TestController.py  |  1 -
 .../src/test/python/ambari_agent/TestMain.py    |  3 +-
 9 files changed, 84 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index 9a0b537..914e09a 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -32,7 +32,6 @@ tolerate_download_failures=true
 run_as_user=root
 parallel_execution=0
 alert_grace_period=5
-status_command_timeout=5
 alert_kinit_timeout=14400000
 system_resource_overrides=/etc/resource_overrides
 ; memory_threshold_soft_mb=400

http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 1131d21..f104939 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -18,7 +18,6 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 import Queue
-import multiprocessing
 
 import logging
 import traceback
@@ -75,8 +74,7 @@ class ActionQueue(threading.Thread):
   def __init__(self, config, controller):
     super(ActionQueue, self).__init__()
     self.commandQueue = Queue.Queue()
-    self.statusCommandQueue = multiprocessing.Queue()
-    self.statusCommandResultQueue = multiprocessing.Queue() # this queue is filled by StatuCommandsExecutor.
+    self.statusCommandQueue = Queue.Queue()
     self.backgroundCommandQueue = Queue.Queue()
     self.commandStatuses = CommandStatusDict(callback_action =
       self.status_update_callback)
@@ -97,9 +95,8 @@ class ActionQueue(threading.Thread):
     return self._stop.isSet()
 
   def put_status(self, commands):
-    #Clear all status commands. Was supposed that we got all set of statuses, we don't need to keep old ones
-    while not self.statusCommandQueue.empty():
-      self.statusCommandQueue.get()
+    #Was supposed that we got all set of statuses, we don't need to keep old ones
+    self.statusCommandQueue.queue.clear()
 
     for command in commands:
       logger.info("Adding " + command['commandType'] + " for component " + \
@@ -155,7 +152,7 @@ class ActionQueue(threading.Thread):
     try:
       while not self.stopped():
         self.processBackgroundQueueSafeEmpty();
-        self.processStatusCommandResultQueueSafeEmpty();
+        self.processStatusCommandQueueSafeEmpty();
         try:
           if self.parallel_execution == 0:
             command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
@@ -199,17 +196,14 @@ class ActionQueue(threading.Thread):
       except Queue.Empty:
         pass
 
-  def processStatusCommandResultQueueSafeEmpty(self):
-    while not self.statusCommandResultQueue.empty():
+  def processStatusCommandQueueSafeEmpty(self):
+    while not self.statusCommandQueue.empty():
       try:
-        result = self.statusCommandResultQueue.get(False)
-        self.process_status_command_result(result)
+        command = self.statusCommandQueue.get(False)
+        self.process_command(command)
       except Queue.Empty:
         pass
-      except IOError:
-        # on race condition in multiprocessing.Queue if get/put and thread kill are executed at the same time.
-        # During queue.close IOError will be thrown (this prevents from permanently dead-locked get).
-        pass
+
 
   def createCommandHandle(self, command):
     if command.has_key('__handle'):
@@ -230,6 +224,8 @@ class ActionQueue(threading.Thread):
         finally:
           if self.controller.recovery_manager.enabled():
             self.controller.recovery_manager.stop_execution_command()
+      elif commandType == self.STATUS_COMMAND:
+        self.execute_status_command(command)
       else:
         logger.error("Unrecognized command " + pprint.pformat(command))
     except Exception:
@@ -491,12 +487,11 @@ class ActionQueue(threading.Thread):
 
     self.commandStatuses.put_command_status(handle.command, roleResult)
 
-  def process_status_command_result(self, result):
+  def execute_status_command(self, command):
     '''
     Executes commands of type STATUS_COMMAND
     '''
     try:
-      command, component_status_result, component_security_status_result = result
       cluster = command['clusterName']
       service = command['serviceName']
       component = command['componentName']
@@ -511,6 +506,11 @@ class ActionQueue(threading.Thread):
 
       component_extra = None
 
+      # For custom services, responsibility to determine service status is
+      # delegated to python scripts
+      component_status_result = self.customServiceOrchestrator.requestComponentStatus(command)
+      component_security_status_result = self.customServiceOrchestrator.requestComponentSecurityState(command)
+
       if component_status_result['exitcode'] == 0:
         component_status = LiveStatus.LIVE_STATUS
         if self.controller.recovery_manager.enabled() \

http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index cece30c..2a4d384 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -39,7 +39,6 @@ import AmbariConfig
 from ambari_agent.Heartbeat import Heartbeat
 from ambari_agent.Register import Register
 from ambari_agent.ActionQueue import ActionQueue
-from ambari_agent.StatusCommandsExecutor import StatusCommandsExecutor
 from ambari_agent.FileCache import FileCache
 from ambari_agent.NetUtil import NetUtil
 from ambari_agent.LiveStatus import LiveStatus
@@ -84,7 +83,6 @@ class Controller(threading.Thread):
     self.cachedconnect = None
     self.range = range
     self.hasMappedComponents = True
-    self.statusCommandsExecutor = None
     # Event is used for synchronizing heartbeat iterations (to make possible
     # manual wait() interruption between heartbeats )
     self.heartbeat_stop_callback = heartbeat_stop_callback
@@ -443,18 +441,10 @@ class Controller(threading.Thread):
         logger.info("Stop event received")
         self.DEBUG_STOP_HEARTBEATING=True
 
-  def spawnStatusCommandsExecutorProcess(self):
-    self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue)
-    self.statusCommandsExecutor.start()
-
-  def getStatusCommandsExecutor(self):
-    return self.statusCommandsExecutor
-
   def run(self):
     try:
       self.actionQueue = ActionQueue(self.config, controller=self)
       self.actionQueue.start()
-      self.spawnStatusCommandsExecutorProcess()
       self.register = Register(self.config)
       self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())
   

http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
index b27d7d1..655b2fc 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
@@ -53,9 +53,7 @@ class PythonReflectiveExecutor(PythonExecutor):
     returncode = 1
 
     try:
-      current_context = PythonContext(script_dir, pythonCommand)
-      PythonReflectiveExecutor.last_context = current_context
-      with current_context:
+      with PythonContext(script_dir, pythonCommand):
         imp.load_source('__main__', script)
     except SystemExit as e:
       returncode = e.code
@@ -78,8 +76,6 @@ class PythonContext:
   def __init__(self, script_dir, pythonCommand):
     self.script_dir = script_dir
     self.pythonCommand = pythonCommand
-    self.is_reverted = False
-    self.is_forced_revert = False
     
   def __enter__(self):
     self.old_sys_path = copy.copy(sys.path)
@@ -92,18 +88,12 @@ class PythonContext:
     sys.argv = self.pythonCommand[1:]
 
   def __exit__(self, exc_type, exc_val, exc_tb):
-    self.revert(is_forced_revert=False)
+    sys.path = self.old_sys_path
+    sys.argv = self.old_agv
+    logging.disable(self.old_logging_disable)
+    self.revert_sys_modules(self.old_sys_modules)
     return False
   
-  def revert(self, is_forced_revert=True):
-    if not self.is_reverted:
-      self.is_forced_revert = is_forced_revert
-      self.is_reverted = True
-      sys.path = self.old_sys_path
-      sys.argv = self.old_agv
-      logging.disable(self.old_logging_disable)
-      self.revert_sys_modules(self.old_sys_modules)
-
   def revert_sys_modules(self, value):
     sys.modules.update(value)
     

http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
deleted file mode 100644
index 12f58e5..0000000
--- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
+++ /dev/null
@@ -1,90 +0,0 @@
-#!/usr/bin/env python
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-import os
-import signal
-import threading
-import logging
-import multiprocessing
-from ambari_agent.PythonReflectiveExecutor import PythonReflectiveExecutor
-from ambari_agent.RemoteDebugUtils import bind_debug_signal_handlers
-
-logger = logging.getLogger(__name__)
-
-class StatusCommandsExecutor(multiprocessing.Process):
-  """
-  A process which executes status/security status commands.
-
-  It dies and respawns itself on timeout of the command. Which is the most graceful way to end the currently running status command.
-  """
-  def __init__(self, config, actionQueue):
-    multiprocessing.Process.__init__(self)
-
-    self.config = config
-    self.actionQueue = actionQueue
-
-    self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5)) # in seconds
-    self.hasTimeoutedEvent = multiprocessing.Event()
-
-  def run(self):
-    try:
-      bind_debug_signal_handlers()
-      while True:
-        command = self.actionQueue.statusCommandQueue.get(True) # blocks until status status command appears
-        logger.debug("Running status command for {0}".format(command['componentName']))
-        
-        timeout_timer = threading.Timer( self.status_command_timeout, self.respawn, [command])
-        timeout_timer.start()
-
-        self.process_status_command(command)
-
-        timeout_timer.cancel()
-        logger.debug("Completed status command for {0}".format(command['componentName']))
-    except:
-      logger.exception("StatusCommandsExecutor process failed with exception:")
-      raise
-
-    logger.warn("StatusCommandsExecutor process has finished")
-
-  def process_status_command(self, command):
-    component_status_result = self.actionQueue.customServiceOrchestrator.requestComponentStatus(command)
-    component_security_status_result = self.actionQueue.customServiceOrchestrator.requestComponentSecurityState(command)
-    result = (command, component_status_result, component_security_status_result)
-
-    self.actionQueue.statusCommandResultQueue.put(result)
-
-  def respawn(self, command):
-    try:
-      if hasattr(PythonReflectiveExecutor, "last_context"):
-        # Force context to reset to normal. By context we mean sys.path, imports, etc. They are set by specific status command, and are not relevant to ambari-agent.
-        PythonReflectiveExecutor.last_context.revert()
-
-      logger.warn("Command {0} for {1} is running for more than {2} seconds. Terminating it due to timeout.".format(command['commandType'], command['componentName'], self.status_command_timeout))
-
-      self.hasTimeoutedEvent.set()
-    except:
-      logger.exception("StatusCommandsExecutor.finish thread failed with exception:")
-      raise
-
-  def kill(self):
-    os.kill(self.pid, signal.SIGKILL)
-
-    # prevent queue from ending up with non-freed semaphores, locks during put. Which would result in dead-lock in process executing get.
-    self.actionQueue.statusCommandResultQueue.close()
-    self.actionQueue.statusCommandResultQueue.join_thread()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index b1bcae1..cfcd129 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -276,22 +276,6 @@ def reset_agent(options):
 
 MAX_RETRIES = 10
 
-def run_threads(server_hostname, heartbeat_stop_callback):
-  # Launch Controller communication
-  controller = Controller(config, server_hostname, heartbeat_stop_callback)
-  controller.start()
-  while controller.is_alive():
-    time.sleep(0.1)
-
-    if controller.getStatusCommandsExecutor() is not None and (not controller.getStatusCommandsExecutor().is_alive() or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()):
-      if controller.getStatusCommandsExecutor().is_alive():
-        logger.info("Terminating statusCommandsExecutor")
-        controller.getStatusCommandsExecutor().kill()
-      logger.info("Respawning statusCommandsExecutor")
-      controller.spawnStatusCommandsExecutorProcess()
-
-  controller.getStatusCommandsExecutor().kill()
-
 # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
 # we need this for windows os, where no sigterm available
 def main(heartbeat_stop_callback=None):
@@ -396,7 +380,10 @@ def main(heartbeat_stop_callback=None):
         # Set the active server
         active_server = server_hostname
         # Launch Controller communication
-        run_threads(server_hostname, heartbeat_stop_callback)
+        controller = Controller(config, server_hostname, heartbeat_stop_callback)
+        controller.start()
+        while controller.is_alive():
+          time.sleep(0.1)
 
       #
       # If Ambari Agent connected to the server or

http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 4a63f7c..7d04d42 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -310,7 +310,9 @@ class TestActionQueue(TestCase):
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch("logging.RootLogger.exception")
   @patch.object(ActionQueue, "execute_command")
-  def test_process_command(self, execute_command_mock, log_exc_mock):
+  @patch.object(ActionQueue, "execute_status_command")
+  def test_process_command(self, execute_status_command_mock,
+                           execute_command_mock, log_exc_mock):
     dummy_controller = MagicMock()
     config = AmbariConfig()
     config.set('agent', 'tolerate_download_failures', "true")
@@ -327,19 +329,29 @@ class TestActionQueue(TestCase):
     # Try wrong command
     actionQueue.process_command(wrong_command)
     self.assertFalse(execute_command_mock.called)
+    self.assertFalse(execute_status_command_mock.called)
     self.assertFalse(log_exc_mock.called)
 
     execute_command_mock.reset_mock()
+    execute_status_command_mock.reset_mock()
     log_exc_mock.reset_mock()
     # Try normal execution
     actionQueue.process_command(execution_command)
     self.assertTrue(execute_command_mock.called)
+    self.assertFalse(execute_status_command_mock.called)
     self.assertFalse(log_exc_mock.called)
 
     execute_command_mock.reset_mock()
+    execute_status_command_mock.reset_mock()
     log_exc_mock.reset_mock()
 
+    actionQueue.process_command(status_command)
+    self.assertFalse(execute_command_mock.called)
+    self.assertTrue(execute_status_command_mock.called)
+    self.assertFalse(log_exc_mock.called)
+
     execute_command_mock.reset_mock()
+    execute_status_command_mock.reset_mock()
     log_exc_mock.reset_mock()
 
     # Try exception to check proper logging
@@ -351,6 +363,7 @@ class TestActionQueue(TestCase):
 
     log_exc_mock.reset_mock()
 
+    execute_status_command_mock.side_effect = side_effect
     actionQueue.process_command(execution_command)
     self.assertTrue(log_exc_mock.called)
 
@@ -822,11 +835,14 @@ class TestActionQueue(TestCase):
 
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch.object(ActionQueue, "status_update_callback")
+  @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
+  @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
   @patch.object(ActionQueue, "execute_command")
   @patch.object(LiveStatus, "build")
   @patch.object(CustomServiceOrchestrator, "__init__")
   def test_execute_status_command(self, CustomServiceOrchestrator_mock,
-                                  build_mock, execute_command_mock,
+                                  build_mock, execute_command_mock, requestComponentSecurityState_mock,
+                                  requestComponentStatus_mock,
                                   status_update_callback):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
@@ -836,25 +852,33 @@ class TestActionQueue(TestCase):
 
     dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
 
-    result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN')
+    requestComponentStatus_mock.reset_mock()
+    requestComponentStatus_mock.return_value = {'exitcode': 0 }
 
-    actionQueue.process_status_command_result(result)
+    requestComponentSecurityState_mock.reset_mock()
+    requestComponentSecurityState_mock.return_value = 'UNKNOWN'
+
+    actionQueue.execute_status_command(self.status_command)
     report = actionQueue.result()
     expected = {'dummy report': '',
                 'securityState' : 'UNKNOWN'}
 
     self.assertEqual(len(report['componentStatus']), 1)
     self.assertEqual(report['componentStatus'][0], expected)
+    self.assertTrue(requestComponentStatus_mock.called)
 
   @patch.object(RecoveryManager, "command_exists")
   @patch.object(RecoveryManager, "requires_recovery")
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch.object(ActionQueue, "status_update_callback")
+  @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
+  @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
   @patch.object(ActionQueue, "execute_command")
   @patch.object(LiveStatus, "build")
   @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_process_status_command_result_recovery(self, CustomServiceOrchestrator_mock,
-                                  build_mock, execute_command_mock,
+  def test_execute_status_command_recovery(self, CustomServiceOrchestrator_mock,
+                                  build_mock, execute_command_mock, requestComponentSecurityState_mock,
+                                  requestComponentStatus_mock,
                                   status_update_callback, requires_recovery_mock,
                                   command_exists_mock):
     CustomServiceOrchestrator_mock.return_value = None
@@ -867,9 +891,13 @@ class TestActionQueue(TestCase):
 
     dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp(), True, False)
 
-    result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN')
+    requestComponentStatus_mock.reset_mock()
+    requestComponentStatus_mock.return_value = {'exitcode': 0 }
+
+    requestComponentSecurityState_mock.reset_mock()
+    requestComponentSecurityState_mock.return_value = 'UNKNOWN'
 
-    actionQueue.process_status_command_result(result)
+    actionQueue.execute_status_command(self.status_command)
     report = actionQueue.result()
     expected = {'dummy report': '',
                 'securityState' : 'UNKNOWN',
@@ -877,13 +905,17 @@ class TestActionQueue(TestCase):
 
     self.assertEqual(len(report['componentStatus']), 1)
     self.assertEqual(report['componentStatus'][0], expected)
+    self.assertTrue(requestComponentStatus_mock.called)
 
     requires_recovery_mock.return_value = True
     command_exists_mock.return_value = True
-    
-    result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN')
+    requestComponentStatus_mock.reset_mock()
+    requestComponentStatus_mock.return_value = {'exitcode': 0 }
+
+    requestComponentSecurityState_mock.reset_mock()
+    requestComponentSecurityState_mock.return_value = 'UNKNOWN'
 
-    actionQueue.process_status_command_result(result)
+    actionQueue.execute_status_command(self.status_command)
     report = actionQueue.result()
     expected = {'dummy report': '',
                 'securityState' : 'UNKNOWN',
@@ -891,33 +923,39 @@ class TestActionQueue(TestCase):
 
     self.assertEqual(len(report['componentStatus']), 1)
     self.assertEqual(report['componentStatus'][0], expected)
+    self.assertTrue(requestComponentStatus_mock.called)
 
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch.object(ActionQueue, "status_update_callback")
+  @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
+  @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
   @patch.object(ActionQueue, "execute_command")
   @patch.object(LiveStatus, "build")
   @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_process_status_command_result_with_alerts(self, CustomServiceOrchestrator_mock,
+  def test_execute_status_command_with_alerts(self, CustomServiceOrchestrator_mock,
+                                              requestComponentSecurityState_mock,
                                   build_mock, execute_command_mock,
+                                  requestComponentStatus_mock,
                                   status_update_callback):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
     actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
-    command_return_value = {
+
+
+    requestComponentStatus_mock.reset_mock()
+    requestComponentStatus_mock.return_value = {
       'exitcode': 0,
       'stdout': 'out',
       'stderr': 'err',
       'structuredOut': {'alerts': [ {'name': 'flume_alert'} ] }
     }
-    
-    result = (self.status_command_for_alerts, command_return_value, command_return_value)
-    
     build_mock.return_value = {'somestatusresult': 'aresult'}
 
-    actionQueue.process_status_command_result(result)
+    actionQueue.execute_status_command(self.status_command_for_alerts)
 
     report = actionQueue.result()
 
+    self.assertTrue(requestComponentStatus_mock.called)
     self.assertEqual(len(report['componentStatus']), 1)
     self.assertTrue(report['componentStatus'][0].has_key('alerts'))
 
@@ -1177,7 +1215,7 @@ class TestActionQueue(TestCase):
     execute_command = copy.deepcopy(self.background_command)
     actionQueue.put([execute_command])
     actionQueue.processBackgroundQueueSafeEmpty();
-    actionQueue.processStatusCommandResultQueueSafeEmpty();
+    actionQueue.processStatusCommandQueueSafeEmpty();
     
     #assert that python execturor start
     self.assertTrue(runCommand_mock.called)
@@ -1221,7 +1259,7 @@ class TestActionQueue(TestCase):
                                                                  None, command_complete_w)
     actionQueue.put([self.background_command])
     actionQueue.processBackgroundQueueSafeEmpty();
-    actionQueue.processStatusCommandResultQueueSafeEmpty();
+    actionQueue.processStatusCommandQueueSafeEmpty();
     
     with lock:
       complete_done.wait(0.1)

http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/src/test/python/ambari_agent/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py
index b47af03..59b41cd 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestController.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestController.py
@@ -44,7 +44,6 @@ import ambari_commons
 
 @not_for_platform(PLATFORM_WINDOWS)
 @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-@patch.object(Controller.Controller, "spawnStatusCommandsExecutorProcess", new = MagicMock())
 class TestController(unittest.TestCase):
 
   logger = logging.getLogger()

http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/src/test/python/ambari_agent/TestMain.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestMain.py b/ambari-agent/src/test/python/ambari_agent/TestMain.py
index 998b778..400241f 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestMain.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestMain.py
@@ -324,7 +324,6 @@ class TestMain(unittest.TestCase):
   @patch.object(Controller, "__init__")
   @patch.object(Controller, "is_alive")
   @patch.object(Controller, "start")
-  @patch.object(Controller, "getStatusCommandsExecutor")
   @patch("optparse.OptionParser.parse_args")
   @patch.object(DataCleaner,"start")
   @patch.object(DataCleaner,"__init__")
@@ -333,7 +332,7 @@ class TestMain(unittest.TestCase):
   @patch.object(ExitHelper,"execute_cleanup")
   @patch.object(ExitHelper, "exit")
   def test_main(self, exithelper_exit_mock, cleanup_mock, ping_port_init_mock, ping_port_start_mock, data_clean_init_mock,data_clean_start_mock,
-                parse_args_mock, start_mock, Controller_getStatusCommandsExecutor, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock,
+                parse_args_mock, start_mock, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock,
                 update_log_level_mock, daemonize_mock, perform_prestart_checks_mock,
                 ambari_config_mock,
                 stop_mock, bind_signal_handlers_mock,


Mime
View raw message