ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject git commit: AMBARI-5065 Rolling restart should also handle clients on the same machine as the restarting component (dsen)
Date Thu, 13 Mar 2014 18:04:41 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk 9e870b2cc -> 45de7eff7


AMBARI-5065 Rolling restart should also handle clients on the same machine as the restarting component (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/45de7eff
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/45de7eff
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/45de7eff

Branch: refs/heads/trunk
Commit: 45de7eff7c4587c72a8982ed0942f08105db7c3a
Parents: 9e870b2
Author: Dmitry Sen <dsen@hortonworks.com>
Authored: Thu Mar 13 20:04:28 2014 +0200
Committer: Dmitry Sen <dsen@hortonworks.com>
Committed: Thu Mar 13 20:04:28 2014 +0200

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py |  8 +--
 .../python/ambari_agent/ActualConfigHandler.py  | 53 +++++++++-----
 .../src/main/python/ambari_agent/LiveStatus.py  |  8 ++-
 .../ambari_agent/TestActualConfigHandler.py     | 73 ++++++++++++++++++--
 .../test/python/ambari_agent/TestLiveStatus.py  | 15 ++--
 5 files changed, 119 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/45de7eff/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 549651a..b611027 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -28,7 +28,6 @@ import os
 from LiveStatus import LiveStatus
 from shell import shellRunner
 import PuppetExecutor
-import PythonExecutor
 from ActualConfigHandler import ActualConfigHandler
 from CommandStatusDict import CommandStatusDict
 from CustomServiceOrchestrator import CustomServiceOrchestrator
@@ -75,6 +74,7 @@ class ActionQueue(threading.Thread):
     self.config = config
     self.controller = controller
     self.sh = shellRunner()
+    self.configTags = {}
     self._stop = threading.Event()
     self.tmpdir = config.get('agent', 'prefix')
     self.customServiceOrchestrator = CustomServiceOrchestrator(config,
@@ -214,7 +214,7 @@ class ActionQueue(threading.Thread):
       roleResult['structuredOut'] = ''
     # let ambari know that configuration tags were applied
     if status == self.COMPLETED_STATUS:
-      configHandler = ActualConfigHandler(self.config)
+      configHandler = ActualConfigHandler(self.config, self.configTags)
       if command.has_key('configurationTags'):
         configHandler.write_actual(command['configurationTags'])
         roleResult['configurationTags'] = command['configurationTags']
@@ -226,7 +226,7 @@ class ActionQueue(threading.Thread):
         (command['roleCommand'] == self.ROLE_COMMAND_CUSTOM_COMMAND and \
         command['hostLevelParams'].has_key('custom_command') and \
         command['hostLevelParams']['custom_command'] == self.CUSTOM_COMMAND_RESTART)):
-        configHandler.copy_to_component(command['role'])
+        configHandler.write_actual_component(command['role'], command['configurationTags'])
         roleResult['configurationTags'] = configHandler.read_actual_component(command['role'])
     self.commandStatuses.put_command_status(command, roleResult)
 
@@ -248,7 +248,7 @@ class ActionQueue(threading.Thread):
       command_format = self.determine_command_format_version(command)
 
       livestatus = LiveStatus(cluster, service, component,
-                              globalConfig, self.config)
+                              globalConfig, self.config, self.configTags)
       component_status = None
       if command_format == self.COMMAND_FORMAT_V2:
         # For custom services, responsibility to determine service status is

http://git-wip-us.apache.org/repos/asf/ambari/blob/45de7eff/ambari-agent/src/main/python/ambari_agent/ActualConfigHandler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActualConfigHandler.py b/ambari-agent/src/main/python/ambari_agent/ActualConfigHandler.py
index 024c575..3937f69 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActualConfigHandler.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActualConfigHandler.py
@@ -21,15 +21,16 @@ limitations under the License.
 import json
 import logging
 import os
-import shutil
+import LiveStatus
 
 logger = logging.getLogger()
 
 class ActualConfigHandler:
   CONFIG_NAME = 'config.json'
 
-  def __init__(self, config):
-    self.config = config;
+  def __init__(self, config, configTags):
+    self.config = config
+    self.configTags = configTags
 
   def findRunDir(self):
     runDir = '/var/run/ambari-agent'
@@ -39,18 +40,35 @@ class ActualConfigHandler:
       runDir = '/tmp'
     return runDir
 
