ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echekans...@apache.org
Subject ambari git commit: AMBARI-20323. Commands timed-out on ambari host without any error logs - addendum patch (echekanskiy)
Date Fri, 10 Mar 2017 22:13:39 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk 0471b0c37 -> b69ac43a6


AMBARI-20323. Commands timed-out on ambari host without any error logs - addendum patch (echekanskiy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b69ac43a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b69ac43a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b69ac43a

Branch: refs/heads/trunk
Commit: b69ac43a6a7aa5c7810bca0bc1204e6641634c35
Parents: 0471b0c
Author: Eugene Chekanskiy <echekanskiy@hortonworks.com>
Authored: Sat Mar 11 00:08:27 2017 +0200
Committer: Eugene Chekanskiy <echekanskiy@hortonworks.com>
Committed: Sat Mar 11 00:08:27 2017 +0200

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/Controller.py  |  2 +-
 .../src/main/python/ambari_agent/ExitHelper.py  |  3 ++
 .../ambari_agent/StatusCommandsExecutor.py      | 36 ++++++++++++++++----
 .../src/main/python/ambari_agent/main.py        |  4 +--
 4 files changed, 35 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/b69ac43a/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index c152f64..c1a5f1b 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -477,7 +477,7 @@ class Controller(threading.Thread):
     try:
       self.actionQueue = ActionQueue(self.config, controller=self)
       self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue)
-      ExitHelper().register(self.statusCommandsExecutor.kill, "CLEANUP_KILLING")
+      ExitHelper().register(self.statusCommandsExecutor.kill, "CLEANUP_KILLING", can_relaunch=False)
       self.actionQueue.start()
       self.register = Register(self.config)
       self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())

http://git-wip-us.apache.org/repos/asf/ambari/blob/b69ac43a/ambari-agent/src/main/python/ambari_agent/ExitHelper.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ExitHelper.py b/ambari-agent/src/main/python/ambari_agent/ExitHelper.py
index e51646f..66e29e6 100644
--- a/ambari-agent/src/main/python/ambari_agent/ExitHelper.py
+++ b/ambari-agent/src/main/python/ambari_agent/ExitHelper.py
@@ -39,6 +39,9 @@ class ExitHelper(object):
   """
   Class to cleanup resources before exiting. Replacement for atexit module. sys.exit(code) works only from threads and
   os._exit(code) will ignore atexit and cleanup will be ignored.
+
+  WARNING: always import as `ambari_agent.ExitHelper import ExitHelper`, otherwise it will be imported twice and nothing
+  will work as expected.
   """
   __metaclass__ = _singleton
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/b69ac43a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
index 3f7ef4c..5c1c54a 100644
--- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
@@ -37,6 +37,9 @@ class StatusCommandsExecutor(object):
     self.config = config
     self.actionQueue = actionQueue
 
+    self._can_relaunch_lock = threading.RLock()
+    self._can_relaunch = True
+
     # used to prevent queues from been used during creation of new one to prevent threads messing up with combination of
     # old and new queues
     self.usage_lock = threading.RLock()
@@ -53,6 +56,16 @@ class StatusCommandsExecutor(object):
     self.mp_result_logs = multiprocessing.Queue()
     self.mp_task_queue = multiprocessing.Queue()
 
+  @property
+  def can_relaunch(self):
+    with self._can_relaunch_lock:
+      return self._can_relaunch
+
+  @can_relaunch.setter
+  def can_relaunch(self, value):
+    with self._can_relaunch_lock:
+      self._can_relaunch = value
+
   def _log_message(self, level, message, exception=None):
     """
     Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target).
@@ -163,7 +176,7 @@ class StatusCommandsExecutor(object):
       self._log_message(logging.ERROR, "StatusCommandsExecutor process failed with exception:", e)
       raise
 
-    self._log_message(logging.WARN, "StatusCommandsExecutor subprocess finished")
+    self._log_message(logging.INFO, "StatusCommandsExecutor subprocess finished")
 
   def _set_timed_out(self, command):
     """
@@ -242,23 +255,32 @@ class StatusCommandsExecutor(object):
     :param reason: reason of restart
     :return:
     """
-    self.kill(reason)
-    self.worker_process = multiprocessing.Process(target=self._worker_process_target)
-    self.worker_process.start()
-    logger.info("Started process with pid {0}".format(self.worker_process.pid))
+    if self.can_relaunch:
+      self.kill(reason)
+      self.worker_process = multiprocessing.Process(target=self._worker_process_target)
+      self.worker_process.start()
+      logger.info("Started process with pid {0}".format(self.worker_process.pid))
+    else:
+      logger.debug("Relaunch does not allowed, can not relaunch")
 
-  def kill(self, reason=None):
+  def kill(self, reason=None, can_relaunch=True):
     """
     Tries to stop command executor internal process for sort time, otherwise killing it. Closing all possible queues to
     unblock threads that probably blocked on read or write operations to queues. Must be called from threads different
     from threads that calling read or write methods(get_log_messages, get_results, put_commands).
 
+    :param can_relaunch: indicates if StatusCommandsExecutor can be relaunched after this kill
     :param reason: reason of killing
     :return:
     """
+    logger.info("Killing child process reason:" + str(reason))
+    self.can_relaunch = can_relaunch
+
+    if not self.can_relaunch:
+      logger.info("Killing without possibility to relaunch...")
+
     # try graceful stop, otherwise hard-kill
     if self.worker_process and self.worker_process.is_alive():
-      logger.info("Killing child process reason:" + str(reason))
       self.mustDieEvent.set()
       self.worker_process.join(timeout=3)
       if self.worker_process.is_alive():

http://git-wip-us.apache.org/repos/asf/ambari/blob/b69ac43a/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index ade9e4f..ddef473 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -89,7 +89,7 @@ from NetUtil import NetUtil
 from PingPortListener import PingPortListener
 import hostname
 from DataCleaner import DataCleaner
-from ExitHelper import ExitHelper
+from ambari_agent.ExitHelper import ExitHelper
 import socket
 from ambari_commons import OSConst, OSCheck
 from ambari_commons.shell import shellRunner
@@ -336,7 +336,7 @@ def run_threads(server_hostname, heartbeat_stop_callback):
     if controller.get_status_commands_executor().need_relaunch:
       controller.get_status_commands_executor().relaunch("COMMAND_TIMEOUT_OR_KILLED")
 
-  controller.get_status_commands_executor().kill()
+  controller.get_status_commands_executor().kill("AGENT_STOPPED", can_relaunch=False)
 
 # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
 # we need this for windows os, where no sigterm available


Mime
View raw message