Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1D278200C84 for ; Mon, 29 May 2017 10:29:27 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1B9B7160BCE; Mon, 29 May 2017 08:29:27 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 90E19160BC2 for ; Mon, 29 May 2017 10:29:25 +0200 (CEST) Received: (qmail 3752 invoked by uid 500); 29 May 2017 08:29:23 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 3743 invoked by uid 99); 29 May 2017 08:29:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 May 2017 08:29:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9CB8DDFB94; Mon, 29 May 2017 08:29:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aonishuk@apache.org To: commits@ambari.apache.org Message-Id: <24908c2b7f0f4a0b904073a5b7238033@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ambari git commit: AMBARI-21134. Get initial metadata, topology, configs from another endpoint (aonishuk) Date: Mon, 29 May 2017 08:29:23 +0000 (UTC) archived-at: Mon, 29 May 2017 08:29:27 -0000 Repository: ambari Updated Branches: refs/heads/branch-3.0-perf b094c753e -> 6bad191bb AMBARI-21134. Get initial metadata, topology, configs from another endpoint (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6bad191b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6bad191b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6bad191b Branch: refs/heads/branch-3.0-perf Commit: 6bad191bb3cf009cbe7d1d7a92942d47770ab31d Parents: b094c75 Author: Andrew Onishuk Authored: Mon May 29 11:28:44 2017 +0300 Committer: Andrew Onishuk Committed: Mon May 29 11:28:44 2017 +0300 ---------------------------------------------------------------------- .../main/python/ambari_agent/ClusterCache.py | 3 +- .../python/ambari_agent/CommandStatusDict.py | 3 +- .../ambari_agent/CommandStatusReporter.py | 7 +- .../ambari_agent/ComponentStatusExecutor.py | 7 +- .../src/main/python/ambari_agent/Constants.py | 10 +- .../main/python/ambari_agent/HeartbeatThread.py | 48 ++++---- .../python/ambari_agent/InitializerModule.py | 2 + .../src/main/python/ambari_agent/Utils.py | 3 + .../listeners/ServerResponsesListener.py | 23 +++- .../python/ambari_agent/listeners/__init__.py | 13 ++- .../src/main/python/ambari_agent/security.py | 8 +- .../ambari_agent/BaseStompServerTestCase.py | 35 +++++- .../ambari_agent/TestAgentStompResponses.py | 115 ++++++++----------- 13 files changed, 169 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py index 70b44b4..4b88f71 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py @@ -97,11 +97,10 @@ class ClusterCache(dict): with os.fdopen(os.open(self.__current_cache_json_file, os.O_WRONLY | os.O_CREAT, 0o600), "w") as f: json.dump(self, f, indent=2) - def get_md5_hashsum(self, cluster_id): + def get_md5_hashsum(self): """ Thread-safe method for writing out the specified cluster cache and updating the in-memory representation. - :param cluster_id: :param cache: :return: """ http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py index bb0cea3..afaf77d 100644 --- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py +++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py @@ -21,7 +21,6 @@ limitations under the License. import logging import threading import copy -import json from Grep import Grep from ambari_agent import Constants @@ -57,7 +56,7 @@ class CommandStatusDict(): self.force_update_to_server([new_report]) def force_update_to_server(self, reports): - self.initializer_module.connection.send(body=json.dumps(reports), destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT) + self.initializer_module.connection.send(message=reports, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT) def get_command_status(self, taskId): with self.lock: http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py index acee3b1..216f20b 100644 --- a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py +++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py @@ -18,7 +18,6 @@ See the License for the specific language governing permissions and limitations under the License. ''' -import json import logging import threading @@ -43,10 +42,10 @@ class CommandStatusReporter(threading.Thread): while not self.stop_event.is_set(): try: - # TODO STOMP: what if not registered? + # TODO STOMP: if not registered, reports should not be on agent until next registration report = self.commandStatuses.generate_report() - if report: - self.initializer_module.connection.send(body=json.dumps(report), destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT) + if report and self.initializer_module.is_registered: + self.initializer_module.connection.send(message=report, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT) self.stop_event.wait(self.command_reports_interval) except: logger.exception("Exception in CommandStatusReporter. Re-running it") http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py index 1f6a7dc..6783138 100644 --- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py @@ -18,7 +18,6 @@ See the License for the specific language governing permissions and limitations under the License. ''' -import json import random import logging import threading @@ -98,8 +97,10 @@ class ComponentStatusExecutor(threading.Thread): def send_updates_to_server(self, cluster_reports): # TODO STOMP: override send to send dicts and lists? and not use json.dump - # TODO STOMP: skip this if server is down? - self.initializer_module.connection.send(body=json.dumps(cluster_reports), destination=Constants.COMPONENT_STATUS_REPORTS_ENDPOINT) + if not cluster_reports or not self.initializer_module.is_registered: + return + + self.initializer_module.connection.send(message=cluster_reports, destination=Constants.COMPONENT_STATUS_REPORTS_ENDPOINT) for cluster_id, reports in cluster_reports.iteritems(): for report in reports: http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/Constants.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py index 6a054cc..e15d1d8 100644 --- a/ambari-agent/src/main/python/ambari_agent/Constants.py +++ b/ambari-agent/src/main/python/ambari_agent/Constants.py @@ -21,12 +21,16 @@ limitations under the License. COMMANDS_TOPIC = '/user/commands' CONFIGURATIONS_TOPIC = '/user/configs' -METADATA_TOPIC = '/user/metadata' -TOPOLOGIES_TOPIC = '/user/topologies' +METADATA_TOPIC = '/events/metadata' +TOPOLOGIES_TOPIC = '/events/topology' SERVER_RESPONSES_TOPIC = '/user/' -TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC, COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC] +PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC] +POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC] +TOPOLOGY_REQUEST_ENDPOINT = '/agents/topology' +METADATA_REQUEST_ENDPOINT = '/agents/metadata' +CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs' COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status' COMMANDS_STATUS_REPORTS_ENDPOINT = '/reports/commands_status' http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py index 70fe7e7..e88fee7 100644 --- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py +++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py @@ -18,11 +18,9 @@ See the License for the specific language governing permissions and limitations under the License. ''' -import json import logging import ambari_stomp import threading -from collections import defaultdict from ambari_agent import Constants from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener @@ -41,7 +39,6 @@ class HeartbeatThread(threading.Thread): """ def __init__(self, initializer_module): threading.Thread.__init__(self) - self.is_registered = False self.heartbeat_interval = HEARTBEAT_INTERVAL self.stop_event = initializer_module.stop_event @@ -55,6 +52,12 @@ class HeartbeatThread(threading.Thread): self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache) self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache) self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener] + self.post_registration_requests = [ + (Constants.TOPOLOGY_REQUEST_ENDPOINT, initializer_module.topology_cache, self.topology_events_listener), + (Constants.METADATA_REQUEST_ENDPOINT, initializer_module.metadata_cache, self.metadata_events_listener), + (Constants.CONFIGURATIONS_REQUEST_ENDPOINT, initializer_module.configurations_cache, self.configuration_events_listener) + ] + def run(self): """ @@ -63,7 +66,7 @@ class HeartbeatThread(threading.Thread): # TODO STOMP: stop the thread on SIGTERM while not self.stop_event.is_set(): try: - if not self.is_registered: + if not self.initializer_module.is_registered: self.register() heartbeat_body = self.get_heartbeat_body() @@ -75,16 +78,19 @@ class HeartbeatThread(threading.Thread): # TODO STOMP: handle heartbeat reponse except: logger.exception("Exception in HeartbeatThread. Re-running the registration") - # TODO STOMP: re-connect here - self.is_registered = False + self.initializer_module.is_registered = False + self.initializer_module.connection.disconnect() pass + + self.initializer_module.connection.disconnect() logger.info("HeartbeatThread has successfully finished") def register(self): """ Subscribe to topics, register with server, wait for server's response. """ - self.subscribe_and_listen() + self.add_listeners() + self.subscribe_to_topics(Constants.PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE) registration_request = self.get_registration_request() logger.info("Sending registration request") @@ -96,20 +102,19 @@ class HeartbeatThread(threading.Thread): logger.debug("Registration response is {0}".format(response)) self.registration_response = response - self.registered = True + + for endpoint, cache, listener in self.post_registration_requests: + response = self.blocking_request({'hash': cache.get_md5_hashsum()}, endpoint) + listener.on_event({}, response) + + self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE) + self.initializer_module.is_registered = True def get_registration_request(self): """ Get registration request body to send it to server """ - request = {'clusters':defaultdict(lambda:{})} - - for cache in self.caches: - cache_key_name = cache.get_cache_name() + '_hash' - for cluster_id in cache.get_cluster_ids(): - request['clusters'][cluster_id][cache_key_name] = cache.get_md5_hashsum(cluster_id) - - return request + return {'registration-response':'true'} def get_heartbeat_body(self): """ @@ -117,19 +122,20 @@ class HeartbeatThread(threading.Thread): """ return {'hostname':'true'} - def subscribe_and_listen(self): + def add_listeners(self): """ Subscribe to topics and set listener classes. """ for listener in self.listeners: self.initializer_module.connection.add_listener(listener) - for topic_name in Constants.TOPICS_TO_SUBSCRIBE: + def subscribe_to_topics(self, topics_list): + for topic_name in topics_list: self.initializer_module.connection.subscribe(destination=topic_name, id='sub', ack='client-individual') - def blocking_request(self, body, destination): + def blocking_request(self, message, destination): """ Send a request to server and waits for the response from it. The response it detected by the correspondence of correlation_id. """ - self.initializer_module.connection.send(body=json.dumps(body), destination=destination) - return self.server_responses_listener.responses.blocking_pop(str(self.initializer_module.connection.correlation_id)) \ No newline at end of file + correlation_id = self.initializer_module.connection.send(message=message, destination=destination) + return self.server_responses_listener.responses.blocking_pop(str(correlation_id)) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py index c36bd68..4d0ac9b 100644 --- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py +++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py @@ -63,6 +63,8 @@ class InitializerModule: """ self.stop_event = threading.Event() + self.is_registered = False + self.metadata_cache = ClusterMetadataCache(self.cluster_cache_dir) self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir) self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir) http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/Utils.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py index d6f0294..e48aa5f 100644 --- a/ambari-agent/src/main/python/ambari_agent/Utils.py +++ b/ambari-agent/src/main/python/ambari_agent/Utils.py @@ -43,6 +43,9 @@ class BlockingDictionary(): """ Block until a key in dictionary is available and than pop it. """ + if key in self.dict: + return self.dict.pop(key) + while True: self.put_event.wait() self.put_event.clear() http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py index 8502507..6d23c37 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py @@ -33,6 +33,7 @@ class ServerResponsesListener(EventListener): """ def __init__(self): self.responses = Utils.BlockingDictionary() + self.listener_functions = {} def on_event(self, headers, message): """ @@ -44,9 +45,25 @@ class ServerResponsesListener(EventListener): @param message: message payload dictionary """ if Constants.CORRELATION_ID_STRING in headers: - self.responses.put(headers[Constants.CORRELATION_ID_STRING], message) + correlation_id = headers[Constants.CORRELATION_ID_STRING] + self.responses.put(correlation_id, message) + + if correlation_id in self.listener_functions: + self.listener_functions[correlation_id](headers, message) + del self.listener_functions[correlation_id] else: - logger.warn("Received a message from server without a '{0}' header. Ignoring the message".format(Constants.CORRELATION_ID_STRING))\ + logger.warn("Received a message from server without a '{0}' header. Ignoring the message".format(Constants.CORRELATION_ID_STRING)) def get_handled_path(self): - return Constants.SERVER_RESPONSES_TOPIC \ No newline at end of file + return Constants.SERVER_RESPONSES_TOPIC + + def get_log_message(self, headers, message_json): + """ + This string will be used to log received messsage of this type + """ + if Constants.CORRELATION_ID_STRING in headers: + correlation_id = headers[Constants.CORRELATION_ID_STRING] + return " (correlation_id={0}): {1}".format(correlation_id, message_json) + return str(message_json) + + http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py index 45b38ed..f05f8da 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py @@ -46,12 +46,11 @@ class EventListener(ambari_stomp.ConnectionListener): logger.exception("Received event from server does not a valid json as a message. Message is:\n{0}".format(message)) return - logger.info("Received event from {0}".format(destination)) - logger.debug("Received event from {0}: headers={1} ; message={2}".format(destination, headers, message)) + logger.info("Event from server at {0}{1}".format(destination, self.get_log_message(headers, message_json))) try: self.on_event(headers, message_json) except: - logger.exception("Exception while handing event from {0}: headers={1} ; message={2}".format(destination, headers, message)) + logger.exception("Exception while handing event from {0} {1}".format(destination, headers, message)) def on_event(self, headers, message): """ @@ -60,4 +59,10 @@ class EventListener(ambari_stomp.ConnectionListener): @param headers: headers dictionary @param message: message payload dictionary """ - raise NotImplementedError() \ No newline at end of file + raise NotImplementedError() + + def get_log_message(self, headers, message_json): + """ + This string will be used to log received messsage of this type + """ + return ": " + str(message_json) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/main/python/ambari_agent/security.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/security.py b/ambari-agent/src/main/python/ambari_agent/security.py index b3cb16e..df45699 100644 --- a/ambari-agent/src/main/python/ambari_agent/security.py +++ b/ambari-agent/src/main/python/ambari_agent/security.py @@ -106,10 +106,16 @@ class AmbariStompConnection(WsConnection): self.correlation_id = -1 WsConnection.__init__(self, url) - def send(self, destination, body, content_type=None, headers=None, **keyword_headers): + def send(self, destination, message, content_type=None, headers=None, **keyword_headers): self.correlation_id += 1 + + logger.info("Event to server at {0} (correlation_id={1}): {2}".format(destination, self.correlation_id, message)) + + body = json.dumps(message) WsConnection.send(self, destination, body, content_type=content_type, headers=headers, correlationId=self.correlation_id, **keyword_headers) + return self.correlation_id + def add_listener(self, listener): self.set_listener(listener.__class__.__name__, listener) http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py index 671387a..7380727 100644 --- a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py +++ b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py @@ -18,6 +18,7 @@ See the License for the specific language governing permissions and limitations under the License. ''' +import json import ambari_stomp import os import sys @@ -42,6 +43,8 @@ from coilmq.store.memory import MemoryQueue from coilmq.scheduler import FavorReliableSubscriberScheduler, RandomQueueScheduler from coilmq.protocol import STOMP10 +logger = logging.getLogger(__name__) + class BaseStompServerTestCase(unittest.TestCase): """ Base class for test cases provides the fixtures for setting up the multi-threaded @@ -58,6 +61,7 @@ class BaseStompServerTestCase(unittest.TestCase): self.ready_event = threading.Event() addr_bound = threading.Event() + self.init_stdout_logger() def start_server(): self.server = TestStompServer(('127.0.0.1', 21613), @@ -123,6 +127,30 @@ class BaseStompServerTestCase(unittest.TestCase): with open(filepath) as f: return f.read() + def init_stdout_logger(self): + format='%(levelname)s %(asctime)s - %(message)s' + + logger = logging.getLogger() + logger.setLevel(logging.INFO) + formatter = logging.Formatter(format) + chout = logging.StreamHandler(sys.stdout) + chout.setLevel(logging.INFO) + chout.setFormatter(formatter) + cherr = logging.StreamHandler(sys.stderr) + cherr.setLevel(logging.ERROR) + cherr.setFormatter(formatter) + logger.handlers = [] + logger.addHandler(cherr) + logger.addHandler(chout) + + logging.getLogger('stomp.py').setLevel(logging.WARN) + logging.getLogger('coilmq').setLevel(logging.INFO) + + def remove_files(self, filepathes): + for filepath in filepathes: + if os.path.isfile(filepath): + os.remove(filepath) + class TestStompServer(ThreadedStompServer): """ @@ -235,9 +263,14 @@ class TestCaseTcpConnection(ambari_stomp.Connection): self.correlation_id = -1 ambari_stomp.Connection.__init__(self, host_and_ports=[('127.0.0.1', 21613)]) - def send(self, destination, body, content_type=None, headers=None, **keyword_headers): + def send(self, destination, message, content_type=None, headers=None, **keyword_headers): self.correlation_id += 1 + + logger.info("Event to server at {0} (correlation_id={1}): {2}".format(destination, self.correlation_id, message)) + + body = json.dumps(message) ambari_stomp.Connection.send(self, destination, body, content_type=content_type, headers=headers, correlationId=self.correlation_id, **keyword_headers) + return self.correlation_id def add_listener(self, listener): self.set_listener(listener.__class__.__name__, listener) http://git-wip-us.apache.org/repos/asf/ambari/blob/6bad191b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py index 9d59222..1f3a6e7 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py @@ -21,6 +21,7 @@ import os import sys import logging import json +import time from coilmq.util import frames from coilmq.util.frames import Frame @@ -38,9 +39,8 @@ class TestAgentStompResponses(BaseStompServerTestCase): @patch.object(CustomServiceOrchestrator, "runCommand") def test_mock_server_can_start(self, runCommand_mock): runCommand_mock.return_value = {'stdout':'...', 'stderr':'...', 'structuredOut' : '{}', 'exitcode':1} - self.init_stdout_logger() - self.remove(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json']) + self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json']) if not os.path.exists("/tmp/ambari-agent"): os.mkdir("/tmp/ambari-agent") @@ -52,48 +52,55 @@ class TestAgentStompResponses(BaseStompServerTestCase): action_queue = initializer_module.action_queue action_queue.start() - connect_frame = self.server.frames_queue.get() - users_subscribe_frame = self.server.frames_queue.get() - commands_subscribe_frame = self.server.frames_queue.get() - configurations_subscribe_frame = self.server.frames_queue.get() - metadata_subscribe_frame = self.server.frames_queue.get() - topologies_subscribe_frame = self.server.frames_queue.get() - registration_frame = self.server.frames_queue.get() - component_status_executor = ComponentStatusExecutor(initializer_module) component_status_executor.start() command_status_reporter = CommandStatusReporter(initializer_module) command_status_reporter.start() - status_reports_frame = self.server.frames_queue.get() + connect_frame = self.server.frames_queue.get() + users_subscribe_frame = self.server.frames_queue.get() + registration_frame = self.server.frames_queue.get() # server sends registration response f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '0'}, body=self.get_json("registration_response.json")) self.server.topic_manager.send(f) - f = Frame(frames.MESSAGE, headers={'destination': '/user/configs'}, body=self.get_json("configurations_update.json")) + + # response to /initial_topology + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '1'}, body=self.get_json("topology_update.json")) self.server.topic_manager.send(f) - f = Frame(frames.MESSAGE, headers={'destination': '/user/commands'}, body=self.get_json("execution_commands.json")) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body=self.get_json("metadata_after_registration.json")) self.server.topic_manager.send(f) - f = Frame(frames.MESSAGE, headers={'destination': '/user/metadata'}, body=self.get_json("metadata_after_registration.json")) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body=self.get_json("configurations_update.json")) self.server.topic_manager.send(f) - f = Frame(frames.MESSAGE, headers={'destination': '/user/topologies'}, body=self.get_json("topology_update.json")) + initial_topology_request = self.server.frames_queue.get() + initial_metadata_request = self.server.frames_queue.get() + initial_configs_request = self.server.frames_queue.get() + + while not initializer_module.is_registered: + time.sleep(0.1) + + f = Frame(frames.MESSAGE, headers={'destination': '/user/commands'}, body=self.get_json("execution_commands.json")) self.server.topic_manager.send(f) + commands_subscribe_frame = self.server.frames_queue.get() + configurations_subscribe_frame = self.server.frames_queue.get() + metadata_subscribe_frame = self.server.frames_queue.get() + topologies_subscribe_frame = self.server.frames_queue.get() heartbeat_frame = self.server.frames_queue.get() - dn_status_in_progress_frame = json.loads(self.server.frames_queue.get().body) - dn_status_failed_frame = json.loads(self.server.frames_queue.get().body) - zk_status_in_progress_frame = json.loads(self.server.frames_queue.get().body) - zk_status_failed_frame = json.loads(self.server.frames_queue.get().body) + dn_start_in_progress_frame = json.loads(self.server.frames_queue.get().body) + dn_start_failed_frame = json.loads(self.server.frames_queue.get().body) + zk_start_in_progress_frame = json.loads(self.server.frames_queue.get().body) + zk_start_failed_frame = json.loads(self.server.frames_queue.get().body) action_status_in_progress_frame = json.loads(self.server.frames_queue.get().body) action_status_failed_frame = json.loads(self.server.frames_queue.get().body) initializer_module.stop_event.set() - f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body=json.dumps({'heartbeat-response':'true'})) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'heartbeat-response':'true'})) self.server.topic_manager.send(f) heartbeat_thread.join() @@ -104,10 +111,10 @@ class TestAgentStompResponses(BaseStompServerTestCase): self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostname'], 'c6401.ambari.apache.org') self.assertEquals(initializer_module.metadata_cache['0']['status_commands_to_run'], ('STATUS',)) self.assertEquals(initializer_module.configurations_cache['0']['configurations']['zoo.cfg']['clientPort'], '2181') - self.assertEquals(dn_status_in_progress_frame[0]['roleCommand'], 'START') - self.assertEquals(dn_status_in_progress_frame[0]['role'], 'DATANODE') - self.assertEquals(dn_status_in_progress_frame[0]['status'], 'IN_PROGRESS') - self.assertEquals(dn_status_failed_frame[0]['status'], 'FAILED') + self.assertEquals(dn_start_in_progress_frame[0]['roleCommand'], 'START') + self.assertEquals(dn_start_in_progress_frame[0]['role'], 'DATANODE') + self.assertEquals(dn_start_in_progress_frame[0]['status'], 'IN_PROGRESS') + self.assertEquals(dn_start_failed_frame[0]['status'], 'FAILED') """ ============================================================================================ @@ -123,62 +130,42 @@ class TestAgentStompResponses(BaseStompServerTestCase): action_queue = initializer_module.action_queue action_queue.start() - connect_frame = self.server.frames_queue.get() - users_subscribe_frame = self.server.frames_queue.get() - commands_subscribe_frame = self.server.frames_queue.get() - configurations_subscribe_frame = self.server.frames_queue.get() - metadata_subscribe_frame = self.server.frames_queue.get() - topologies_subscribe_frame = self.server.frames_queue.get() - registration_frame_json = json.loads(self.server.frames_queue.get().body) - clusters_hashes = registration_frame_json['clusters']['0'] - component_status_executor = ComponentStatusExecutor(initializer_module) component_status_executor.start() command_status_reporter = CommandStatusReporter(initializer_module) command_status_reporter.start() - status_reports_frame = self.server.frames_queue.get() - - self.assertEquals(clusters_hashes['metadata_hash'], '21724f6ffa7aff0fe91a0c0c5b765dba') - self.assertEquals(clusters_hashes['configurations_hash'], '04c968412ded7c8ffe7858036bae03ce') - self.assertEquals(clusters_hashes['topology_hash'], '0de1df56fd594873fe594cf02ea61f4b') + connect_frame = self.server.frames_queue.get() + users_subscribe_frame = self.server.frames_queue.get() + registration_frame = self.server.frames_queue.get() # server sends registration response f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '0'}, body=self.get_json("registration_response.json")) self.server.topic_manager.send(f) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '1'}, body='{}') + self.server.topic_manager.send(f) + + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body='{}') + self.server.topic_manager.send(f) + + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{}') + self.server.topic_manager.send(f) + + commands_subscribe_frame = self.server.frames_queue.get() + configurations_subscribe_frame = self.server.frames_queue.get() + metadata_subscribe_frame = self.server.frames_queue.get() + topologies_subscribe_frame = self.server.frames_queue.get() heartbeat_frame = self.server.frames_queue.get() + status_reports_frame = self.server.frames_queue.get() + initializer_module.stop_event.set() - f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body=json.dumps({'heartbeat-response':'true'})) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'heartbeat-response':'true'})) self.server.topic_manager.send(f) heartbeat_thread.join() component_status_executor.join() command_status_reporter.join() - action_queue.join() - - def remove(self, filepathes): - for filepath in filepathes: - if os.path.isfile(filepath): - os.remove(filepath) - - def init_stdout_logger(self): - format='%(levelname)s %(asctime)s - %(message)s' - - logger = logging.getLogger() - logger.setLevel(logging.INFO) - formatter = logging.Formatter(format) - chout = logging.StreamHandler(sys.stdout) - chout.setLevel(logging.INFO) - chout.setFormatter(formatter) - cherr = logging.StreamHandler(sys.stderr) - cherr.setLevel(logging.ERROR) - cherr.setFormatter(formatter) - logger.handlers = [] - logger.addHandler(cherr) - logger.addHandler(chout) - - logging.getLogger('stomp.py').setLevel(logging.WARN) - logging.getLogger('coilmq').setLevel(logging.INFO) \ No newline at end of file + action_queue.join() \ No newline at end of file