ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject git commit: AMBARI-4006. Custom Action: Add agent support and support for Python based custom action scripts (dsen)
Date Fri, 06 Dec 2013 16:52:39 GMT
Updated Branches:
  refs/heads/trunk 758c0a2a0 -> 56f7f8744


AMBARI-4006. Custom Action: Add agent support and support for Python based custom action scripts (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/56f7f874
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/56f7f874
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/56f7f874

Branch: refs/heads/trunk
Commit: 56f7f8744fbcf0a7d1781e08b57538ba984ec119
Parents: 758c0a2
Author: Dmitry Sen <dsen@hortonworks.com>
Authored: Fri Dec 6 18:52:23 2013 +0200
Committer: Dmitry Sen <dsen@hortonworks.com>
Committed: Fri Dec 6 18:52:23 2013 +0200

----------------------------------------------------------------------
 ambari-agent/conf/unix/ambari-agent.ini         |  3 +
 ambari-agent/pom.xml                            | 12 ++++
 .../src/main/python/ambari_agent/ActionQueue.py |  5 ++
 .../main/python/ambari_agent/AmbariConfig.py    |  3 +
 .../python/ambari_agent/CommandStatusDict.py    |  5 ++
 .../ambari_agent/CustomServiceOrchestrator.py   | 24 +++++---
 .../main/python/ambari_agent/PythonExecutor.py  | 27 +++++++--
 .../libraries/script/script.py                  | 27 ++++++---
 .../test/python/ambari_agent/TestActionQueue.py | 10 +++-
 .../ambari_agent/TestCommandStatusDict.py       |  2 +
 .../TestCustomServiceOrchestrator.py            | 28 +++++++++
 .../python/ambari_agent/TestPythonExecutor.py   | 11 ++--
 .../python/resource_management/TestScript.py    | 19 ++++++
 .../server/actionmanager/ActionScheduler.java   | 39 ++++++------
 .../ambari/server/agent/HeartBeatHandler.java   |  2 +-
 .../controller/AmbariActionExecutionHelper.java | 17 ++++--
 .../resources/custom_actions/hdfs_rebalance.py  | 63 ++++++++++++++++++++
 17 files changed, 248 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/56f7f874/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index 6672dfa..e663a25 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -34,6 +34,9 @@ facter_home=/usr/lib/ambari-agent/lib/facter-1.6.10
 # How many seconds will pass before running puppet is terminated on timeout
 timeout_seconds = 600
 
+[python]
+custom_actions_dir = /var/lib/ambari-agent/resources
+
 [command]
 maxretries=2
 sleepBetweenRetries=1

http://git-wip-us.apache.org/repos/asf/ambari/blob/56f7f874/ambari-agent/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-agent/pom.xml b/ambari-agent/pom.xml
index bbb00b1..b871e80 100644
--- a/ambari-agent/pom.xml
+++ b/ambari-agent/pom.xml
@@ -343,6 +343,18 @@
                 </source>
               </sources>
             </mapping>
+            <mapping>
+              <!-- custom actions root-->
+              <directory>/var/lib/ambari-agent/resources</directory>
+              <filemode>755</filemode>
+              <username>root</username>
+              <groupname>root</groupname>
+              <sources>
+                <source>
+                  <location>../ambari-server/src/main/resources/custom_actions</location>
+                </source>
+              </sources>
+            </mapping>
           </mappings>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/ambari/blob/56f7f874/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 8944e51..5012325 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -143,6 +143,7 @@ class ActionQueue(threading.Thread):
     in_progress_status.update({
       'tmpout': self.tmpdir + os.sep + 'output-' + str(taskId) + '.txt',
       'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt',
+      'structuredOut' : self.tmpdir + os.sep + 'structured-out-' + str(taskId) + '.json',
       'status': self.IN_PROGRESS_STATUS
     })
     self.commandStatuses.put_command_status(command, in_progress_status)
@@ -176,6 +177,10 @@ class ActionQueue(threading.Thread):
     if roleResult['stderr'] == '':
       roleResult['stderr'] = 'None'
 
+    if 'structuredOut' in commandresult:
+      roleResult['structuredOut'] = str(commandresult['structuredOut'])
+    else:
+      roleResult['structuredOut'] = ''
     # let ambari know that configuration tags were applied
     if status == self.COMPLETED_STATUS:
       configHandler = ActualConfigHandler(self.config)

http://git-wip-us.apache.org/repos/asf/ambari/blob/56f7f874/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index aca7856..90455c2 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -44,6 +44,9 @@ puppet_home=/root/workspace/puppet-install/puppet-2.7.9
 facter_home=/root/workspace/puppet-install/facter-1.6.10
 timeout_seconds = 600
 
+[python]
+custom_actions_dir = /var/lib/ambari-agent/resources
+
 [command]
 maxretries=2
 sleepBetweenRetries=1

http://git-wip-us.apache.org/repos/asf/ambari/blob/56f7f874/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index 8620acb..67f148f 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -18,6 +18,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
+import json
 import logging
 import threading
 from Grep import Grep
@@ -96,16 +97,20 @@ class CommandStatusDict():
     try:
       tmpout = open(report['tmpout'], 'r').read()
       tmperr = open(report['tmperr'], 'r').read()
+      with open(report['structuredOut'], 'r') as fp:
+        tmpstructuredout = json.load(fp)
     except Exception, err:
       logger.warn(err)
       tmpout = '...'
       tmperr = '...'
+      tmpstructuredout = ''
     grep = Grep()
     output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES)
     inprogress = self.generate_report_template(command)
     inprogress.update({
       'stdout': grep.filterMarkup(output),
       'stderr': tmperr,
+      'structuredOut': tmpstructuredout,
       'exitCode': 777,
       'status': ActionQueue.IN_PROGRESS_STATUS,
     })

