ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aonis...@apache.org
Subject [22/22] ambari git commit: AMBARI-20721. Add stab registering and heartbeating routine via stomp async calls. (aonishuk)
Date Tue, 11 Apr 2017 11:14:44 GMT
AMBARI-20721. Add stab registering and heartbeating routine via stomp async calls. (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2f20b2ed
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2f20b2ed
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2f20b2ed

Branch: refs/heads/branch-3.0-perf
Commit: 2f20b2ed2e6663037b3301f52c2a92cd98d526fe
Parents: 73eff12
Author: Andrew Onishuk <aonishuk@hortonworks.com>
Authored: Tue Apr 11 14:14:14 2017 +0300
Committer: Andrew Onishuk <aonishuk@hortonworks.com>
Committed: Tue Apr 11 14:14:14 2017 +0300

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/Constants.py   |  3 +-
 .../main/python/ambari_agent/HeartbeatThread.py | 87 ++++++++++++++++++++
 .../src/main/python/ambari_agent/Utils.py       | 37 +++++++++
 .../listeners/ServerResponsesListener.py        | 39 +++++++++
 .../listeners/TopologyEventListener.py          | 25 ++++++
 .../python/ambari_agent/listeners/__init__.py   | 18 ++++
 .../src/main/python/ambari_agent/security.py    | 38 ++++++++-
 .../ambari_agent/TestAgentStompResponses.py     | 70 +++++++++++++++-
 .../dummy_files/stomp/heartbeat_request.json    |  3 +
 .../test/python/coilmq/server/socket_server.py  |  3 +
 10 files changed, 317 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/2f20b2ed/ambari-agent/src/main/python/ambari_agent/Constants.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index 6b80f00..0ff9eb9 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -18,4 +18,5 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
-AGENT_TMP_DIR = "/var/lib/ambari-agent/tmp"
\ No newline at end of file
+AGENT_TMP_DIR = "/var/lib/ambari-agent/tmp"
+CORRELATION_ID_STRING = 'correlationId'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f20b2ed/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
new file mode 100644
index 0000000..2c2f9d8
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import json
+import time
+import logging
+import ambari_stomp
+import threading
+import security
+
+from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener
+
+HEARTBEAT_ENDPOINT = '/agent/heartbeat'
+REGISTRATION_ENDPOINT = '/agent/registration'
+SERVER_RESPONSES_ENDPOINT = '/user'
+HEARTBEAT_INTERVAL = 10
+
+logger = logging.getLogger(__name__)
+
+class HeartbeatThread(threading.Thread):
+  """
+  This thread handles registration and heartbeating routine.
+  """
+  def __init__(self):
+    threading.Thread.__init__(self)
+    self.stomp_connector = security.StompConnector()
+    self.is_registered = False
+    self.heartbeat_interval = HEARTBEAT_INTERVAL
+    self._stop = threading.Event()
+
+  def run(self):
+    while not self._stop.is_set():
+      try:
+        if not self.is_registered:
+          self.register()
+
+        heartbeat_body = self.get_heartbeat_body()
+        logger.debug("Heartbeat body is {0}".format(heartbeat_body))
+        response = self.blocking_request(heartbeat_body, HEARTBEAT_ENDPOINT)
+        logger.debug("Heartbeat response is {0}".format(response))
+
+        time.sleep(self.heartbeat_interval)
+        # TODO STOMP: handle heartbeat reponse
+      except:
+        logger.exception("Exception in HeartbeatThread. Re-running the registration")
+        # TODO STOMP: re-connect here
+        self.is_registered = False
+        pass
+
+  def blocking_request(self, body, destination):
+    self.stomp_connector.send(body=json.dumps(body), destination=destination)
+    return self.server_responses_listener.responses.blocking_pop(str(self.stomp_connector.correlation_id))
+
+  def register(self):
+    # TODO STOMP: prepare data to register
+    data = {'registration-test':'true'}
+    self.server_responses_listener = ServerResponsesListener()
+    self.stomp_connector._connection = self.stomp_connector._create_new_connection(self.server_responses_listener)
+    self.stomp_connector.add_listener(self.server_responses_listener)
+    self.stomp_connector.subscribe(destination=SERVER_RESPONSES_ENDPOINT, id=1, ack='client-individual')
+
+    logger.debug("Registration request is {0}".format(data))
+    response = self.blocking_request(data, REGISTRATION_ENDPOINT)
+    logger.debug("Registration response is {0}".format(response))
+
+    # TODO STOMP: handle registration response
+    self.registered = True
+
+  def get_heartbeat_body(self):
+    return {'heartbeat-request-test':'true'}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f20b2ed/ambari-agent/src/main/python/ambari_agent/Utils.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py
index e7b03f9..8078ad2 100644
--- a/ambari-agent/src/main/python/ambari_agent/Utils.py
+++ b/ambari-agent/src/main/python/ambari_agent/Utils.py
@@ -17,6 +17,43 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+import threading
+
+class BlockingDictionary():
+  """
+  A dictionary like class.
+  Which allow putting an item. And retrieving it in blocking way (the caller is blocked until
item is available).
+  """
+  def __init__(self, dictionary=None):
+    self.dict = {} if dictionary is None else dictionary
+    self.cv = threading.Condition()
+    self.put_event = threading.Event()
+    self.dict_lock = threading.RLock()
+
+  def put(self, key, value):
+    """
+    Thread-safe put to dictionary.
+    """
+    with self.dict_lock:
+      self.dict[key] = value
+    self.put_event.set()
+
+  def blocking_pop(self, key):
+    """
+    Block until a key in dictionary is available and than pop it.
+    """
+    while True:
+      self.put_event.wait()
+      self.put_event.clear()
+      with self.dict_lock:
+        if key in self.dict:
+          return self.dict.pop(key)
+
+  def __repr__(self):
+    return self.dict.__repr__()
+
+  def __str__(self):
+    return self.dict.__str__()
 
 class Utils(object):
   @staticmethod

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f20b2ed/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
new file mode 100644
index 0000000..c7ad082
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import ambari_stomp
+
+from ambari_agent import Utils
+from ambari_agent.Constants import CORRELATION_ID_STRING
+
+logging = logging.getLogger(__name__)
+
+class ServerResponsesListener(ambari_stomp.ConnectionListener):
+  def __init__(self):
+    self.responses = Utils.BlockingDictionary()
+
+  def on_message(self, headers, message):
+    logging.debug("Received headers={0} ; message={1}".format(headers, message))
+
+    if CORRELATION_ID_STRING in headers:
+      self.responses.put(headers[CORRELATION_ID_STRING], message)
+    else:
+      logging.warn("Received a message from server without a '{0}' header. Ignoring the message".format(CORRELATION_ID_STRING))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f20b2ed/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
new file mode 100644
index 0000000..44afc48
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
@@ -0,0 +1,25 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import ambari_stomp
+
+class MyListener(ambari_stomp.ConnectionListener):
+  def on_message(self, headers, message):
+    print "Received {0}".format(message)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f20b2ed/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
new file mode 100644
index 0000000..8cfcc3f
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
@@ -0,0 +1,18 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f20b2ed/ambari-agent/src/main/python/ambari_agent/security.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/security.py b/ambari-agent/src/main/python/ambari_agent/security.py
index 45de7bb..32ad556 100644
--- a/ambari-agent/src/main/python/ambari_agent/security.py
+++ b/ambari-agent/src/main/python/ambari_agent/security.py
@@ -29,6 +29,7 @@ import pprint
 import traceback
 import hostname
 import platform
+import ambari_stomp
 
 logger = logging.getLogger(__name__)
 
@@ -99,6 +100,41 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection):
 
     return sock
 
