ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smoha...@apache.org
Subject ambari git commit: Ambari-8189. Ambari agent support for parallel task execution during deployment (Ivan Mitic via smohanty)
Date Wed, 22 Apr 2015 21:25:32 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk 9639005f6 -> e4976458f


Ambari-8189. Ambari agent support for parallel task execution during deployment (Ivan Mitic via smohanty)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e4976458
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e4976458
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e4976458

Branch: refs/heads/trunk
Commit: e4976458fcbefb92f0192a044176356b2637a8af
Parents: 9639005
Author: Sumit Mohanty <smohanty@hortonworks.com>
Authored: Wed Apr 22 14:22:16 2015 -0700
Committer: Sumit Mohanty <smohanty@hortonworks.com>
Committed: Wed Apr 22 14:22:16 2015 -0700

----------------------------------------------------------------------
 ambari-agent/conf/unix/ambari-agent.ini         |  1 +
 ambari-agent/conf/windows/ambari-agent.ini      |  1 +
 .../src/main/python/ambari_agent/ActionQueue.py | 33 ++++++---
 .../main/python/ambari_agent/AmbariConfig.py    |  6 +-
 .../ambari_agent/CustomServiceOrchestrator.py   | 11 ++-
 .../main/python/ambari_agent/PythonExecutor.py  | 11 ++-
 .../test/python/ambari_agent/TestActionQueue.py | 73 +++++++++++++++-----
 .../TestCustomServiceOrchestrator.py            | 24 ++++---
 .../test/python/ambari_agent/TestHeartbeat.py   | 10 +--
 9 files changed, 120 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/e4976458/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index ed9dab3..173bb51 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -29,6 +29,7 @@ ping_port=8670
 cache_dir=/var/lib/ambari-agent/cache
 tolerate_download_failures=true
 run_as_user=root
+parallel_execution=0
 
 [command]
 maxretries=2

http://git-wip-us.apache.org/repos/asf/ambari/blob/e4976458/ambari-agent/conf/windows/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/windows/ambari-agent.ini b/ambari-agent/conf/windows/ambari-agent.ini
index 377dbf4..61a3ad9 100644
--- a/ambari-agent/conf/windows/ambari-agent.ini
+++ b/ambari-agent/conf/windows/ambari-agent.ini
@@ -28,6 +28,7 @@ data_cleanup_max_size_MB = 100
 ping_port=8670
 cache_dir=cache
 tolerate_download_failures=true
+parallel_execution=0
 
 [command]
 maxretries=2

http://git-wip-us.apache.org/repos/asf/ambari/blob/e4976458/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 212226c..c7286bc 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -80,7 +80,9 @@ class ActionQueue(threading.Thread):
     self._stop = threading.Event()
     self.tmpdir = config.get('agent', 'prefix')
     self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)
-
+    self.parallel_execution = config.get_parallel_exec_option()
+    if self.parallel_execution == 1:
+      logger.info("Parallel execution is enabled, will start Agent commands in parallel")
 
   def stop(self):
     self._stop.set()
@@ -145,10 +147,22 @@ class ActionQueue(threading.Thread):
       self.processBackgroundQueueSafeEmpty();
       self.processStatusCommandQueueSafeEmpty();
       try:
-        command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
-        self.process_command(command)
+        if self.parallel_execution == 0:
+          command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
+          self.process_command(command)
+        else:
+          # If parallel execution is enabled, just kick off all available
+          # commands using separate threads
+          while (True):
+            command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
+            logger.info("Kicking off a thread for the command, id=" +
+                        str(command['commandId']) + " taskId=" + str(command['taskId']))
+            t = threading.Thread(target=self.process_command, args=(command,))
+            t.daemon = True
+            t.start()
       except (Queue.Empty):
         pass
+
   def processBackgroundQueueSafeEmpty(self):
     while not self.backgroundCommandQueue.empty():
       try:
@@ -324,8 +338,9 @@ class ActionQueue(threading.Thread):
 
   def command_was_canceled(self):
     self.customServiceOrchestrator
-  def on_background_command_complete_callback(self, process_condenced_result, handle):
-    logger.debug('Start callback: %s' % process_condenced_result)
+
+  def on_background_command_complete_callback(self, process_condensed_result, handle):
+    logger.debug('Start callback: %s' % process_condensed_result)
     logger.debug('The handle is: %s' % handle)
     status = self.COMPLETED_STATUS if handle.exitCode == 0 else self.FAILED_STATUS
 