-  def write_actual(self, configTags):
-    runDir = self.findRunDir()
-    conf_file = open(os.path.join(runDir, self.CONFIG_NAME), 'w')
-    json.dump(configTags, conf_file)
-    conf_file.close()
+  def write_actual(self, tags):
+    self.write_file(self.CONFIG_NAME, tags)
+
+  def write_actual_component(self, component, tags):
+    self.configTags[component] = tags
+    filename = component + "_" + self.CONFIG_NAME
+    self.write_file(filename, tags)
+    self.write_client_components(tags)
 
-  def copy_to_component(self, componentName):
+  def write_client_components(self, tags):
+    for comp in LiveStatus.LiveStatus.CLIENT_COMPONENTS:
+      componentName = comp['componentName']
+      if componentName in self.configTags.keys():
+        tags_updated = False
+        for config_name in tags.keys():
+          if config_name in self.configTags[componentName] and \
+            tags[config_name] != self.configTags[componentName][config_name]:
+            self.configTags[componentName][config_name] = tags[config_name]
+            tags_updated = True
+        if tags_updated:
+          filename = componentName + "_" + self.CONFIG_NAME
+          self.write_file(filename, self.configTags[componentName])
+    pass
+
+  def write_file(self, filename, tags):
     runDir = self.findRunDir()
-    srcfile = os.path.join(runDir, self.CONFIG_NAME)
-    if os.path.isfile(srcfile):
-      dstfile = os.path.join(runDir, componentName + "_" + self.CONFIG_NAME)
-      shutil.copy(srcfile, dstfile)
+    conf_file = open(os.path.join(runDir, filename), 'w')
+    json.dump(tags, conf_file)
+    conf_file.close()
 
   def read_file(self, filename):
     runDir = self.findRunDir()
@@ -74,7 +92,8 @@ class ActualConfigHandler:
   def read_actual(self):
     return self.read_file(self.CONFIG_NAME)
 
-  def read_actual_component(self, componentName):
-    return self.read_file(componentName + "_" + self.CONFIG_NAME)
-    
-
+  def read_actual_component(self, component):
+    if component not in self.configTags.keys():
+      self.configTags[component] = \
+        self.read_file(component + "_" + self.CONFIG_NAME)
+    return self.configTags[component]

http://git-wip-us.apache.org/repos/asf/ambari/blob/45de7eff/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/LiveStatus.py b/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
index 6f9f4db..2a6959f 100644
--- a/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
+++ b/ambari-agent/src/main/python/ambari_agent/LiveStatus.py
@@ -151,14 +151,16 @@ class LiveStatus:
   LIVE_STATUS = "STARTED"
   DEAD_STATUS = "INSTALLED"
 
-  def __init__(self, cluster, service, component, globalConfig, config):
+  def __init__(self, cluster, service, component, globalConfig, config,
+               configTags):
     self.cluster = cluster
     self.service = service
     self.component = component
     self.globalConfig = globalConfig
     versionsFileDir = config.get('agent', 'prefix')
     self.versionsHandler = StackVersionsFileHandler(versionsFileDir)
-    self.actualConfigHandler = ActualConfigHandler(config)
+    self.configTags = configTags
+    self.actualConfigHandler = ActualConfigHandler(config, configTags)
 
   def belongsToService(self, component):
     #TODO: Should also check belonging of server to cluster
@@ -194,7 +196,7 @@ class LiveStatus:
                  "stackVersion": self.versionsHandler.
                  read_stack_version(self.component)
     }
-    active_config = self.actualConfigHandler.read_actual_component(self.component)
+    active_config = self.actualConfigHandler.read_actual_component(self.component) #
     if not active_config is None:
       livestatus['configurationTags'] = active_config
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/45de7eff/ambari-agent/src/test/python/ambari_agent/TestActualConfigHandler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActualConfigHandler.py b/ambari-agent/src/test/python/ambari_agent/TestActualConfigHandler.py
index e62dd1a..a33bf32 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActualConfigHandler.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActualConfigHandler.py
@@ -23,7 +23,7 @@ from ambari_agent.AmbariConfig import AmbariConfig
 from ambari_agent.ActualConfigHandler import ActualConfigHandler
 import os
 import logging
-import json
+from mock.mock import patch
 
 class TestActualConfigHandler(TestCase):
 
@@ -33,9 +33,9 @@ class TestActualConfigHandler(TestCase):
     config = AmbariConfig().getConfig()
     tmpdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tmpdir)