http://git-wip-us.apache.org/repos/asf/ambari/blob/56f7f874/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 9e7989c..b8de3db 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -39,6 +39,7 @@ class CustomServiceOrchestrator():
   """
 
   SCRIPT_TYPE_PYTHON = "PYTHON"
+  CUSTOM_ACTION_COMMAND = 'ACTIONEXECUTE'
 
   def __init__(self, config):
     self.config = config
@@ -50,21 +51,29 @@ class CustomServiceOrchestrator():
   def runCommand(self, command, tmpoutfile, tmperrfile):
     try:
       component_name = command['role']
-      stack_name = command['hostLevelParams']['stack_name']
-      stack_version = command['hostLevelParams']['stack_version']
       script_type = command['commandParams']['script_type']
       script = command['commandParams']['script']
       command_name = command['roleCommand']
       timeout = int(command['commandParams']['command_timeout'])
-      metadata_folder = command['commandParams']['service_metadata_folder']
-      base_dir = self.file_cache.get_service_base_dir(
+      task_id = command['taskId']
+      if command_name == self.CUSTOM_ACTION_COMMAND:
+        base_dir = self.config.get('python', 'custom_actions_dir')
+        script_path = os.path.join(base_dir, script)
+      else:
+        stack_name = command['hostLevelParams']['stack_name']
+        stack_version = command['hostLevelParams']['stack_version']
+        metadata_folder = command['commandParams']['service_metadata_folder']
+        base_dir = self.file_cache.get_service_base_dir(
           stack_name, stack_version, metadata_folder, component_name)
-      script_path = self.resolve_script_path(base_dir, script, script_type)
+        script_path = self.resolve_script_path(base_dir, script, script_type)
+
+      tmpstrucoutfile = os.path.join(self.tmp_dir, "structured-out-{0}.json".
+        format(task_id))
       if script_type.upper() == self.SCRIPT_TYPE_PYTHON:
         json_path = self.dump_command_to_json(command)
         script_params = [command_name, json_path, base_dir]
-        ret = self.python_executor.run_file(
-          script_path, script_params, tmpoutfile, tmperrfile, timeout)
+        ret = self.python_executor.run_file(script_path, script_params,
+          tmpoutfile, tmperrfile, timeout, tmpstrucoutfile)
       else:
         message = "Unknown script type {0}".format(script_type)
         raise AgentException(message)
@@ -76,6 +85,7 @@ class CustomServiceOrchestrator():
       ret = {
         'stdout' : message,
         'stderr' : message,
+        'structuredOut' : message,
         'exitcode': 1,
       }
     return ret

http://git-wip-us.apache.org/repos/asf/ambari/blob/56f7f874/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index b6b1c68..ceacacf 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -17,7 +17,9 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 '''
+import json
 import logging
+import os
 import subprocess
 import pprint
 import threading
@@ -45,7 +47,8 @@ class PythonExecutor:
     self.config = config
     pass
 
-  def run_file(self, script, script_params, tmpoutfile, tmperrfile, timeout):
+  def run_file(self, script, script_params, tmpoutfile, tmperrfile, timeout,
+               tmpstructedoutfile):
     """
     Executes the specified python file in a separate subprocess.
     Method returns only when the subprocess is finished.