@@ -340,10 +355,10 @@ class ActionQueue(threading.Thread):
     roleResult = self.commandStatuses.generate_report_template(handle.command)
 
     roleResult.update({
-      'stdout': process_condenced_result['stdout'] + aborted_postfix,
-      'stderr': process_condenced_result['stderr'] + aborted_postfix,
-      'exitCode': process_condenced_result['exitcode'],
-      'structuredOut': str(json.dumps(process_condenced_result['structuredOut'])) if 'structuredOut' in process_condenced_result else '',
+      'stdout': process_condensed_result['stdout'] + aborted_postfix,
+      'stderr': process_condensed_result['stderr'] + aborted_postfix,
+      'exitCode': process_condensed_result['exitcode'],
+      'structuredOut': str(json.dumps(process_condensed_result['structuredOut'])) if 'structuredOut' in process_condensed_result else '',
       'status': status,
     })
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/e4976458/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index ffaaac7..13e9f03 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -43,6 +43,7 @@ data_cleanup_max_age=2592000
 data_cleanup_max_size_MB = 100
 ping_port=8670
 cache_dir={ps}var{ps}lib{ps}ambari-agent{ps}cache
+parallel_execution=0
 
 [services]
 
@@ -161,7 +162,7 @@ class AmbariConfig:
     try:
       return self.config.get(section, value)
     except ConfigParser.Error, err:
-      if default:
+      if default != None:
         return default
       raise err
 
@@ -245,6 +246,9 @@ class AmbariConfig:
     else:
       return False
 
+  def get_parallel_exec_option(self):
+    return int(self.get('agent', 'parallel_execution', 0))
+
 
 def updateConfigServerHostname(configFile, new_host):
   # update agent config file

http://git-wip-us.apache.org/repos/asf/ambari/blob/e4976458/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 54738a6..b107e3f 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -61,7 +61,6 @@ class CustomServiceOrchestrator():
     self.tmp_dir = config.get('agent', 'prefix')
     self.exec_tmp_dir = config.get('agent', 'tmp_dir')
     self.file_cache = FileCache(config)
-    self.python_executor = PythonExecutor(self.tmp_dir, config)
     self.status_commands_stdout = os.path.join(self.tmp_dir,
                                                'status_command_stdout.txt')
     self.status_commands_stderr = os.path.join(self.tmp_dir,
@@ -96,6 +95,13 @@ class CustomServiceOrchestrator():
       else: 
         logger.warn("Unable to find pid by taskId = %s" % task_id)
 
+  def get_py_executor(self):
+    """
+    Wrapper for unit testing
+    :return:
+    """
+    return PythonExecutor(self.tmp_dir, self.config)
+
   def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name=None,
                  override_output_files=True, retry=False):
     """
@@ -172,10 +178,11 @@ class CustomServiceOrchestrator():
       if command.has_key('commandType') and command['commandType'] == ActionQueue.BACKGROUND_EXECUTION_COMMAND and len(filtered_py_file_list) > 1:
         raise AgentException("Background commands are supported without hooks only")
 
+      python_executor = self.get_py_executor()
       for py_file, current_base_dir in filtered_py_file_list:
         log_info_on_failure = not command_name in self.DONT_DEBUG_FAILURES_FOR_COMMANDS
         script_params = [command_name, json_path, current_base_dir]
-        ret = self.python_executor.run_file(py_file, script_params,
+        ret = python_executor.run_file(py_file, script_params,
                                self.exec_tmp_dir, tmpoutfile, tmperrfile, timeout,
                                tmpstrucoutfile, logger_level, self.map_task_to_process,
                                task_id, override_output_files, handle = handle, log_info_on_failure=log_info_on_failure)

http://git-wip-us.apache.org/repos/asf/ambari/blob/e4976458/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index f215272..09be145 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -43,11 +43,11 @@ class PythonExecutor:
   used as a singleton for a concurrent execution of python scripts
   """
   NO_ERROR = "none"
-  grep = Grep()
-  event = threading.Event()
-  python_process_has_been_killed = False
 
   def __init__(self, tmpDir, config):
+    self.grep = Grep()
+    self.event = threading.Event()
+    self.python_process_has_been_killed = False
     self.tmpDir = tmpDir
     self.config = config
     pass
@@ -181,11 +181,10 @@ class PythonExecutor:
   def condenseOutput(self, stdout, stderr, retcode, structured_out):
     log_lines_count = self.config.get('heartbeat', 'log_lines_count')
     
