Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A8059200C0F for ; Wed, 18 Jan 2017 09:42:44 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id A6B1B160B44; Wed, 18 Jan 2017 08:42:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B2528160B56 for ; Wed, 18 Jan 2017 09:42:43 +0100 (CET) Received: (qmail 27907 invoked by uid 500); 18 Jan 2017 08:42:42 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 27749 invoked by uid 99); 18 Jan 2017 08:42:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Jan 2017 08:42:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A230CDFD7D; Wed, 18 Jan 2017 08:42:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: stoader@apache.org To: commits@ambari.apache.org Date: Wed, 18 Jan 2017 08:42:46 -0000 Message-Id: In-Reply-To: <4923a60921aa479dbe9482f3ce7178d7@git.apache.org> References: <4923a60921aa479dbe9482f3ce7178d7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/5] ambari git commit: AMBARI-19520. Ambari agents not recovering from heart beat lost state immediately after successful re-registering with server. (stoader) archived-at: Wed, 18 Jan 2017 08:42:44 -0000 AMBARI-19520. Ambari agents not recovering from heart beat lost state immediately after successful re-registering with server. (stoader) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b512b26a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b512b26a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b512b26a Branch: refs/heads/branch-2.5 Commit: b512b26ae48c92df0b8d884c08f5f07cf9a2875b Parents: 36f7422 Author: Toader, Sebastian Authored: Mon Jan 16 13:43:01 2017 +0100 Committer: Toader, Sebastian Committed: Wed Jan 18 09:41:56 2017 +0100 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/Controller.py | 63 ++++++++++++++------ .../src/main/python/ambari_agent/main.py | 18 +++--- 2 files changed, 53 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/b512b26a/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 2244d30..09ab1e6 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -86,6 +86,10 @@ class Controller(threading.Thread): self.max_reconnect_retry_delay = int(config.get('server','max_reconnect_retry_delay', default=30)) self.hasMappedComponents = True self.statusCommandsExecutor = None + + # this lock is used control which thread spawns/kills the StatusCommandExecutor child process + self.spawnKillStatusCommandExecutorLock = threading.RLock() + # Event is used for synchronizing heartbeat iterations (to make possible # manual wait() interruption between heartbeats ) self.heartbeat_stop_callback = heartbeat_stop_callback @@ -199,11 +203,9 @@ class Controller(threading.Thread): self.config.update_configuration_from_registration(ret) logger.debug("Updated config:" + str(self.config)) - if self.statusCommandsExecutor is None: - self.spawnStatusCommandsExecutorProcess() - elif self.statusCommandsExecutor.is_alive(): - logger.info("Terminating statusCommandsExecutor as agent re-registered with server.") - self.killStatusCommandsExecutorProcess() + # Start StatusCommandExecutor child process or restart it if already running + # in order to receive up to date agent config. + self.spawnStatusCommandsExecutorProcess() if 'statusCommands' in ret.keys(): logger.debug("Got status commands on registration.") @@ -458,22 +460,43 @@ class Controller(threading.Thread): self.DEBUG_STOP_HEARTBEATING=True def spawnStatusCommandsExecutorProcess(self): - # Re-create the status command queue as in case the consumer - # process is killed the queue may deadlock (see http://bugs.python.org/issue20527). - # The queue must be re-created by the producer process. - if self.actionQueue.statusCommandQueue is not None: - self.actionQueue.statusCommandQueue.close() - self.actionQueue.statusCommandQueue.join_thread() - - self.actionQueue.statusCommandQueue = multiprocessing.Queue() - - self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue) - self.statusCommandsExecutor.start() + ''' + Starts a new StatusCommandExecutor child process. In case there is a running instance + already restarts it by simply killing it and starting new one. + This function is thread-safe. + ''' + with self.getSpawnKillStatusCommandExecutorLock(): + # if there is already an instance of StatusCommandExecutor kill it first + self.killStatusCommandsExecutorProcess() + + # Re-create the status command queue as in case the consumer + # process is killed the queue may deadlock (see http://bugs.python.org/issue20527). + # The queue must be re-created by the producer process. + statusCommandQueue = self.actionQueue.statusCommandQueue + self.actionQueue.statusCommandQueue = multiprocessing.Queue() + + if statusCommandQueue is not None: + statusCommandQueue.close() + + logger.info("Spawning statusCommandsExecutor") + self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue) + self.statusCommandsExecutor.start() def killStatusCommandsExecutorProcess(self): - self.statusCommandsExecutor.kill() - - + ''' + Kills the StatusExecutorChild process if exists. This function is thread-safe. + ''' + with self.getSpawnKillStatusCommandExecutorLock(): + if self.statusCommandsExecutor is not None and self.statusCommandsExecutor.is_alive(): + logger.info("Terminating statusCommandsExecutor.") + self.statusCommandsExecutor.kill() + + def getSpawnKillStatusCommandExecutorLock(self): + ''' + Re-entrant lock to be used to synchronize the spawning or killing of + StatusCommandExecutor child process in multi-thread environment. + ''' + return self.spawnKillStatusCommandExecutorLock; def getStatusCommandsExecutor(self): return self.statusCommandsExecutor @@ -586,6 +609,8 @@ class Controller(threading.Thread): except Exception, e: logger.info("Exception in move_data_dir_mount_file(). Error: {0}".format(str(e))) + + def main(argv=None): # Allow Ctrl-C http://git-wip-us.apache.org/repos/asf/ambari/blob/b512b26a/ambari-agent/src/main/python/ambari_agent/main.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index 2e1124e..8e577a5 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -291,15 +291,15 @@ def run_threads(server_hostname, heartbeat_stop_callback): while controller.is_alive(): time.sleep(0.1) - if controller.getStatusCommandsExecutor() is not None and (not controller.getStatusCommandsExecutor().is_alive() or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()): - if controller.getStatusCommandsExecutor().is_alive(): - logger.info("Terminating statusCommandsExecutor") - controller.killStatusCommandsExecutorProcess() - logger.info("Respawning statusCommandsExecutor") - controller.spawnStatusCommandsExecutorProcess() - - if controller.getStatusCommandsExecutor() is not None and controller.getStatusCommandsExecutor().is_alive(): - controller.killStatusCommandsExecutorProcess() + with controller.getSpawnKillStatusCommandExecutorLock(): + # We need to lock as Controller.py may try to spawn StatusCommandExecutor child in parallel as well + if controller.getStatusCommandsExecutor() is not None \ + and (not controller.getStatusCommandsExecutor().is_alive() + or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()): + controller.spawnStatusCommandsExecutorProcess() + + + controller.killStatusCommandsExecutorProcess() # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process # we need this for windows os, where no sigterm available