-    handler = ActualConfigHandler(config)
-    
+
     tags = { "global": "version1", "core-site": "version2" }
+    handler = ActualConfigHandler(config, tags)
     handler.write_actual(tags)
     output = handler.read_actual()
     self.assertEquals(tags, output)
@@ -45,7 +45,7 @@ class TestActualConfigHandler(TestCase):
     config = AmbariConfig().getConfig()
     tmpdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tmpdir)
-    handler = ActualConfigHandler(config)
+    handler = ActualConfigHandler(config, {})
 
     conf_file = open(os.path.join(tmpdir, ActualConfigHandler.CONFIG_NAME), 'w')
     conf_file.write("")
@@ -59,11 +59,11 @@ class TestActualConfigHandler(TestCase):
     config = AmbariConfig().getConfig()
     tmpdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tmpdir)
-    handler = ActualConfigHandler(config)
 
     tags1 = { "global": "version1", "core-site": "version2" }
+    handler = ActualConfigHandler(config, {})
     handler.write_actual(tags1)
-    handler.copy_to_component('FOO')
+    handler.write_actual_component('FOO', tags1)
 
     output1 = handler.read_actual_component('FOO')
     output2 = handler.read_actual_component('GOO') 
@@ -80,3 +80,64 @@ class TestActualConfigHandler(TestCase):
     self.assertEquals(tags1, output4)
     os.remove(os.path.join(tmpdir, "FOO_" + ActualConfigHandler.CONFIG_NAME))
     os.remove(os.path.join(tmpdir, ActualConfigHandler.CONFIG_NAME))
+
+  def test_write_actual_component(self):
+    config = AmbariConfig().getConfig()
+    tmpdir = tempfile.gettempdir()
+    config.set('agent', 'prefix', tmpdir)
+
+    tags1 = { "global": "version1", "core-site": "version2" }
+    tags2 = { "global": "version33", "core-site": "version33" }
+    handler = ActualConfigHandler(config, {})
+    handler.write_actual_component('HDFS_CLIENT', tags1)
+    handler.write_actual_component('HBASE_CLIENT', tags1)
+    self.assertEquals(tags1, handler.read_actual_component('HDFS_CLIENT'))
+    self.assertEquals(tags1, handler.read_actual_component('HBASE_CLIENT'))
+    handler.write_actual_component('DATANODE', tags2)
+    self.assertEquals(tags2, handler.read_actual_component('DATANODE'))
+    self.assertEquals(tags2, handler.read_actual_component('HDFS_CLIENT'))
+
+    os.remove(os.path.join(tmpdir, "DATANODE_" + ActualConfigHandler.CONFIG_NAME))
+    os.remove(os.path.join(tmpdir, "HBASE_CLIENT_" + ActualConfigHandler.CONFIG_NAME))
+    os.remove(os.path.join(tmpdir, "HDFS_CLIENT_" + ActualConfigHandler.CONFIG_NAME))
+
+  @patch.object(ActualConfigHandler, "write_file")
+  def test_write_client_components(self, write_file_mock):
+    config = AmbariConfig().getConfig()
+    tmpdir = tempfile.gettempdir()
+    config.set('agent', 'prefix', tmpdir)
+
+    tags0 = {"global": "version0", "core-site": "version0"}
+    tags1 = {"global": "version1", "core-site": "version2"}
+    tags2 = {"global": "version33", "core-site": "version33"}
+    configTags = {'HDFS_CLIENT': tags0, 'HBASE_CLIENT': tags1}
+    handler = ActualConfigHandler(config, configTags)
+    self.assertEquals(tags0, handler.read_actual_component('HDFS_CLIENT'))
+    self.assertEquals(tags1, handler.read_actual_component('HBASE_CLIENT'))
+    handler.write_client_components(tags2)
+    self.assertEquals(tags2, handler.read_actual_component('HDFS_CLIENT'))
+    self.assertEquals(tags2, handler.read_actual_component('HBASE_CLIENT'))
+    self.assertTrue(write_file_mock.called)
+    self.assertEqual(2, write_file_mock.call_count)
+
+  @patch.object(ActualConfigHandler, "write_file")
+  @patch.object(ActualConfigHandler, "read_file")
+  def test_read_actual_component_inmemory(self, read_file_mock, write_file_mock):
+    config = AmbariConfig().getConfig()
+    tmpdir = tempfile.gettempdir()
+    config.set('agent', 'prefix', tmpdir)
+
+    tags1 = { "global": "version1", "core-site": "version2" }
+    read_file_mock.return_value = tags1
+
+    handler = ActualConfigHandler(config, {})
+
+    handler.write_actual_component('NAMENODE', tags1)
+    self.assertTrue(write_file_mock.called)
+    self.assertEquals(tags1, handler.read_actual_component('NAMENODE'))
+    self.assertFalse(read_file_mock.called)
+    self.assertEquals(tags1, handler.read_actual_component('DATANODE'))
+    self.assertTrue(read_file_mock.called)
+    self.assertEquals(1, read_file_mock.call_count)
+    self.assertEquals(tags1, handler.read_actual_component('DATANODE'))
+    self.assertEquals(1, read_file_mock.call_count)