-    grep = self.grep
     result = {
       "exitcode": retcode,
-      "stdout": grep.tail(stdout, log_lines_count) if log_lines_count else stdout,
-      "stderr": grep.tail(stderr, log_lines_count) if log_lines_count else stderr,
+      "stdout": self.grep.tail(stdout, log_lines_count) if log_lines_count else stdout,
+      "stderr": self.grep.tail(stderr, log_lines_count) if log_lines_count else stderr,
       "structuredOut" : structured_out
     }
     

http://git-wip-us.apache.org/repos/asf/ambari/blob/e4976458/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index f43d3f7..6aab74e 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -228,14 +228,17 @@ class TestActionQueue(TestCase):
   }
 
 
+  @patch.object(AmbariConfig, "get_parallel_exec_option")
   @patch.object(ActionQueue, "process_command")
   @patch.object(Queue, "get")
   @patch.object(CustomServiceOrchestrator, "__init__")
   def test_ActionQueueStartStop(self, CustomServiceOrchestrator_mock,
-                                get_mock, process_command_mock):
+                                get_mock, process_command_mock, get_parallel_exec_option_mock):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
     config = MagicMock()
+    get_parallel_exec_option_mock.return_value = 0
+    config.get_parallel_exec_option = get_parallel_exec_option_mock
     actionQueue = ActionQueue(config, dummy_controller)
     actionQueue.start()
     time.sleep(0.1)
@@ -481,7 +484,7 @@ class TestActionQueue(TestCase):
     }
     cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
 
-    config = AmbariConfig().getConfig()
+    config = AmbariConfig()
     tempdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tempdir)
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
@@ -523,7 +526,7 @@ class TestActionQueue(TestCase):
     }
     cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
 
-    config = AmbariConfig().getConfig()
+    config = AmbariConfig()
     tempdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tempdir)
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
@@ -564,7 +567,7 @@ class TestActionQueue(TestCase):
                                   status_update_callback):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
 
     build_mock.return_value = {'dummy report': '' }
 
@@ -600,7 +603,7 @@ class TestActionQueue(TestCase):
                                   status_update_callback):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
 
 
     requestComponentStatus_mock.reset_mock()
@@ -620,14 +623,17 @@ class TestActionQueue(TestCase):
     self.assertEqual(len(report['componentStatus']), 1)
     self.assertTrue(report['componentStatus'][0].has_key('alerts'))
 
+  @patch.object(AmbariConfig, "get_parallel_exec_option")
   @patch.object(ActionQueue, "process_command")
   @patch.object(Queue, "get")
   @patch.object(CustomServiceOrchestrator, "__init__")
   def test_reset_queue(self, CustomServiceOrchestrator_mock,
-                                get_mock, process_command_mock):
+                                get_mock, process_command_mock, gpeo_mock):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
     config = MagicMock()
+    gpeo_mock.return_value = 0
+    config.get_parallel_exec_option = gpeo_mock
     actionQueue = ActionQueue(config, dummy_controller)
     actionQueue.start()
     actionQueue.put([self.datanode_install_command, self.hbase_install_command])
@@ -639,14 +645,17 @@ class TestActionQueue(TestCase):
     actionQueue.join()
     self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
 
+  @patch.object(AmbariConfig, "get_parallel_exec_option")
   @patch.object(ActionQueue, "process_command")
   @patch.object(Queue, "get")
   @patch.object(CustomServiceOrchestrator, "__init__")
   def test_cancel(self, CustomServiceOrchestrator_mock,
-                       get_mock, process_command_mock):
+                       get_mock, process_command_mock, gpeo_mock):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
     config = MagicMock()
+    gpeo_mock.return_value = 0
+    config.get_parallel_exec_option = gpeo_mock
     actionQueue = ActionQueue(config, dummy_controller)
     actionQueue.start()
     actionQueue.put([self.datanode_install_command, self.hbase_install_command])
@@ -658,6 +667,27 @@ class TestActionQueue(TestCase):
     actionQueue.join()
     self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
 
+  @patch.object(AmbariConfig, "get_parallel_exec_option")
+  @patch.object(ActionQueue, "process_command")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_parallel_exec(self, CustomServiceOrchestrator_mock,
+                         process_command_mock, gpeo_mock):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    config = MagicMock()
+    gpeo_mock.return_value = 1
+    config.get_parallel_exec_option = gpeo_mock
+    actionQueue = ActionQueue(config, dummy_controller)
+    actionQueue.start()
+    actionQueue.put([self.datanode_install_command, self.hbase_install_command])
+    self.assertEqual(2, actionQueue.commandQueue.qsize())
+    time.sleep(1)
+    actionQueue.stop()
+    actionQueue.join()
+    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+    self.assertEqual(2, process_command_mock.call_count)
+    process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
+
 
   @patch("time.sleep")
   @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
