ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nc...@apache.org
Subject [24/50] ambari git commit: AMBARI-20419. Add a property to enable Status commands on separate process (non-default) (aonishuk)
Date Tue, 14 Mar 2017 18:42:08 GMT
AMBARI-20419. Add a property to enable Status commands on separate process (non-default) (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/938fed1f
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/938fed1f
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/938fed1f

Branch: refs/heads/branch-feature-AMBARI-12556
Commit: 938fed1f3b2b96ae8da174f9bd2133ca064197af
Parents: 73a8633
Author: Andrew Onishuk <aonishuk@hortonworks.com>
Authored: Mon Mar 13 16:32:30 2017 +0200
Committer: Andrew Onishuk <aonishuk@hortonworks.com>
Committed: Mon Mar 13 16:32:30 2017 +0200

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py | 16 ++---
 .../main/python/ambari_agent/AmbariConfig.py    |  3 +
 .../src/main/python/ambari_agent/Controller.py  |  7 +-
 .../ambari_agent/StatusCommandsExecutor.py      | 75 +++++++++++++++++---
 .../test/python/ambari_agent/TestActionQueue.py |  4 +-
 5 files changed, 83 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/938fed1f/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 15ae03d..b1d5160 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -146,7 +146,7 @@ class ActionQueue(threading.Thread):
     try:
       while not self.stopped():
         self.processBackgroundQueueSafeEmpty()
-        self.process_status_command_results()
+        self.controller.get_status_commands_executor().process_results() # process status
commands
         try:
           if self.parallel_execution == 0:
             command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
@@ -190,14 +190,6 @@ class ActionQueue(threading.Thread):
       except Queue.Empty:
         pass
 
-  def process_status_command_results(self):
-    self.controller.statusCommandsExecutor.process_logs()
-    for result in self.controller.statusCommandsExecutor.get_results():
-      try:
-        self.process_status_command_result(result)
-      except UnicodeDecodeError:
-        pass
-
   def createCommandHandle(self, command):
     if command.has_key('__handle'):
       raise AgentException("Command already has __handle")
@@ -503,6 +495,12 @@ class ActionQueue(threading.Thread):
 
     self.commandStatuses.put_command_status(handle.command, roleResult)
 
+  def execute_status_command_and_security_status(self, command):
+    component_status_result = self.customServiceOrchestrator.requestComponentStatus(command)
+    component_security_status_result = self.customServiceOrchestrator.requestComponentSecurityState(command)
+
+    return command, component_status_result, component_security_status_result
+
   def process_status_command_result(self, result):
     '''
     Executes commands of type STATUS_COMMAND

http://git-wip-us.apache.org/repos/asf/ambari/blob/938fed1f/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index 64c2643..1965dc2 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -304,6 +304,9 @@ class AmbariConfig:
     self.set('agent', self.ULIMIT_OPEN_FILES_KEY, value)
 
 
+  def get_multiprocess_status_commands_executor_enabled(self):
+    return bool(int(self.get('agent', 'multiprocess_status_commands_executor_enabled', 1)))
+
   def update_configuration_from_registration(self, reg_resp):
     if reg_resp and AmbariConfig.AMBARI_PROPERTIES_CATEGORY in reg_resp:
       if not self.has_section(AmbariConfig.AMBARI_PROPERTIES_CATEGORY):

http://git-wip-us.apache.org/repos/asf/ambari/blob/938fed1f/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index c1a5f1b..29a11aa 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -48,7 +48,7 @@ from ambari_agent.ClusterConfiguration import  ClusterConfiguration
 from ambari_agent.RecoveryManager import  RecoveryManager
 from ambari_agent.HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers
 from ambari_agent.ExitHelper import ExitHelper
-from ambari_agent.StatusCommandsExecutor import StatusCommandsExecutor
+from ambari_agent.StatusCommandsExecutor import MultiProcessStatusCommandsExecutor, SingleProcessStatusCommandsExecutor
 from resource_management.libraries.functions.version import compare_versions
 from ambari_commons.os_utils import get_used_ram
 
@@ -476,7 +476,10 @@ class Controller(threading.Thread):
   def run(self):
     try:
       self.actionQueue = ActionQueue(self.config, controller=self)
-      self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue)
+      if self.config.get_multiprocess_status_commands_executor_enabled():
+        self.statusCommandsExecutor = MultiProcessStatusCommandsExecutor(self.config, self.actionQueue)
+      else:
+        self.statusCommandsExecutor = SingleProcessStatusCommandsExecutor(self.config, self.actionQueue)
       ExitHelper().register(self.statusCommandsExecutor.kill, "CLEANUP_KILLING", can_relaunch=False)
       self.actionQueue.start()
       self.register = Register(self.config)

http://git-wip-us.apache.org/repos/asf/ambari/blob/938fed1f/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
index 5c1c54a..3b23f1c 100644
--- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
@@ -31,8 +31,56 @@ from ambari_agent.RemoteDebugUtils import bind_debug_signal_handlers
 
 logger = logging.getLogger(__name__)
 
-
 class StatusCommandsExecutor(object):
+  def put_commands(self, commands):
+    raise NotImplemented()
+
+  def process_results(self):
+    raise NotImplemented()
+
+  def relaunch(self, reason=None):
+    raise NotImplemented()
+
+  def kill(self, reason=None, can_relaunch=True):
+    raise NotImplemented()
+
+class SingleProcessStatusCommandsExecutor(StatusCommandsExecutor):
+  def __init__(self, config, actionQueue):
+    self.config = config
+    self.actionQueue = actionQueue
+    self.statusCommandQueue = Queue.Queue()
+    self.need_relaunch = False
+
+  def put_commands(self, commands):
+    while not self.statusCommandQueue.empty():
+      self.statusCommandQueue.get()
+
+    for command in commands:
+      logger.info("Adding " + command['commandType'] + " for component " + \
+                  command['componentName'] + " of service " + \
+                  command['serviceName'] + " of cluster " + \
+                  command['clusterName'] + " to the queue.")
+      self.statusCommandQueue.put(command)
+      logger.debug(pprint.pformat(command))
+
+  def process_results(self):
+    """
+    Execute a single command from the queue and process it
+    """
+    while not self.statusCommandQueue.empty():
+      try:
+        command = self.statusCommandQueue.get(False)
+        self.actionQueue.process_status_command_result(self.actionQueue.execute_status_command_and_security_status(command))
+      except Queue.Empty:
+        pass
+
+  def relaunch(self, reason=None):
+    pass
+
+  def kill(self, reason=None, can_relaunch=True):
+    pass
+
+class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
   def __init__(self, config, actionQueue):
     self.config = config
     self.actionQueue = actionQueue
@@ -78,7 +126,7 @@ class StatusCommandsExecutor(object):
     result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message
     self.mp_result_logs.put((level, result_message, exception))
 
-  def get_log_messages(self):
+  def _get_log_messages(self):
     """
     Returns list of (level, message, exception) log messages.
 
@@ -100,11 +148,11 @@ class StatusCommandsExecutor(object):
         pass
     return results
 
-  def process_logs(self):
+  def _process_logs(self):
     """
     Get all available at this moment logs and prints them to logger.
     """
-    for level, message, exception in self.get_log_messages():
+    for level, message, exception in self._get_log_messages():
       if level == logging.ERROR:
         logger.debug(message, exc_info=exception)
       if level == logging.WARN:
@@ -129,9 +177,7 @@ class StatusCommandsExecutor(object):
       """
       while True:
         _cmd = internal_in_queue.get()
-        component_status_result = self.customServiceOrchestrator.requestComponentStatus(_cmd)
-        component_security_status_result = self.customServiceOrchestrator.requestComponentSecurityState(_cmd)
-        internal_out_queue.put((_cmd, component_status_result, component_security_status_result))
+        internal_out_queue.put(self.actionQueue.execute_status_command_and_security_status(_cmd))
 
     worker = threading.Thread(target=_internal_worker)
     worker.daemon = True
@@ -219,7 +265,18 @@ class StatusCommandsExecutor(object):
         self.mp_task_queue.put(command)
         logger.debug(pprint.pformat(command))
 
-  def get_results(self):
+  def process_results(self):
+    """
+    Process all the results from the internal worker
+    """
+    self._process_logs()
+    for result in self._get_results():
+      try:
+        self.actionQueue.process_status_command_result(result)
+      except UnicodeDecodeError:
+        pass
+
+  def _get_results(self):
     """
     Get all available results for status commands.
 
@@ -289,7 +346,7 @@ class StatusCommandsExecutor(object):
       else:
         # get log messages only if we died gracefully, otherwise we will have chance to block
here forever, in most cases
         # this call will do nothing, as all logs will be processed in ActionQueue loop
-        self.process_logs()
+        self._process_logs()
         logger.info("Child process died gracefully")
     else:
       logger.info("Child process already dead")

http://git-wip-us.apache.org/repos/asf/ambari/blob/938fed1f/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 8701a24..67f0833 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -1329,7 +1329,7 @@ class TestActionQueue(TestCase):
     execute_command = copy.deepcopy(self.background_command)
     actionQueue.put([execute_command])
     actionQueue.processBackgroundQueueSafeEmpty();
-    actionQueue.process_status_command_results();
+    actionQueue.controller.statusCommandExecutor.process_results();
     
     #assert that python execturor start
     self.assertTrue(runCommand_mock.called)
@@ -1373,7 +1373,7 @@ class TestActionQueue(TestCase):
                                                                  None, command_complete_w)
     actionQueue.put([self.background_command])
     actionQueue.processBackgroundQueueSafeEmpty();
-    actionQueue.process_status_command_results();
+    actionQueue.controller.statusCommandExecutor.process_results();
     
     with lock:
       complete_done.wait(0.1)


Mime
View raw message