http://git-wip-us.apache.org/repos/asf/ambari/blob/45de7eff/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py b/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py
index 96a21b9..345bd44 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py
@@ -21,10 +21,9 @@ limitations under the License.
 from unittest import TestCase
 from ambari_agent.LiveStatus import LiveStatus
 from ambari_agent.AmbariConfig import AmbariConfig
-import socket
 import os, sys, StringIO
 from ambari_agent import ActualConfigHandler
-from mock.mock import patch, MagicMock, call
+from mock.mock import patch
 import pprint
 from ambari_agent import StatusCheck
 
@@ -46,7 +45,7 @@ class TestLiveStatus(TestCase):
     for component in LiveStatus.COMPONENTS:
       config = AmbariConfig().getConfig()
       config.set('agent', 'prefix', "ambari_agent" + os.sep + "dummy_files")
-      livestatus = LiveStatus('', component['serviceName'], component['componentName'], {}, config)
+      livestatus = LiveStatus('', component['serviceName'], component['componentName'], {}, config, {})
       livestatus.versionsHandler.versionsFilePath = "ambari_agent" + os.sep + "dummy_files" + os.sep + "dummy_current_stack"
       result = livestatus.build()
       print "LiveStatus of {0}: {1}".format(component['serviceName'], str(result))
@@ -57,23 +56,23 @@ class TestLiveStatus(TestCase):
 
     # Test build status for CLIENT component (in LiveStatus.CLIENT_COMPONENTS)
     read_actual_component_mock.return_value = "some tags"
-    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config)
+    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {})
     result = livestatus.build()
     self.assertTrue(len(result) > 0, 'Livestatus should not be empty')
     self.assertTrue(result.has_key('configurationTags'))
     # Test build status with forsed_component_status
     ## Alive
-    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config)
+    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {})
     result = livestatus.build(forsed_component_status = LiveStatus.LIVE_STATUS)
     self.assertTrue(len(result) > 0, 'Livestatus should not be empty')
     self.assertTrue(result['status'], LiveStatus.LIVE_STATUS)
     ## Dead
-    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config)
+    livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {})
     result = livestatus.build(forsed_component_status = LiveStatus.DEAD_STATUS)
     self.assertTrue(len(result) > 0, 'Livestatus should not be empty')
     self.assertTrue(result['status'], LiveStatus.DEAD_STATUS)
 
-    livestatus = LiveStatus('c1', 'TEZ', 'TEZ_CLIENT', { }, config)
+    livestatus = LiveStatus('c1', 'TEZ', 'TEZ_CLIENT', { }, config, {})
     result = livestatus.build(forsed_component_status = LiveStatus.LIVE_STATUS)
     self.assertTrue(len(result) > 0, 'Livestatus should not be empty')
     self.assertTrue(result['status'], LiveStatus.LIVE_STATUS)
@@ -89,7 +88,7 @@ class TestLiveStatus(TestCase):
     config = AmbariConfig().getConfig()
     config.set('agent', 'prefix', "ambari_agent" + os.sep + "dummy_files")
     livestatus = LiveStatus('', 'SOME_UNKNOWN_SERVICE',
-                            'SOME_UNKNOWN_COMPONENT', {}, config)
+                            'SOME_UNKNOWN_COMPONENT', {}, config, {})
     livestatus.versionsHandler.versionsFilePath = "ambari_agent" + \
                       os.sep + "dummy_files" + os.sep + "dummy_current_stack"
     result = livestatus.build(forsed_component_status = "STARTED")


Mime
View raw message