@@ -668,7 +698,7 @@ class TestActionQueue(TestCase):
   ):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
     python_execution_result_dict = {
       'exitcode': 1,
       'stdout': 'out',
@@ -706,7 +736,7 @@ class TestActionQueue(TestCase):
   ):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
     execution_result_fail_dict = {
       'exitcode': 1,
       'stdout': 'out',
@@ -742,7 +772,7 @@ class TestActionQueue(TestCase):
   ):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
     execution_result_succ_dict = {
       'exitcode': 0,
       'stdout': 'out',
@@ -774,7 +804,7 @@ class TestActionQueue(TestCase):
                                                          'stderr' : 'err-13'}
     
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
 
     execute_command = copy.deepcopy(self.background_command)
     actionQueue.put([execute_command])
@@ -791,39 +821,44 @@ class TestActionQueue(TestCase):
     self.assertEqual(len(report['reports']),1)
 
   @not_for_platform(PLATFORM_WINDOWS)
+  @patch.object(CustomServiceOrchestrator, "get_py_executor")
   @patch.object(CustomServiceOrchestrator, "resolve_script_path")
   @patch.object(StackVersionsFileHandler, "read_stack_version")
-  def test_execute_python_executor(self, read_stack_version_mock, resolve_script_path_mock):
+  def test_execute_python_executor(self, read_stack_version_mock, resolve_script_path_mock,
+                                   get_py_executor_mock):
     
     dummy_controller = MagicMock()
-    cfg = AmbariConfig().getConfig()
+    cfg = AmbariConfig()
     cfg.set('agent', 'tolerate_download_failures', 'true')
     cfg.set('agent', 'prefix', '.')
     cfg.set('agent', 'cache_dir', 'background_tasks')
     
     actionQueue = ActionQueue(cfg, dummy_controller)
-    patch_output_file(actionQueue.customServiceOrchestrator.python_executor)
+    pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config)
+    patch_output_file(pyex)
+    get_py_executor_mock.return_value = pyex
     actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock()
    
     result = {}
     lock = threading.RLock()
     complete_done = threading.Condition(lock)
     
-    def command_complete_w(process_condenced_result, handle):
+    def command_complete_w(process_condensed_result, handle):
       with lock:
-        result['command_complete'] = {'condenced_result' : copy.copy(process_condenced_result),
+        result['command_complete'] = {'condensed_result' : copy.copy(process_condensed_result),
                                       'handle' : copy.copy(handle),
                                       'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])
                                       }
         complete_done.notifyAll()
-    
-    actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,None, command_complete_w)
+
+    actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,
+                                                                 None, command_complete_w)
     actionQueue.put([self.background_command])
     actionQueue.processBackgroundQueueSafeEmpty();
     actionQueue.processStatusCommandQueueSafeEmpty();
     
     with lock:
-      complete_done.wait(.1)
+      complete_done.wait(0.1)
       
       finished_status = result['command_complete']['command_status']
       self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS)

http://git-wip-us.apache.org/repos/asf/ambari/blob/e4976458/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index a9e604d..2fb2ae5 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -389,19 +389,21 @@ class TestCustomServiceOrchestrator(TestCase):
   from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
 
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch.object(CustomServiceOrchestrator, "get_py_executor")
   @patch("ambari_commons.shell.kill_process_with_children")
   @patch.object(FileCache, "__init__")
   @patch.object(CustomServiceOrchestrator, "resolve_script_path")
   @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
   @patch.object(StackVersionsFileHandler, "read_stack_version")
-  def test_cancel_backgound_command(self, read_stack_version_mock, resolve_hook_script_path_mock, resolve_script_path_mock, FileCache_mock,
-                                      kill_process_with_children_mock):
+  def test_cancel_backgound_command(self, read_stack_version_mock, resolve_hook_script_path_mock,
+                                    resolve_script_path_mock, FileCache_mock, kill_process_with_children_mock,
+                                    get_py_executor_mock):
     FileCache_mock.return_value = None
     FileCache_mock.cache_dir = MagicMock()
     resolve_hook_script_path_mock.return_value = None
 #     shell.kill_process_with_children = MagicMock()
     dummy_controller = MagicMock()
