Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2F9F7200BBD for ; Mon, 3 Oct 2016 20:21:47 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2D6D8160ADC; Mon, 3 Oct 2016 18:21:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A8A25160AF8 for ; Mon, 3 Oct 2016 20:21:45 +0200 (CEST) Received: (qmail 34103 invoked by uid 500); 3 Oct 2016 18:21:44 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 33189 invoked by uid 99); 3 Oct 2016 18:21:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Oct 2016 18:21:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 266BADFCF2; Mon, 3 Oct 2016 18:21:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jonathanhurley@apache.org To: commits@ambari.apache.org Date: Mon, 03 Oct 2016 18:21:55 -0000 Message-Id: <0b108c9534ac4922b9e5fa4bb8e5ebc3@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [13/18] ambari git commit: AMBARI-18505. Ambari Status commands should enforce a timeout < heartbeat interval (aonishuk) archived-at: Mon, 03 Oct 2016 18:21:47 -0000 AMBARI-18505. Ambari Status commands should enforce a timeout < heartbeat interval (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/dfa16136 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/dfa16136 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/dfa16136 Branch: refs/heads/branch-feature-AMBARI-18456 Commit: dfa16136d0ceaba1e2fdb67fbc4d9dc3e3ec49f5 Parents: 324107d Author: Andrew Onishuk Authored: Mon Oct 3 12:28:46 2016 +0300 Committer: Andrew Onishuk Committed: Mon Oct 3 12:28:46 2016 +0300 ---------------------------------------------------------------------- ambari-agent/conf/unix/ambari-agent.ini | 1 + .../src/main/python/ambari_agent/ActionQueue.py | 22 ++++++++++++++++- .../ambari_agent/PythonReflectiveExecutor.py | 25 +++++++++++++++----- .../test/python/ambari_agent/TestActionQueue.py | 3 ++- 4 files changed, 43 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa16136/ambari-agent/conf/unix/ambari-agent.ini ---------------------------------------------------------------------- diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini index 914e09a..1c39c24 100644 --- a/ambari-agent/conf/unix/ambari-agent.ini +++ b/ambari-agent/conf/unix/ambari-agent.ini @@ -32,6 +32,7 @@ tolerate_download_failures=true run_as_user=root parallel_execution=0 alert_grace_period=5 +status_command_timeout=2 alert_kinit_timeout=14400000 system_resource_overrides=/etc/resource_overrides ; memory_threshold_soft_mb=400 http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa16136/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index f104939..86918e5 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -29,6 +29,7 @@ import time import signal from AgentException import AgentException +from PythonReflectiveExecutor import PythonReflectiveExecutor from LiveStatus import LiveStatus from ActualConfigHandler import ActualConfigHandler from CommandStatusDict import CommandStatusDict @@ -82,9 +83,11 @@ class ActionQueue(threading.Thread): self.controller = controller self.configTags = {} self._stop = threading.Event() + self.hangingStatusCommands = {} self.tmpdir = config.get('agent', 'prefix') self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller) self.parallel_execution = config.get_parallel_exec_option() + self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5)) if self.parallel_execution == 1: logger.info("Parallel execution is enabled, will execute agent commands in parallel") @@ -225,7 +228,24 @@ class ActionQueue(threading.Thread): if self.controller.recovery_manager.enabled(): self.controller.recovery_manager.stop_execution_command() elif commandType == self.STATUS_COMMAND: - self.execute_status_command(command) + component_name = command['componentName'] + + if component_name in self.hangingStatusCommands and not self.hangingStatusCommands[component_name].isAlive(): + del self.hangingStatusCommands[component_name] + + if not component_name in self.hangingStatusCommands: + thread = threading.Thread(target = self.execute_status_command, args = (command,)) + thread.daemon = True # hanging status commands should not be prevent ambari-agent from stopping + thread.start() + thread.join(timeout=self.status_command_timeout) + + if thread.isAlive(): + # Force context to reset to normal. By context we mean sys.path, imports, logger setting, etc. They are set by specific status command, and are not relevant to ambari-agent. + PythonReflectiveExecutor.last_context.revert() + logger.warn("Command {0} for {1} is running for more than {2} seconds. Skipping it for current pack of status commands.".format(commandType, component_name, self.status_command_timeout)) + self.hangingStatusCommands[component_name] = thread + else: + logger.info("Not running {0} for {1}, because previous one is still running.".format(commandType, component_name)) else: logger.error("Unrecognized command " + pprint.pformat(command)) except Exception: http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa16136/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py index 655b2fc..b476671 100644 --- a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py @@ -53,7 +53,9 @@ class PythonReflectiveExecutor(PythonExecutor): returncode = 1 try: - with PythonContext(script_dir, pythonCommand): + current_context = PythonContext(script_dir, pythonCommand) + PythonReflectiveExecutor.last_context = current_context + with current_context: imp.load_source('__main__', script) except SystemExit as e: returncode = e.code @@ -62,7 +64,10 @@ class PythonReflectiveExecutor(PythonExecutor): except (ClientComponentHasNoStatus, ComponentIsNotRunning): logger.debug("Reflective command failed with exception:", exc_info=1) except Exception: - logger.info("Reflective command failed with exception:", exc_info=1) + if current_context.is_forced_revert: + logger.info("Hanging status command finished its execution") + else: + logger.info("Reflective command failed with exception:", exc_info=1) else: returncode = 0 @@ -76,6 +81,8 @@ class PythonContext: def __init__(self, script_dir, pythonCommand): self.script_dir = script_dir self.pythonCommand = pythonCommand + self.is_reverted = False + self.is_forced_revert = False def __enter__(self): self.old_sys_path = copy.copy(sys.path) @@ -88,12 +95,18 @@ class PythonContext: sys.argv = self.pythonCommand[1:] def __exit__(self, exc_type, exc_val, exc_tb): - sys.path = self.old_sys_path - sys.argv = self.old_agv - logging.disable(self.old_logging_disable) - self.revert_sys_modules(self.old_sys_modules) + self.revert(is_forced_revert=False) return False + def revert(self, is_forced_revert=True): + if not self.is_reverted: + self.is_forced_revert = is_forced_revert + self.is_reverted = True + sys.path = self.old_sys_path + sys.argv = self.old_agv + logging.disable(self.old_logging_disable) + self.revert_sys_modules(self.old_sys_modules) + def revert_sys_modules(self, value): sys.modules.update(value) http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa16136/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py index 7d04d42..32773b8 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py @@ -225,6 +225,7 @@ class TestActionQueue(TestCase): retryable_command = { 'commandType': 'EXECUTION_COMMAND', 'role': 'NAMENODE', + 'componentName': 'NAMENODE', 'roleCommand': 'INSTALL', 'commandId': '1-1', 'taskId': 19, @@ -322,6 +323,7 @@ class TestActionQueue(TestCase): } status_command = { 'commandType' : ActionQueue.STATUS_COMMAND, + 'componentName': 'NAMENODE' } wrong_command = { 'commandType' : "SOME_WRONG_COMMAND", @@ -1126,7 +1128,6 @@ class TestActionQueue(TestCase): self.assertTrue(runCommand_mock.called) self.assertEqual(2, runCommand_mock.call_count) self.assertEqual(1, sleep_mock.call_count) - sleep_mock.assert_has_calls([call(1)], False) runCommand_mock.assert_has_calls([ call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt', os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False),