Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 362F6200BB1 for ; Thu, 3 Nov 2016 09:36:26 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 34A9F160AFF; Thu, 3 Nov 2016 08:36:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D07D0160AFE for ; Thu, 3 Nov 2016 09:36:24 +0100 (CET) Received: (qmail 55111 invoked by uid 500); 3 Nov 2016 08:36:24 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 55102 invoked by uid 99); 3 Nov 2016 08:36:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Nov 2016 08:36:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CE307DFE93; Thu, 3 Nov 2016 08:36:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aonishuk@apache.org To: commits@ambari.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ambari git commit: Revert "AMBARI-18505. Ambari Status commands should enforce a timeout < heartbeat interval (aonishuk)" Date: Thu, 3 Nov 2016 08:36:23 +0000 (UTC) archived-at: Thu, 03 Nov 2016 08:36:26 -0000 Repository: ambari Updated Branches: refs/heads/branch-2.5 8f47dcd88 -> b9e7df0ed Revert "AMBARI-18505. Ambari Status commands should enforce a timeout < heartbeat interval (aonishuk)" This reverts commit 6489987328e3f2f87e31b40dc80e39c12a7ce1bd. Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b9e7df0e Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b9e7df0e Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b9e7df0e Branch: refs/heads/branch-2.5 Commit: b9e7df0edf9401fe5ecdbf4f254c021ee6862ea4 Parents: 8f47dcd Author: Andrew Onishuk Authored: Thu Nov 3 10:34:07 2016 +0200 Committer: Andrew Onishuk Committed: Thu Nov 3 10:34:07 2016 +0200 ---------------------------------------------------------------------- ambari-agent/conf/unix/ambari-agent.ini | 1 - .../src/main/python/ambari_agent/ActionQueue.py | 34 ++++---- .../src/main/python/ambari_agent/Controller.py | 10 --- .../ambari_agent/PythonReflectiveExecutor.py | 20 ++--- .../ambari_agent/StatusCommandsExecutor.py | 90 -------------------- .../src/main/python/ambari_agent/main.py | 21 +---- .../test/python/ambari_agent/TestActionQueue.py | 76 ++++++++++++----- .../test/python/ambari_agent/TestController.py | 1 - .../src/test/python/ambari_agent/TestMain.py | 3 +- 9 files changed, 84 insertions(+), 172 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/conf/unix/ambari-agent.ini ---------------------------------------------------------------------- diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini index 9a0b537..914e09a 100644 --- a/ambari-agent/conf/unix/ambari-agent.ini +++ b/ambari-agent/conf/unix/ambari-agent.ini @@ -32,7 +32,6 @@ tolerate_download_failures=true run_as_user=root parallel_execution=0 alert_grace_period=5 -status_command_timeout=5 alert_kinit_timeout=14400000 system_resource_overrides=/etc/resource_overrides ; memory_threshold_soft_mb=400 http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 1131d21..f104939 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -18,7 +18,6 @@ See the License for the specific language governing permissions and limitations under the License. ''' import Queue -import multiprocessing import logging import traceback @@ -75,8 +74,7 @@ class ActionQueue(threading.Thread): def __init__(self, config, controller): super(ActionQueue, self).__init__() self.commandQueue = Queue.Queue() - self.statusCommandQueue = multiprocessing.Queue() - self.statusCommandResultQueue = multiprocessing.Queue() # this queue is filled by StatuCommandsExecutor. + self.statusCommandQueue = Queue.Queue() self.backgroundCommandQueue = Queue.Queue() self.commandStatuses = CommandStatusDict(callback_action = self.status_update_callback) @@ -97,9 +95,8 @@ class ActionQueue(threading.Thread): return self._stop.isSet() def put_status(self, commands): - #Clear all status commands. Was supposed that we got all set of statuses, we don't need to keep old ones - while not self.statusCommandQueue.empty(): - self.statusCommandQueue.get() + #Was supposed that we got all set of statuses, we don't need to keep old ones + self.statusCommandQueue.queue.clear() for command in commands: logger.info("Adding " + command['commandType'] + " for component " + \ @@ -155,7 +152,7 @@ class ActionQueue(threading.Thread): try: while not self.stopped(): self.processBackgroundQueueSafeEmpty(); - self.processStatusCommandResultQueueSafeEmpty(); + self.processStatusCommandQueueSafeEmpty(); try: if self.parallel_execution == 0: command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) @@ -199,17 +196,14 @@ class ActionQueue(threading.Thread): except Queue.Empty: pass - def processStatusCommandResultQueueSafeEmpty(self): - while not self.statusCommandResultQueue.empty(): + def processStatusCommandQueueSafeEmpty(self): + while not self.statusCommandQueue.empty(): try: - result = self.statusCommandResultQueue.get(False) - self.process_status_command_result(result) + command = self.statusCommandQueue.get(False) + self.process_command(command) except Queue.Empty: pass - except IOError: - # on race condition in multiprocessing.Queue if get/put and thread kill are executed at the same time. - # During queue.close IOError will be thrown (this prevents from permanently dead-locked get). - pass + def createCommandHandle(self, command): if command.has_key('__handle'): @@ -230,6 +224,8 @@ class ActionQueue(threading.Thread): finally: if self.controller.recovery_manager.enabled(): self.controller.recovery_manager.stop_execution_command() + elif commandType == self.STATUS_COMMAND: + self.execute_status_command(command) else: logger.error("Unrecognized command " + pprint.pformat(command)) except Exception: @@ -491,12 +487,11 @@ class ActionQueue(threading.Thread): self.commandStatuses.put_command_status(handle.command, roleResult) - def process_status_command_result(self, result): + def execute_status_command(self, command): ''' Executes commands of type STATUS_COMMAND ''' try: - command, component_status_result, component_security_status_result = result cluster = command['clusterName'] service = command['serviceName'] component = command['componentName'] @@ -511,6 +506,11 @@ class ActionQueue(threading.Thread): component_extra = None + # For custom services, responsibility to determine service status is + # delegated to python scripts + component_status_result = self.customServiceOrchestrator.requestComponentStatus(command) + component_security_status_result = self.customServiceOrchestrator.requestComponentSecurityState(command) + if component_status_result['exitcode'] == 0: component_status = LiveStatus.LIVE_STATUS if self.controller.recovery_manager.enabled() \ http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index cece30c..2a4d384 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -39,7 +39,6 @@ import AmbariConfig from ambari_agent.Heartbeat import Heartbeat from ambari_agent.Register import Register from ambari_agent.ActionQueue import ActionQueue -from ambari_agent.StatusCommandsExecutor import StatusCommandsExecutor from ambari_agent.FileCache import FileCache from ambari_agent.NetUtil import NetUtil from ambari_agent.LiveStatus import LiveStatus @@ -84,7 +83,6 @@ class Controller(threading.Thread): self.cachedconnect = None self.range = range self.hasMappedComponents = True - self.statusCommandsExecutor = None # Event is used for synchronizing heartbeat iterations (to make possible # manual wait() interruption between heartbeats ) self.heartbeat_stop_callback = heartbeat_stop_callback @@ -443,18 +441,10 @@ class Controller(threading.Thread): logger.info("Stop event received") self.DEBUG_STOP_HEARTBEATING=True - def spawnStatusCommandsExecutorProcess(self): - self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue) - self.statusCommandsExecutor.start() - - def getStatusCommandsExecutor(self): - return self.statusCommandsExecutor - def run(self): try: self.actionQueue = ActionQueue(self.config, controller=self) self.actionQueue.start() - self.spawnStatusCommandsExecutorProcess() self.register = Register(self.config) self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector()) http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py index b27d7d1..655b2fc 100644 --- a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py @@ -53,9 +53,7 @@ class PythonReflectiveExecutor(PythonExecutor): returncode = 1 try: - current_context = PythonContext(script_dir, pythonCommand) - PythonReflectiveExecutor.last_context = current_context - with current_context: + with PythonContext(script_dir, pythonCommand): imp.load_source('__main__', script) except SystemExit as e: returncode = e.code @@ -78,8 +76,6 @@ class PythonContext: def __init__(self, script_dir, pythonCommand): self.script_dir = script_dir self.pythonCommand = pythonCommand - self.is_reverted = False - self.is_forced_revert = False def __enter__(self): self.old_sys_path = copy.copy(sys.path) @@ -92,18 +88,12 @@ class PythonContext: sys.argv = self.pythonCommand[1:] def __exit__(self, exc_type, exc_val, exc_tb): - self.revert(is_forced_revert=False) + sys.path = self.old_sys_path + sys.argv = self.old_agv + logging.disable(self.old_logging_disable) + self.revert_sys_modules(self.old_sys_modules) return False - def revert(self, is_forced_revert=True): - if not self.is_reverted: - self.is_forced_revert = is_forced_revert - self.is_reverted = True - sys.path = self.old_sys_path - sys.argv = self.old_agv - logging.disable(self.old_logging_disable) - self.revert_sys_modules(self.old_sys_modules) - def revert_sys_modules(self, value): sys.modules.update(value) http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py deleted file mode 100644 index 12f58e5..0000000 --- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py +++ /dev/null @@ -1,90 +0,0 @@ -#!/usr/bin/env python -''' -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -''' - -import os -import signal -import threading -import logging -import multiprocessing -from ambari_agent.PythonReflectiveExecutor import PythonReflectiveExecutor -from ambari_agent.RemoteDebugUtils import bind_debug_signal_handlers - -logger = logging.getLogger(__name__) - -class StatusCommandsExecutor(multiprocessing.Process): - """ - A process which executes status/security status commands. - - It dies and respawns itself on timeout of the command. Which is the most graceful way to end the currently running status command. - """ - def __init__(self, config, actionQueue): - multiprocessing.Process.__init__(self) - - self.config = config - self.actionQueue = actionQueue - - self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5)) # in seconds - self.hasTimeoutedEvent = multiprocessing.Event() - - def run(self): - try: - bind_debug_signal_handlers() - while True: - command = self.actionQueue.statusCommandQueue.get(True) # blocks until status status command appears - logger.debug("Running status command for {0}".format(command['componentName'])) - - timeout_timer = threading.Timer( self.status_command_timeout, self.respawn, [command]) - timeout_timer.start() - - self.process_status_command(command) - - timeout_timer.cancel() - logger.debug("Completed status command for {0}".format(command['componentName'])) - except: - logger.exception("StatusCommandsExecutor process failed with exception:") - raise - - logger.warn("StatusCommandsExecutor process has finished") - - def process_status_command(self, command): - component_status_result = self.actionQueue.customServiceOrchestrator.requestComponentStatus(command) - component_security_status_result = self.actionQueue.customServiceOrchestrator.requestComponentSecurityState(command) - result = (command, component_status_result, component_security_status_result) - - self.actionQueue.statusCommandResultQueue.put(result) - - def respawn(self, command): - try: - if hasattr(PythonReflectiveExecutor, "last_context"): - # Force context to reset to normal. By context we mean sys.path, imports, etc. They are set by specific status command, and are not relevant to ambari-agent. - PythonReflectiveExecutor.last_context.revert() - - logger.warn("Command {0} for {1} is running for more than {2} seconds. Terminating it due to timeout.".format(command['commandType'], command['componentName'], self.status_command_timeout)) - - self.hasTimeoutedEvent.set() - except: - logger.exception("StatusCommandsExecutor.finish thread failed with exception:") - raise - - def kill(self): - os.kill(self.pid, signal.SIGKILL) - - # prevent queue from ending up with non-freed semaphores, locks during put. Which would result in dead-lock in process executing get. - self.actionQueue.statusCommandResultQueue.close() - self.actionQueue.statusCommandResultQueue.join_thread() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/src/main/python/ambari_agent/main.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index b1bcae1..cfcd129 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -276,22 +276,6 @@ def reset_agent(options): MAX_RETRIES = 10 -def run_threads(server_hostname, heartbeat_stop_callback): - # Launch Controller communication - controller = Controller(config, server_hostname, heartbeat_stop_callback) - controller.start() - while controller.is_alive(): - time.sleep(0.1) - - if controller.getStatusCommandsExecutor() is not None and (not controller.getStatusCommandsExecutor().is_alive() or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()): - if controller.getStatusCommandsExecutor().is_alive(): - logger.info("Terminating statusCommandsExecutor") - controller.getStatusCommandsExecutor().kill() - logger.info("Respawning statusCommandsExecutor") - controller.spawnStatusCommandsExecutorProcess() - - controller.getStatusCommandsExecutor().kill() - # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process # we need this for windows os, where no sigterm available def main(heartbeat_stop_callback=None): @@ -396,7 +380,10 @@ def main(heartbeat_stop_callback=None): # Set the active server active_server = server_hostname # Launch Controller communication - run_threads(server_hostname, heartbeat_stop_callback) + controller = Controller(config, server_hostname, heartbeat_stop_callback) + controller.start() + while controller.is_alive(): + time.sleep(0.1) # # If Ambari Agent connected to the server or http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py index 4a63f7c..7d04d42 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py @@ -310,7 +310,9 @@ class TestActionQueue(TestCase): @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch("logging.RootLogger.exception") @patch.object(ActionQueue, "execute_command") - def test_process_command(self, execute_command_mock, log_exc_mock): + @patch.object(ActionQueue, "execute_status_command") + def test_process_command(self, execute_status_command_mock, + execute_command_mock, log_exc_mock): dummy_controller = MagicMock() config = AmbariConfig() config.set('agent', 'tolerate_download_failures', "true") @@ -327,19 +329,29 @@ class TestActionQueue(TestCase): # Try wrong command actionQueue.process_command(wrong_command) self.assertFalse(execute_command_mock.called) + self.assertFalse(execute_status_command_mock.called) self.assertFalse(log_exc_mock.called) execute_command_mock.reset_mock() + execute_status_command_mock.reset_mock() log_exc_mock.reset_mock() # Try normal execution actionQueue.process_command(execution_command) self.assertTrue(execute_command_mock.called) + self.assertFalse(execute_status_command_mock.called) self.assertFalse(log_exc_mock.called) execute_command_mock.reset_mock() + execute_status_command_mock.reset_mock() log_exc_mock.reset_mock() + actionQueue.process_command(status_command) + self.assertFalse(execute_command_mock.called) + self.assertTrue(execute_status_command_mock.called) + self.assertFalse(log_exc_mock.called) + execute_command_mock.reset_mock() + execute_status_command_mock.reset_mock() log_exc_mock.reset_mock() # Try exception to check proper logging @@ -351,6 +363,7 @@ class TestActionQueue(TestCase): log_exc_mock.reset_mock() + execute_status_command_mock.side_effect = side_effect actionQueue.process_command(execution_command) self.assertTrue(log_exc_mock.called) @@ -822,11 +835,14 @@ class TestActionQueue(TestCase): @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(ActionQueue, "status_update_callback") + @patch.object(CustomServiceOrchestrator, "requestComponentStatus") + @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState") @patch.object(ActionQueue, "execute_command") @patch.object(LiveStatus, "build") @patch.object(CustomServiceOrchestrator, "__init__") def test_execute_status_command(self, CustomServiceOrchestrator_mock, - build_mock, execute_command_mock, + build_mock, execute_command_mock, requestComponentSecurityState_mock, + requestComponentStatus_mock, status_update_callback): CustomServiceOrchestrator_mock.return_value = None dummy_controller = MagicMock() @@ -836,25 +852,33 @@ class TestActionQueue(TestCase): dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp()) - result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN') + requestComponentStatus_mock.reset_mock() + requestComponentStatus_mock.return_value = {'exitcode': 0 } - actionQueue.process_status_command_result(result) + requestComponentSecurityState_mock.reset_mock() + requestComponentSecurityState_mock.return_value = 'UNKNOWN' + + actionQueue.execute_status_command(self.status_command) report = actionQueue.result() expected = {'dummy report': '', 'securityState' : 'UNKNOWN'} self.assertEqual(len(report['componentStatus']), 1) self.assertEqual(report['componentStatus'][0], expected) + self.assertTrue(requestComponentStatus_mock.called) @patch.object(RecoveryManager, "command_exists") @patch.object(RecoveryManager, "requires_recovery") @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(ActionQueue, "status_update_callback") + @patch.object(CustomServiceOrchestrator, "requestComponentStatus") + @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState") @patch.object(ActionQueue, "execute_command") @patch.object(LiveStatus, "build") @patch.object(CustomServiceOrchestrator, "__init__") - def test_process_status_command_result_recovery(self, CustomServiceOrchestrator_mock, - build_mock, execute_command_mock, + def test_execute_status_command_recovery(self, CustomServiceOrchestrator_mock, + build_mock, execute_command_mock, requestComponentSecurityState_mock, + requestComponentStatus_mock, status_update_callback, requires_recovery_mock, command_exists_mock): CustomServiceOrchestrator_mock.return_value = None @@ -867,9 +891,13 @@ class TestActionQueue(TestCase): dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp(), True, False) - result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN') + requestComponentStatus_mock.reset_mock() + requestComponentStatus_mock.return_value = {'exitcode': 0 } + + requestComponentSecurityState_mock.reset_mock() + requestComponentSecurityState_mock.return_value = 'UNKNOWN' - actionQueue.process_status_command_result(result) + actionQueue.execute_status_command(self.status_command) report = actionQueue.result() expected = {'dummy report': '', 'securityState' : 'UNKNOWN', @@ -877,13 +905,17 @@ class TestActionQueue(TestCase): self.assertEqual(len(report['componentStatus']), 1) self.assertEqual(report['componentStatus'][0], expected) + self.assertTrue(requestComponentStatus_mock.called) requires_recovery_mock.return_value = True command_exists_mock.return_value = True - - result = (self.status_command, {'exitcode': 0 }, 'UNKNOWN') + requestComponentStatus_mock.reset_mock() + requestComponentStatus_mock.return_value = {'exitcode': 0 } + + requestComponentSecurityState_mock.reset_mock() + requestComponentSecurityState_mock.return_value = 'UNKNOWN' - actionQueue.process_status_command_result(result) + actionQueue.execute_status_command(self.status_command) report = actionQueue.result() expected = {'dummy report': '', 'securityState' : 'UNKNOWN', @@ -891,33 +923,39 @@ class TestActionQueue(TestCase): self.assertEqual(len(report['componentStatus']), 1) self.assertEqual(report['componentStatus'][0], expected) + self.assertTrue(requestComponentStatus_mock.called) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(ActionQueue, "status_update_callback") + @patch.object(CustomServiceOrchestrator, "requestComponentStatus") + @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState") @patch.object(ActionQueue, "execute_command") @patch.object(LiveStatus, "build") @patch.object(CustomServiceOrchestrator, "__init__") - def test_process_status_command_result_with_alerts(self, CustomServiceOrchestrator_mock, + def test_execute_status_command_with_alerts(self, CustomServiceOrchestrator_mock, + requestComponentSecurityState_mock, build_mock, execute_command_mock, + requestComponentStatus_mock, status_update_callback): CustomServiceOrchestrator_mock.return_value = None dummy_controller = MagicMock() actionQueue = ActionQueue(AmbariConfig(), dummy_controller) - command_return_value = { + + + requestComponentStatus_mock.reset_mock() + requestComponentStatus_mock.return_value = { 'exitcode': 0, 'stdout': 'out', 'stderr': 'err', 'structuredOut': {'alerts': [ {'name': 'flume_alert'} ] } } - - result = (self.status_command_for_alerts, command_return_value, command_return_value) - build_mock.return_value = {'somestatusresult': 'aresult'} - actionQueue.process_status_command_result(result) + actionQueue.execute_status_command(self.status_command_for_alerts) report = actionQueue.result() + self.assertTrue(requestComponentStatus_mock.called) self.assertEqual(len(report['componentStatus']), 1) self.assertTrue(report['componentStatus'][0].has_key('alerts')) @@ -1177,7 +1215,7 @@ class TestActionQueue(TestCase): execute_command = copy.deepcopy(self.background_command) actionQueue.put([execute_command]) actionQueue.processBackgroundQueueSafeEmpty(); - actionQueue.processStatusCommandResultQueueSafeEmpty(); + actionQueue.processStatusCommandQueueSafeEmpty(); #assert that python execturor start self.assertTrue(runCommand_mock.called) @@ -1221,7 +1259,7 @@ class TestActionQueue(TestCase): None, command_complete_w) actionQueue.put([self.background_command]) actionQueue.processBackgroundQueueSafeEmpty(); - actionQueue.processStatusCommandResultQueueSafeEmpty(); + actionQueue.processStatusCommandQueueSafeEmpty(); with lock: complete_done.wait(0.1) http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/src/test/python/ambari_agent/TestController.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py index b47af03..59b41cd 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestController.py +++ b/ambari-agent/src/test/python/ambari_agent/TestController.py @@ -44,7 +44,6 @@ import ambari_commons @not_for_platform(PLATFORM_WINDOWS) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) -@patch.object(Controller.Controller, "spawnStatusCommandsExecutorProcess", new = MagicMock()) class TestController(unittest.TestCase): logger = logging.getLogger() http://git-wip-us.apache.org/repos/asf/ambari/blob/b9e7df0e/ambari-agent/src/test/python/ambari_agent/TestMain.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestMain.py b/ambari-agent/src/test/python/ambari_agent/TestMain.py index 998b778..400241f 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestMain.py +++ b/ambari-agent/src/test/python/ambari_agent/TestMain.py @@ -324,7 +324,6 @@ class TestMain(unittest.TestCase): @patch.object(Controller, "__init__") @patch.object(Controller, "is_alive") @patch.object(Controller, "start") - @patch.object(Controller, "getStatusCommandsExecutor") @patch("optparse.OptionParser.parse_args") @patch.object(DataCleaner,"start") @patch.object(DataCleaner,"__init__") @@ -333,7 +332,7 @@ class TestMain(unittest.TestCase): @patch.object(ExitHelper,"execute_cleanup") @patch.object(ExitHelper, "exit") def test_main(self, exithelper_exit_mock, cleanup_mock, ping_port_init_mock, ping_port_start_mock, data_clean_init_mock,data_clean_start_mock, - parse_args_mock, start_mock, Controller_getStatusCommandsExecutor, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock, + parse_args_mock, start_mock, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock, update_log_level_mock, daemonize_mock, perform_prestart_checks_mock, ambari_config_mock, stop_mock, bind_signal_handlers_mock,