@@ -55,6 +58,7 @@ class PythonExecutor:
     """
     tmpout =  open(tmpoutfile, 'w')
     tmperr =  open(tmperrfile, 'w')
+    script_params += [tmpstructedoutfile]
     pythonCommand = self.python_command(script, script_params)
     logger.info("Running command " + pprint.pformat(pythonCommand))
     process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
@@ -72,10 +76,24 @@ class PythonExecutor:
     returncode = process.returncode
     out = open(tmpoutfile, 'r').read()
     error = open(tmperrfile, 'r').read()
+
+    try:
+      with open(tmpstructedoutfile, 'r') as fp:
+        structured_out = json.load(fp)
+    except Exception:
+      if os.path.exists(tmpstructedoutfile):
+        errMsg = 'Unable to read structured output from ' + tmpstructedoutfile
+        structured_out = {
+          'msg' : errMsg
+        }
+        logger.warn(structured_out)
+      else:
+        structured_out = ''
+
     if self.python_process_has_been_killed:
       error = str(error) + "\n Python script has been killed due to timeout"
       returncode = 999
-    result = self.condenseOutput(out, error, returncode)
+    result = self.condenseOutput(out, error, returncode, structured_out)
     logger.info("Result: %s" % result)
     return result
 
@@ -97,12 +115,13 @@ class PythonExecutor:
     python_command = [python_binary, script] + script_params
     return python_command
 
-  def condenseOutput(self, stdout, stderr, retcode):
+  def condenseOutput(self, stdout, stderr, retcode, structured_out):
     grep = self.grep
     result = {
       "exitcode": retcode,
       "stdout"  : grep.tail(stdout, grep.OUTPUT_LAST_LINES),
-      "stderr"  : grep.tail(stderr, grep.OUTPUT_LAST_LINES)
+      "stderr"  : grep.tail(stderr, grep.OUTPUT_LAST_LINES),
+      "structuredOut" : structured_out
     }
     return result
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/56f7f874/ambari-agent/src/main/python/resource_management/libraries/script/script.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/resource_management/libraries/script/script.py b/ambari-agent/src/main/python/resource_management/libraries/script/script.py
index 6368944..d75808e 100644
--- a/ambari-agent/src/main/python/resource_management/libraries/script/script.py
+++ b/ambari-agent/src/main/python/resource_management/libraries/script/script.py
@@ -36,6 +36,16 @@ class Script():
   even different Script instances can not be used from different threads at
   the same time
   """