+# TODO STOMP: When server part is ready re-write this class by extending WsConnection.
+class StompConnector:
+  def __init__(self):
+    self.correlation_id = -1
+    self._connection = None
+
+  # TODO STOMP: re-init this on_disconnect
+  def _get_connection(self):
+    if not self._connection:
+      self._connection = self._create_new_connection()
+    return self._connection
+
+  def _create_new_connection(self, listener):
+    # Connection for unit tests. TODO STOMP: fix this
+    hosts = [('127.0.0.1', 21613)]
+    connection = ambari_stomp.Connection(host_and_ports=hosts)
+    connection.set_listener('my_listener', listener)
+    connection.start()
+    connection.connect(wait=True)
+
+    return connection
+
+  def send(self, destination, body, content_type=None, headers=None, **keyword_headers):
+    self.correlation_id += 1
+    self._get_connection().send(destination, body, content_type=content_type, headers=headers,
correlationId=self.correlation_id, **keyword_headers)
+
+  def subscribe(self, *args, **kwargs):
+    self._get_connection().subscribe(*args, **kwargs)
+
+  def untrack_connection(self):
+    self.conn = None
+
+  def add_listener(self, listener):
+    pass
+    #self._get_connection().set_listener('my_listener', listener)
 
 class CachedHTTPSConnection:
   """ Caches a ssl socket and uses a single https connection to the server. """
@@ -254,7 +290,7 @@ class CertificateManager():
     generate_script = GEN_AGENT_KEY % {
       'hostname': hostname.hostname(self.config),
       'keysdir': keysdir}