-    cfg = AmbariConfig().getConfig()
+    cfg = AmbariConfig()
     cfg.set('agent', 'tolerate_download_failures', 'true')
     cfg.set('agent', 'prefix', '.')
     cfg.set('agent', 'cache_dir', 'background_tasks')
@@ -419,8 +421,10 @@ class TestCustomServiceOrchestrator(TestCase):
     import TestActionQueue
     import copy
 
-    TestActionQueue.patch_output_file(orchestrator.python_executor)
-    orchestrator.python_executor.prepare_process_result = MagicMock()
+    pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config)
+    TestActionQueue.patch_output_file(pyex)
+    pyex.prepare_process_result = MagicMock()
+    get_py_executor_mock.return_value = pyex
     orchestrator.dump_command_to_json = MagicMock()
 
     lock = threading.RLock()
@@ -598,12 +602,14 @@ class TestCustomServiceOrchestrator(TestCase):
     self.assertEqual('UNKNOWN', status)
 
 
+  @patch.object(CustomServiceOrchestrator, "get_py_executor")
   @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
   @patch.object(FileCache, "__init__")
   @patch.object(FileCache, "get_custom_actions_base_dir")
   def test_runCommand_background_action(self, get_custom_actions_base_dir_mock,
                                     FileCache_mock,
-                                    dump_command_to_json_mock):
+                                    dump_command_to_json_mock,
+                                    get_py_executor_mock):
     FileCache_mock.return_value = None
     get_custom_actions_base_dir_mock.return_value = "some path"
     _, script = tempfile.mkstemp()
@@ -625,8 +631,10 @@ class TestCustomServiceOrchestrator(TestCase):
     orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
 
     import TestActionQueue
-    TestActionQueue.patch_output_file(orchestrator.python_executor)
-    orchestrator.python_executor.condenseOutput = MagicMock()
+    pyex = PythonExecutor(orchestrator.tmp_dir, orchestrator.config)
+    TestActionQueue.patch_output_file(pyex)
+    pyex.condenseOutput = MagicMock()
+    get_py_executor_mock.return_value = pyex
     orchestrator.dump_command_to_json = MagicMock()
 
     ret = orchestrator.runCommand(command, "out.txt", "err.txt")

http://git-wip-us.apache.org/repos/asf/ambari/blob/e4976458/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
index 2f13ef5..0c78fdb 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
@@ -55,7 +55,7 @@ class TestHeartbeat(TestCase):
 
 
   def test_build(self):
-    config = AmbariConfig.AmbariConfig().getConfig()
+    config = AmbariConfig.AmbariConfig()
     config.set('agent', 'prefix', 'tmp')
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
@@ -94,7 +94,7 @@ class TestHeartbeat(TestCase):
                    'exitCode': 777}],
       'componentStatus': [{'status': 'HEALTHY', 'componentName': 'NAMENODE'}]
     }
-    config = AmbariConfig.AmbariConfig().getConfig()
+    config = AmbariConfig.AmbariConfig()
     config.set('agent', 'prefix', 'tmp')
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
@@ -110,7 +110,7 @@ class TestHeartbeat(TestCase):
 
   @patch.object(ActionQueue, "result")
   def test_build_long_result(self, result_mock):
-    config = AmbariConfig.AmbariConfig().getConfig()
+    config = AmbariConfig.AmbariConfig()
     config.set('agent', 'prefix', 'tmp')
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
@@ -205,7 +205,7 @@ class TestHeartbeat(TestCase):
   @patch.object(Hardware, "_chk_mount", new = MagicMock(return_value=True))
   @patch.object(HostInfoLinux, 'register')
   def test_heartbeat_no_host_check_cmd_in_queue(self, register_mock):
-    config = AmbariConfig.AmbariConfig().getConfig()
+    config = AmbariConfig.AmbariConfig()
     config.set('agent', 'prefix', 'tmp')
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
@@ -232,7 +232,7 @@ class TestHeartbeat(TestCase):
   @patch.object(Hardware, "_chk_mount", new = MagicMock(return_value=True))
   @patch.object(HostInfoLinux, 'register')
   def test_heartbeat_host_check_no_cmd(self, register_mock):
-    config = AmbariConfig.AmbariConfig().getConfig()
+    config = AmbariConfig.AmbariConfig()
     config.set('agent', 'prefix', 'tmp')
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")


Mime
View raw message