+  structuredOut = {}
+
+  def put_structured_out(self, sout):
+    Script.structuredOut.update(sout)
+    try:
+      structuredOut = json.dumps(Script.structuredOut)
+      with open(self.stroutfile, 'w') as fp:
+        json.dump(structuredOut, fp)
+    except IOError:
+      Script.structuredOut.update({"errMsg" : "Unable to write to " + self.stroutfile})
 
   def execute(self):
     """
@@ -55,13 +65,14 @@ class Script():
     logger.addHandler(cherr)
     logger.addHandler(chout)
     # parse arguments
-    if len(sys.argv) < 1+3:
-      logger.error("Script expects at least 3 arguments")
+    if len(sys.argv) < 5:
+      logger.error("Script expects at least 4 arguments")
       sys.exit(1)
-    command_type = str.lower(sys.argv[1])
+    command_name = str.lower(sys.argv[1])
     # parse command parameters
     command_data_file = sys.argv[2]
     basedir = sys.argv[3]
+    self.stroutfile = sys.argv[4]
     try:
       with open(command_data_file, "r") as f:
         pass
@@ -71,17 +82,17 @@ class Script():
       sys.exit(1)
     # Run class method mentioned by a command type
     self_methods = dir(self)
-    if not command_type in self_methods:
-      logger.error("Script {0} has not method '{1}'".format(sys.argv[0], command_type))
+    if not command_name in self_methods:
+      logger.error("Script {0} has not method '{1}'".format(sys.argv[0], command_name))
       sys.exit(1)
-    method = getattr(self, command_type)
+    method = getattr(self, command_name)
     try:
       with Environment(basedir) as env:
         method(env)
     except Fail:
-      logger.exception("Got exception while executing method '{0}':".format(command_type))
+      logger.exception("Got exception while executing method '{0}':".format(command_name))
       sys.exit(1)
-      
+
   @staticmethod
   def get_config():
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/56f7f874/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 98817e9..a55b69f 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -195,10 +195,10 @@ class TestActionQueue(TestCase):
     self.assertTrue(print_exc_mock.called)
 
 
-
+  @patch("json.load")
   @patch("__builtin__.open")
   @patch.object(ActionQueue, "status_update_callback")
-  def test_execute_command(self, status_update_callback_mock, open_mock):
+  def test_execute_command(self, status_update_callback_mock, open_mock, json_load_mock):
     # Make file read calls visible
     def open_side_effect(file, mode):
       if mode == 'r':
@@ -208,6 +208,7 @@ class TestActionQueue(TestCase):
       else:
         return self.original_open(file, mode)
     open_mock.side_effect = open_side_effect
+    json_load_mock.return_value = ''
 
     config = AmbariConfig().getConfig()
     tempdir = tempfile.gettempdir()
@@ -217,6 +218,7 @@ class TestActionQueue(TestCase):
     puppet_execution_result_dict = {
       'stdout': 'out',
       'stderr': 'stderr',
+      'structuredOut' : ''
       }
     def side_effect(command, tmpoutfile, tmperrfile):
       unfreeze_flag.wait()
@@ -244,6 +246,7 @@ class TestActionQueue(TestCase):
     expected = {'status': 'IN_PROGRESS',
                 'stderr': 'Read from {0}/errors-3.txt'.format(tempdir),
                 'stdout': 'Read from {0}/output-3.txt'.format(tempdir),
+                'structuredOut' : '',
                 'clusterName': u'cc',
                 'roleCommand': u'INSTALL',
                 'serviceName': u'HDFS',
@@ -270,6 +273,7 @@ class TestActionQueue(TestCase):
                 'role': u'DATANODE',
                 'actionId': '1-1',
                 'taskId': 3,
+                'structuredOut' : '',
                 'exitCode': 0}
     self.assertEqual(len(report['reports']), 1)
     self.assertEqual(report['reports'][0], expected)
@@ -307,6 +311,7 @@ class TestActionQueue(TestCase):
                 'role': u'DATANODE',
                 'actionId': '1-1',
                 'taskId': 3,
+                'structuredOut' : '',
                 'exitCode': 13}
     self.assertEqual(len(report['reports']), 1)
     self.assertEqual(report['reports'][0], expected)
@@ -338,6 +343,7 @@ class TestActionQueue(TestCase):
                 'role': 'role',
                 'actionId': 17,
                 'taskId': 'taskId',
+                'structuredOut' : '',
                 'exitCode': 0}
     self.assertEqual(len(report['reports']), 1)
     self.assertEqual(report['reports'][0], expected)

http://git-wip-us.apache.org/repos/asf/ambari/blob/56f7f874/ambari-agent/src/test/python/ambari_agent/TestCommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCommandStatusDict.py b/ambari-agent/src/test/python/ambari_agent/TestCommandStatusDict.py
index 07182ad..4d5dfda 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCommandStatusDict.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCommandStatusDict.py
@@ -107,12 +107,14 @@ class TestCommandStatusDict(TestCase):
                    {'status': 'COMPLETE', 'taskId': 4},
                    {'status': 'IN_PROGRESS', 'stderr': '...',
                     'stdout': '...', 'clusterName': u'cc',
+                    'structuredOut' : '',
                     'roleCommand': u'INSTALL', 'serviceName': u'HDFS',
                     'role': u'DATANODE', 'actionId': '1-1', 'taskId': 5,
                     'exitCode': 777},
                    {'status': 'IN_PROGRESS',
                     'stderr': '...',
                     'stdout': '...',
+                    'structuredOut' : '',
                     'clusterName': u'cc',
                     'roleCommand': u'INSTALL',
                     'serviceName': u'HDFS',

http://git-wip-us.apache.org/repos/asf/ambari/blob/56f7f874/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index 8a6ef07..c9b167e 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -50,6 +50,8 @@ class TestCustomServiceOrchestrator(TestCase):
     self.config.add_section('agent')
     self.config.set('agent', 'prefix', tmpdir)
     self.config.set('agent', 'cache_dir', "/cachedir")
+    self.config.add_section('python')
+    self.config.set('python', 'custom_actions_dir', tmpdir)
 
 
   @patch("hostname.public_hostname")
@@ -116,6 +118,7 @@ class TestCustomServiceOrchestrator(TestCase):
         'command_timeout': '600',
         'service_metadata_folder' : 'HBASE'
       },
+      'taskId' : '3',
       'roleCommand': 'INSTALL'
     }
     get_service_base_dir_mock.return_value = "/basedir/"
@@ -140,6 +143,31 @@ class TestCustomServiceOrchestrator(TestCase):
     self.assertTrue("Unknown script type" in ret['stdout'])
     pass
 
+  @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
+  @patch.object(PythonExecutor, "run_file")
+  def test_runCommand(self, run_file_mock, dump_command_to_json_mock):
+    _, script = tempfile.mkstemp()
+    command = {
+      'role' : 'any',
+      'commandParams': {
+        'script_type': 'PYTHON',
+        'script': 'some_custom_action.py',
+        'command_timeout': '600',
+      },
+      'taskId' : '3',
+      'roleCommand': 'ACTIONEXECUTE'
+    }
+
+    orchestrator = CustomServiceOrchestrator(self.config)
+    # normal run case
+    run_file_mock.return_value = {
+      'stdout' : 'sss',
+      'stderr' : 'eee',
+      'exitcode': 0,
+      }
+    ret = orchestrator.runCommand(command, "out.txt", "err.txt")
+    self.assertEqual(ret['exitcode'], 0)
+    self.assertTrue(run_file_mock.called)
 
   def tearDown(self):
     # enable stdout

http://git-wip-us.apache.org/repos/asf/ambari/blob/56f7f874/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
index 795105c..bff414e 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
@@ -42,6 +42,7 @@ class TestPythonExecutor(TestCase):
     executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
     _, tmpoutfile = tempfile.mkstemp()
     _, tmperrfile = tempfile.mkstemp()
+    _, tmpstrucout = tempfile.mkstemp()
     PYTHON_TIMEOUT_SECONDS = 0.1
     kill_process_with_children_mock.side_effect = lambda pid : subproc_mock.terminate()
 
@@ -54,8 +55,8 @@ class TestPythonExecutor(TestCase):
     runShellKillPgrp_method.side_effect = lambda python : python.terminate()
     executor.runShellKillPgrp = runShellKillPgrp_method
     subproc_mock.returncode = None
-    thread = Thread(target =  executor.run_file, args = ("fake_puppetFile", ["arg1", "arg2"],
-                                                    tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS))
+    thread = Thread(target =  executor.run_file, args = ("fake_puppetFile",
+      ["arg1", "arg2"], tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS, tmpstrucout))
     thread.start()
     time.sleep(0.1)
     subproc_mock.finished_event.wait()
@@ -96,6 +97,7 @@ class TestPythonExecutor(TestCase):
     executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
     _, tmpoutfile = tempfile.mkstemp()
     _, tmperrfile = tempfile.mkstemp()
+    _, tmpstroutfile = tempfile.mkstemp()
     PYTHON_TIMEOUT_SECONDS =  5
 
     def launch_python_subprocess_method(command, tmpout, tmperr):
@@ -108,8 +110,9 @@ class TestPythonExecutor(TestCase):
     executor.runShellKillPgrp = runShellKillPgrp_method
     subproc_mock.returncode = 0
     subproc_mock.should_finish_event.set()
-    result = executor.run_file("file", ["arg1", "arg2"], tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS)
-    self.assertEquals(result, {'exitcode': 0, 'stderr': 'Dummy err', 'stdout': 'Dummy output'})
+    result = executor.run_file("file", ["arg1", "arg2"], tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS, tmpstroutfile)
+    self.assertEquals(result, {'exitcode': 0, 'stderr': 'Dummy err', 'stdout': 'Dummy output',
+                               'structuredOut': {'msg': 'Unable to read structured output from ' + tmpstroutfile}})
 
 
   def test_is_successfull(self):

http://git-wip-us.apache.org/repos/asf/ambari/blob/56f7f874/ambari-agent/src/test/python/resource_management/TestScript.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/resource_management/TestScript.py b/ambari-agent/src/test/python/resource_management/TestScript.py
index 761c4cb..5eec7b5 100644
--- a/ambari-agent/src/test/python/resource_management/TestScript.py
+++ b/ambari-agent/src/test/python/resource_management/TestScript.py
@@ -82,6 +82,25 @@ class TestScript(TestCase):
     resource_dump = pprint.pformat(env.resource_list)
     self.assertEqual(resource_dump, "[Package['hbase'], Package['yet-another-package']]")
 
+  @patch("__builtin__.open")
+  def test_structured_out(self, open_mock):
+    script = Script()
+    script.stroutfile = ''
+
+    self.assertEqual(Script.structuredOut, {})
+
+    script.put_structured_out({"1": "1"})
+    self.assertEqual(Script.structuredOut, {"1": "1"})
+    self.assertTrue(open_mock.called)
+
+    script.put_structured_out({"2": "2"})
+    self.assertEqual(open_mock.call_count, 2)
+    self.assertEqual(Script.structuredOut, {"1": "1", "2": "2"})
+
+    #Overriding
+    script.put_structured_out({"1": "3"})
+    self.assertEqual(open_mock.call_count, 3)
+    self.assertEqual(Script.structuredOut, {"1": "3", "2": "2"})
 
   def tearDown(self):
     # enable stdout

http://git-wip-us.apache.org/repos/asf/ambari/blob/56f7f874/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index 8faa06c..75bfd7b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.ServiceComponentNotFoundException;
 import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.CommandReport;
@@ -420,24 +421,26 @@ class ActionScheduler implements Runnable {
     String roleStr = cmd.getRole().toString();
     String hostname = cmd.getHostname();
     if (s.getStartTime(hostname, roleStr) < 0) {
-      try {
-        Cluster c = fsmObject.getCluster(s.getClusterName());
-        Service svc = c.getService(cmd.getServiceName());
-        ServiceComponent svcComp = svc.getServiceComponent(roleStr);
-        ServiceComponentHost svcCompHost =
-            svcComp.getServiceComponentHost(hostname);
-        svcCompHost.handleEvent(s.getFsmEvent(hostname, roleStr).getEvent());
-      } catch (ServiceComponentNotFoundException scnex) {
-        LOG.info("Not a service component, assuming its an action", scnex);
-      } catch (InvalidStateTransitionException e) {
-        LOG.info(
-            "Transition failed for host: " + hostname + ", role: "
-                + roleStr, e);
-        throw e;
-      } catch (AmbariException e) {
-        LOG.warn("Exception in fsm: " + hostname + ", role: " + roleStr,
-            e);
-        throw e;
+      if (RoleCommand.ACTIONEXECUTE != cmd.getRoleCommand()) {
+        try {
+          Cluster c = fsmObject.getCluster(s.getClusterName());
+          Service svc = c.getService(cmd.getServiceName());
+          ServiceComponent svcComp = svc.getServiceComponent(roleStr);
+          ServiceComponentHost svcCompHost =
+              svcComp.getServiceComponentHost(hostname);
+          svcCompHost.handleEvent(s.getFsmEvent(hostname, roleStr).getEvent());
+        } catch (ServiceComponentNotFoundException scnex) {
+          LOG.info("Not a service component, assuming its an action", scnex);
+        } catch (InvalidStateTransitionException e) {
+          LOG.info(
+              "Transition failed for host: " + hostname + ", role: "
+                  + roleStr, e);
+          throw e;
+        } catch (AmbariException e) {
+          LOG.warn("Exception in fsm: " + hostname + ", role: " + roleStr,
+              e);
+          throw e;
+        }
       }
       s.setStartTime(hostname,roleStr, now);
       s.setHostRoleStatus(hostname, roleStr, HostRoleStatus.QUEUED);

http://git-wip-us.apache.org/repos/asf/ambari/blob/56f7f874/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 0866446..7501fb4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -193,7 +193,7 @@ public class HeartBeatHandler {
     List<CommandReport> reports = heartbeat.getReports();
     for (CommandReport report : reports) {
       LOG.debug("Received command report: " + report);
-      if (RoleCommand.ACTIONEXECUTE.equals(report.getRoleCommand())) {
+      if (RoleCommand.ACTIONEXECUTE.toString().equals(report.getRoleCommand())) {
         continue;
       }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/56f7f874/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java
index a008780..fb7ac0c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariActionExecutionHelper.java
@@ -41,10 +41,9 @@ import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
+import java.util.*;
+
+import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.*;
 
 /**
  * Helper class containing logic to process custom action execution requests
@@ -58,6 +57,8 @@ public class AmbariActionExecutionHelper {
   private ActionManager actionManager;
   private AmbariMetaInfo ambariMetaInfo;
 
+  private static final String TYPE_PYTHON = "PYTHON";
+
   public AmbariActionExecutionHelper(ActionMetadata actionMetadata, Clusters clusters,
                                      AmbariManagementControllerImpl amcImpl) {
     this.amcImpl = amcImpl;
@@ -272,6 +273,12 @@ public class AmbariActionExecutionHelper {
         configTags = amcImpl.findConfigurationTagsWithOverrides(cluster, hostName);
       }
 
+      Map<String, String> commandParams = actionContext.getParameters();
+      commandParams.put(COMMAND_TIMEOUT, actionContext.getTimeout().toString());
+      commandParams.put(SCRIPT, actionName + ".py");
+      commandParams.put(SCRIPT_TYPE, TYPE_PYTHON);
+      commandParams.put(SCHEMA_VERSION, AmbariMetaInfo.SCHEMA_VERSION_2);
+
       ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
           actionContext.getActionName()).getExecutionCommand();
 
@@ -282,7 +289,7 @@ public class AmbariActionExecutionHelper {
       execCmd.setConfigurations(configurations);
       execCmd.setConfigurationTags(configTags);
       execCmd.setHostLevelParams(hostLevelParams);
-      execCmd.setCommandParams(actionContext.getParameters());
+      execCmd.setCommandParams(commandParams);
       execCmd.setServiceName(serviceName);
       execCmd.setComponentName(componentName);
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/56f7f874/ambari-server/src/main/resources/custom_actions/hdfs_rebalance.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/custom_actions/hdfs_rebalance.py b/ambari-server/src/main/resources/custom_actions/hdfs_rebalance.py
new file mode 100644
index 0000000..f282a8c
--- /dev/null
+++ b/ambari-server/src/main/resources/custom_actions/hdfs_rebalance.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python2.6
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management import *
+import os
+import json
+
+
+class HdfsRebalance(Script):
+  def actionexecute(self, env):
+
+    config = Script.get_config()
+
+    hdfs_user = config['configurations']['global']['hdfs_user']
+    conf_dir = config['configurations']['global']['hadoop_conf_dir']
+
+    security_enabled = config['configurations']['global']['security_enabled']
+    kinit_path_local = functions.get_kinit_path(
+      [default('kinit_path_local'), "/usr/bin", "/usr/kerberos/bin", "/usr/sbin"])
+
+    threshold = config['commandParams']['threshold']
+    principal = config['commandParams']['principal']
+    keytab = config['commandParams']['keytab']
+
+    if security_enabled:
+      Execute(format("{kinit_path_local}  -kt {keytab} {principal}"))
+
+    ExecuteHadoop(format('balancer -threshold {threshold}'),
+      user=hdfs_user,
+      conf_dir=conf_dir
+    )
+
+    structured_output_example = {
+      'user' : hdfs_user,
+      'conf_dir' : conf_dir,
+      'principal' : principal,
+      'keytab' : keytab
+      }
+
+    self.put_structured_out(structured_output_example)
+
+if __name__ == "__main__":
+
+  HdfsRebalance().execute()


Mime
View raw message