-    
+
     logger.info(generate_script)
     if platform.system() == 'Windows':
       p = subprocess.Popen(generate_script, stdout=subprocess.PIPE)

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f20b2ed/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index 983b392..b0f3a16 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -17,15 +17,77 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 '''
+import os
+import sys
+import logging
 import json
+import time
 from coilmq.util import frames
 from coilmq.util.frames import Frame
 
 from BaseStompServerTestCase import BaseStompServerTestCase
 
+from ambari_agent import HeartbeatThread
+
+from mock.mock import MagicMock, patch
+
+# TODO: where agent sends?
+
 class TestAgentStompResponses(BaseStompServerTestCase):
-  def test_mock_server_can_start(self):
-    f = Frame(frames.MESSAGE, headers={'destination': '/clusters/c1/topologies'}, body=json.loads(self.get_json("topology_update.json")))
+  @patch.object(HeartbeatThread, "time")
+  def test_mock_server_can_start(self, time_mock):
+    self.init_stdout_logger()
+
+    heartbeat_thread = HeartbeatThread.HeartbeatThread()
+    heartbeat_thread.start()
+
+    connect_frame = self.server.frames_queue.get()
+    users_subscribe_frame = self.server.frames_queue.get()
+    registration_frame = self.server.frames_queue.get()
+
+    # server sends registration response
+    f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '0'}, body=self.get_json("registration_response.json"))
+    self.server.topic_manager.send(f)
+
+    heartbeat_frame = self.server.frames_queue.get()
+
+    heartbeat_thread._stop.set()
+
+    # server sends heartbeat response
+    f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '1'}, body=json.dumps({'heartbeat-response':'true'}))
+    self.server.topic_manager.send(f)
+
+    heartbeat_thread.join()
+    print "Thread successfully finished"
+
+  def _other(self):
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/configurations'}, body=self.get_json("configurations_update.json"))
+    self.server.topic_manager.send(f)
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/commands'}, body=self.get_json("execution_commands.json"))
+    self.server.topic_manager.send(f)
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/metadata'}, body=self.get_json("metadata_update.json"))
     self.server.topic_manager.send(f)
-    
-    
\ No newline at end of file
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_update.json"))
+    self.server.topic_manager.send(f)
+
+  def init_stdout_logger(self):
+    format='%(levelname)s %(asctime)s - %(message)s'
+
+    logger = logging.getLogger()
+    logger.setLevel(logging.DEBUG)
+    formatter = logging.Formatter(format)
+    chout = logging.StreamHandler(sys.stdout)
+    chout.setLevel(logging.DEBUG)
+    chout.setFormatter(formatter)
+    cherr = logging.StreamHandler(sys.stderr)
+    cherr.setLevel(logging.ERROR)
+    cherr.setFormatter(formatter)
+    logger.handlers = []
+    logger.addHandler(cherr)
+    logger.addHandler(chout)
+
+    logging.getLogger('stomp.py').setLevel(logging.WARN)
+    logging.getLogger('coilmq').setLevel(logging.INFO)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f20b2ed/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/heartbeat_request.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/heartbeat_request.json
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/heartbeat_request.json
new file mode 100644
index 0000000..155e796
--- /dev/null
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/heartbeat_request.json
@@ -0,0 +1,3 @@
+{
+  "alerts": []
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f20b2ed/ambari-common/src/test/python/coilmq/server/socket_server.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/server/socket_server.py b/ambari-common/src/test/python/coilmq/server/socket_server.py
index b4f081e..872ec01 100644
--- a/ambari-common/src/test/python/coilmq/server/socket_server.py
+++ b/ambari-common/src/test/python/coilmq/server/socket_server.py
@@ -4,6 +4,7 @@ The default/recommended SocketServer-based server implementation.
 import logging
 import socket
 import threading
+import Queue
 try:
     from socketserver import BaseRequestHandler, TCPServer, ThreadingMixIn
 except ImportError:
@@ -79,6 +80,7 @@ class StompRequestHandler(BaseRequestHandler, StompConnection):
                     self.buffer.append(data)
 
                     for frame in self.buffer:
+                        self.server.frames_queue.put(frame)
                         self.log.debug("Processing frame: %s" % frame)
                         self.engine.process_frame(frame)
                         if not self.engine.connected:
@@ -154,6 +156,7 @@ class StompServer(TCPServer):
         self.queue_manager = queue_manager
         self.topic_manager = topic_manager
         self.protocol = protocol
+        self.frames_queue = Queue.Queue()
         self._serving_event = threading.Event()
         self._shutdown_request_event = threading.Event()
         TCPServer.__init__(self, server_address, RequestHandlerClass)


Mime
View raw message