ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aonis...@apache.org
Subject [55/55] [abbrv] ambari git commit: AMBARI-20630. Integrate stomp client library into agent code (aonishuk)
Date Fri, 31 Mar 2017 07:23:02 GMT
AMBARI-20630. Integrate stomp client library into agent code (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/72b78424
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/72b78424
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/72b78424

Branch: refs/heads/branch-3.0-perf
Commit: 72b784249cc30da875393a411b9a78242256298c
Parents: fc6ab24
Author: Andrew Onishuk <aonishuk@hortonworks.com>
Authored: Fri Mar 31 10:21:49 2017 +0300
Committer: Andrew Onishuk <aonishuk@hortonworks.com>
Committed: Fri Mar 31 10:21:49 2017 +0300

----------------------------------------------------------------------
 ambari-agent/conf/unix/install-helper.sh        |  10 +
 ambari-agent/pom.xml                            |   1 +
 ambari-agent/src/packages/tarball/all.xml       |   5 +
 .../src/main/python/ambari_stomp/__init__.py    |  56 ++
 .../src/main/python/ambari_stomp/__main__.py    | 558 ++++++++++++
 .../python/ambari_stomp/adapter/__init__.py     |   2 +
 .../python/ambari_stomp/adapter/multicast.py    | 180 ++++
 .../src/main/python/ambari_stomp/backward.py    |  30 +
 .../src/main/python/ambari_stomp/backward2.py   |  61 ++
 .../src/main/python/ambari_stomp/backward3.py   |  68 ++
 .../main/python/ambari_stomp/backwardsock.py    |  11 +
 .../main/python/ambari_stomp/backwardsock25.py  |  32 +
 .../main/python/ambari_stomp/backwardsock26.py  |  16 +
 .../src/main/python/ambari_stomp/colors.py      |  18 +
 .../src/main/python/ambari_stomp/connect.py     | 203 +++++
 .../src/main/python/ambari_stomp/constants.py   |  29 +
 .../src/main/python/ambari_stomp/exception.py   |  35 +
 .../src/main/python/ambari_stomp/listener.py    | 522 ++++++++++++
 .../src/main/python/ambari_stomp/protocol.py    | 507 +++++++++++
 .../src/main/python/ambari_stomp/transport.py   | 843 +++++++++++++++++++
 .../src/main/python/ambari_stomp/utils.py       | 250 ++++++
 pom.xml                                         |   2 +
 22 files changed, 3439 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-agent/conf/unix/install-helper.sh
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/install-helper.sh b/ambari-agent/conf/unix/install-helper.sh
index 0e32d0a..35aec15 100644
--- a/ambari-agent/conf/unix/install-helper.sh
+++ b/ambari-agent/conf/unix/install-helper.sh
@@ -22,12 +22,14 @@ COMMON_DIR="/usr/lib/python2.6/site-packages/ambari_commons"
 RESOURCE_MANAGEMENT_DIR="/usr/lib/python2.6/site-packages/resource_management"
 JINJA_DIR="/usr/lib/python2.6/site-packages/ambari_jinja2"
 SIMPLEJSON_DIR="/usr/lib/python2.6/site-packages/ambari_simplejson"
+STOMP_DIR="/usr/lib/python2.6/site-packages/ambari_stomp"
 OLD_COMMON_DIR="/usr/lib/python2.6/site-packages/common_functions"
 INSTALL_HELPER_SERVER="/var/lib/ambari-server/install-helper.sh"
 COMMON_DIR_AGENT="/usr/lib/ambari-agent/lib/ambari_commons"
 RESOURCE_MANAGEMENT_DIR_AGENT="/usr/lib/ambari-agent/lib/resource_management"
 JINJA_AGENT_DIR="/usr/lib/ambari-agent/lib/ambari_jinja2"
 SIMPLEJSON_AGENT_DIR="/usr/lib/ambari-agent/lib/ambari_simplejson"
+STOMP_AGENT_DIR="/usr/lib/ambari-agent/lib/ambari_stomp"
 AMBARI_AGENT="/usr/lib/python2.6/site-packages/ambari_agent"
 PYTHON_WRAPER_TARGET="/usr/bin/ambari-python-wrap"
 AMBARI_AGENT_VAR="/var/lib/ambari-agent"
@@ -64,6 +66,10 @@ do_install(){
   if [ ! -d "$SIMPLEJSON_DIR" ]; then
     ln -s "$SIMPLEJSON_AGENT_DIR" "$SIMPLEJSON_DIR"
   fi
+  # setting stomp shared resource
+  if [ ! -d "$STOMP_DIR" ]; then
+    ln -s "$STOMP_AGENT_DIR" "$STOMP_DIR"
+  fi
   
   # on nano Ubuntu, when umask=027 those folders are created without 'x' bit for 'others'.
   # which causes failures when hadoop users try to access tmp_dir
@@ -149,6 +155,10 @@ do_remove(){
     rm -f $SIMPLEJSON_DIR
   fi
 
+  if [ -d "$STOMP_DIR" ]; then
+    rm -f $STOMP_DIR
+  fi
+
   if [ -d "$OLD_COMMON_DIR" ]; then
     rm -f $OLD_COMMON_DIR
   fi

http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-agent/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-agent/pom.xml b/ambari-agent/pom.xml
index a57ed64..4807a35 100644
--- a/ambari-agent/pom.xml
+++ b/ambari-agent/pom.xml
@@ -41,6 +41,7 @@
     <resource_management.install.dir>/usr/lib/ambari-agent/lib/resource_management</resource_management.install.dir>
     <jinja.install.dir>/usr/lib/ambari-agent/lib/ambari_jinja2</jinja.install.dir>
     <simplejson.install.dir>/usr/lib/ambari-agent/lib/ambari_simplejson</simplejson.install.dir>
+    <stomp.install.dir>/usr/lib/ambari-agent/lib/ambari_stomp</stomp.install.dir>
     <lib.dir>/usr/lib/ambari-agent/lib</lib.dir>
     <deb.architecture>amd64</deb.architecture>
     <ambari.server.module>../ambari-server</ambari.server.module>

http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-agent/src/packages/tarball/all.xml
----------------------------------------------------------------------
diff --git a/ambari-agent/src/packages/tarball/all.xml b/ambari-agent/src/packages/tarball/all.xml
index 363941a..a22f0bb 100644
--- a/ambari-agent/src/packages/tarball/all.xml
+++ b/ambari-agent/src/packages/tarball/all.xml
@@ -72,6 +72,11 @@
     </fileSet>
     <fileSet>
       <directoryMode>755</directoryMode>
+      <directory>${project.basedir}/../ambari-common/src/main/python/ambari_stomp</directory>
+      <outputDirectory>${stomp.install.dir}</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directoryMode>755</directoryMode>
       <directory>src/examples</directory>
       <outputDirectory>${lib.dir}/examples</outputDirectory>
     </fileSet>

http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-common/src/main/python/ambari_stomp/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/__init__.py b/ambari-common/src/main/python/ambari_stomp/__init__.py
new file mode 100644
index 0000000..3654cee
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_stomp/__init__.py
@@ -0,0 +1,56 @@
+"""stomp.py provides connectivity to a message broker supporting the STOMP protocol.
+Protocol versions 1.0, 1.1 and 1.2 are supported.
+
+See the GITHUB project page for more information.
+
+Author: Jason R Briggs
+License: http://www.apache.org/licenses/LICENSE-2.0
+Project Page: https://github.com/jasonrbriggs/stomp.py
+
+"""
+
+import ambari_stomp.connect as connect
+import ambari_stomp.listener as listener
+
+__version__ = (4, 1, 17)
+
+##
+# Alias for STOMP 1.0 connections.
+#
+Connection10 = connect.StompConnection10
+StompConnection10 = Connection10
+
+##
+# Alias for STOMP 1.1 connections.
+#
+Connection11 = connect.StompConnection11
+StompConnection11 = Connection11
+
+##
+# Alias for STOMP 1.2 connections.
+#
+Connection12 = connect.StompConnection12
+StompConnection12 = Connection12
+
+##
+# Default connection alias (STOMP 1.1).
+#
+Connection = connect.StompConnection11
+
+##
+# Access to the default connection listener.
+#
+ConnectionListener = listener.ConnectionListener
+
+##
+# Access to the stats listener.
+#
+StatsListener = listener.StatsListener
+
+##
+# Access to the 'waiting' listener.
+WaitingListener = listener.WaitingListener
+
+##
+# Access to the printing listener
+PrintingListener = listener.PrintingListener

http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-common/src/main/python/ambari_stomp/__main__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/__main__.py b/ambari-common/src/main/python/ambari_stomp/__main__.py
new file mode 100644
index 0000000..fb90de5
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_stomp/__main__.py
@@ -0,0 +1,558 @@
+"""The stomp.py command line client (used for testing or simple STOMP command scripting).
+
+"""
+
+import base64
+from cmd import Cmd
+from optparse import OptionParser
+import json
+import os
+import sys
+import time
+
+from ambari_stomp.adapter.multicast import MulticastConnection
+import ambari_stomp.colors
+from ambari_stomp.connect import ambari_stompConnection10, StompConnection11, StompConnection12
+from ambari_stomp.listener import ConnectionListener, StatsListener
+
+sys.path.append('.')
+import ambari_stomp as stomp
+
+
+##
+# Command-line version string
+#
+stomppy_version = 'Stomp.py Version %s.%s.%s' % stomp.__version__
+
+try:
+    import uuid
+except ImportError:
+    from backward import uuid
+
+
+class SubscriptionInfo(object):
+    """
+    Used to store info about a subscription.
+    """
+    def __init__(self, id, ack):
+        self.id = id
+        self.ack = ack
+
+
+class StompCLI(Cmd, ConnectionListener):
+    """
+    A command line interface to the stomp.py client.  See :py:class:`stomp.connect.StompConnection11`
+    for more information on establishing a connection to a stomp server.
+    """
+    def __init__(self, host='localhost', port=61613, user='', passcode='', ver='1.1', prompt='> ', verbose=True,
+                 use_ssl=False, heartbeats=(0, 0), stdin=sys.stdin, stdout=sys.stdout):
+        Cmd.__init__(self, 'Tab', stdin, stdout)
+        ConnectionListener.__init__(self)
+        self.prompt = prompt
+        self.verbose = verbose
+        self.user = user
+        self.passcode = passcode
+        self.__quit = False
+        if ver == '1.0':
+            self.conn = StompConnection10([(host, port)])
+        elif ver == '1.1':
+            self.conn = StompConnection11([(host, port)], heartbeats=heartbeats)
+        elif ver == '1.2':
+            self.conn = StompConnection12([(host, port)], heartbeats=heartbeats)
+        elif ver == 'multicast':
+            self.conn = MulticastConnection()
+        else:
+            raise RuntimeError('Unknown version')
+        if use_ssl:
+            self.conn.set_ssl([(host, port)])
+        self.conn.set_listener('', self)
+        self.conn.start()
+        self.conn.connect(self.user, self.passcode, wait=True)
+        self.transaction_id = None
+        self.version = ver
+        try:
+            self.nversion = float(ver)
+        except ValueError:
+            self.nversion = 1.0
+        self.__subscriptions = {}
+        self.__subscription_id = 1
+
+    def __print_async(self, frame_type, headers, body):
+        """
+        Utility function to print a message and setup the command prompt
+        for the next input
+        """
+        if self.__quit:
+            return
+        self.__sysout("\r  \r", end='')
+        if self.verbose:
+            self.__sysout(frame_type)
+            for k, v in headers.items():
+                self.__sysout('%s: %s' % (k, v))
+        if self.prompt != '':
+            self.__sysout('')
+        self.__sysout(body)
+        self.__sysout(self.prompt, end='')
+        self.stdout.flush()
+
+    def __sysout(self, msg, end="\n"):
+        self.stdout.write(str(msg) + end)
+
+    def __error(self, msg, end="\n"):
+        self.stdout.write(stomp.colors.BOLD + stomp.colors.RED + str(msg) + stomp.colors.NO_COLOR + end)
+
+    def on_connecting(self, host_and_port):
+        """
+        See :py:meth:`ConnectionListener.on_connecting`
+        """
+
+    def on_disconnected(self):
+        """
+        see :py:meth:`ConnectionListener.on_disconnected`
+        """
+        if not self.__quit:
+            self.__error("lost connection")
+
+    def on_message(self, headers, body):
+        """
+        See :py:meth:`ConnectionListener.on_message`
+
+        Special case: if the header 'filename' is present, the content is written out
+        as a file
+        """
+        if 'filename' in headers:
+            content = base64.b64decode(body.encode())
+            if os.path.exists(headers['filename']):
+                fname = '%s.%s' % (headers['filename'], int(time.time()))
+            else:
+                fname = headers['filename']
+            with open(fname, 'wb') as f:
+                f.write(content)
+            self.__print_async("MESSAGE", headers, "Saved file: %s" % fname)
+        else:
+            self.__print_async("MESSAGE", headers, body)
+
+    def on_error(self, headers, body):
+        """
+        See :py:meth:`ConnectionListener.on_error`
+        """
+        self.__print_async("ERROR", headers, body)
+
+    def on_receipt(self, headers, body):
+        """
+        See :py:meth:`ConnectionListener.on_receipt`
+        """
+        self.__print_async("RECEIPT", headers, body)
+
+    def on_connected(self, headers, body):
+        """
+        See :py:meth:`ConnectionListener.on_connected`
+        """
+        self.__print_async("CONNECTED", headers, body)
+
+    def help_help(self):
+        self.__sysout('Quick help on commands')
+
+    def default(self, line):
+        self.__error('Unknown command: %s' % line.split()[0])
+
+    def emptyline(self):
+        pass
+
+    def help(self, usage, description, required=(), optional=()):
+        rparams = "\n\t" + "\n\t".join(required)
+        oparams = "\n\t" + "\n\t".join(optional)
+
+        m = {
+            'hl': stomp.colors.BOLD + stomp.colors.GREEN,
+            'nc': stomp.colors.NO_COLOR,
+            'usage': usage,
+            'description': description,
+            'required': rparams.rstrip(),
+            'optional': oparams.rstrip()
+        }
+
+        if rparams.rstrip() != '':
+            rparams = '''%(hl)sRequired Parameters:%(nc)s%(required)s\n\n''' % m
+            m['required'] = rparams
+
+        if oparams.rstrip() != '':
+            oparams = '''%(hl)sOptional Parameters:%(nc)s%(optional)s\n\n''' % m
+            m['optional'] = oparams
+
+        self.__sysout('''%(hl)sUsage:%(nc)s
+\t%(usage)s
+
+%(required)s%(optional)s%(hl)sDescription:%(nc)s
+\t%(description)s
+        ''' % m)
+
+    def do_quit(self, args):
+        self.__quit = True
+        self.__sysout('Shutting down, please wait')
+        return True
+    do_exit = do_quit
+    do_EOF = do_quit
+
+    def help_quit(self):
+        self.help('exit', 'Exit the stomp client')
+    help_exit = help_quit
+
+    def help_EOF(self):
+        self.help('exit', 'Exit the stomp client (using CTRL-D)')
+
+    def do_subscribe(self, args):
+        args = args.split()
+        if len(args) < 1:
+            self.__error('Expecting: subscribe <destination> [ack]')
+            return
+
+        name = args[0]
+        if name in self.__subscriptions:
+            self.__error('Already subscribed to %s' % name)
+            return
+
+        ack_mode = 'auto'
+        if len(args) >= 2:
+            ack_mode = args[1]
+
+        sid = self.__subscription_id
+        self.__subscription_id += 1
+
+        self.__sysout('Subscribing to "%s" with acknowledge set to "%s", id set to "%s"' % (name, ack_mode, sid))
+        self.conn.subscribe(destination=name, ack=ack_mode, id=sid)
+        self.__subscriptions[name] = SubscriptionInfo(sid, ack_mode)
+
+    def help_subscribe(self):
+        self.help('subscribe <destination> [ack]',
+                  '''Register to listen to a given destination. Like send, the subscribe command requires a destination
+\theader indicating which destination to subscribe to. The ack parameter is optional, and defaults to
+\tauto.''', ['destination - the name to subscribe to'], ['ack - how to handle acknowledgements for a message; either automatically (auto) or manually (client)'])
+
+    def do_unsubscribe(self, args):
+        args = args.split()
+        if len(args) < 1:
+            self.__error('Expecting: unsubscribe <destination>')
+            return
+
+        if args[0] not in self.__subscriptions:
+            self.__sysout('Subscription %s not found' % args[0])
+            return
+
+        self.__sysout('Unsubscribing from "%s"' % args[0])
+        self.conn.unsubscribe(destination=args[0], id=self.__subscriptions[args[0]].id)
+        del self.__subscriptions[args[0]]
+
+    def help_unsubscribe(self):
+        self.help('unsubscribe <destination>', 'Remove an existing subscription - so that the client no longer receive messages from that destination.',
+                  ['destination - the name to unsubscribe from'], ['ack - how to handle acknowledgements for a message; either automatically (auto) or manually (client)'])
+
+    def do_send(self, args):
+        args = args.split()
+        if len(args) < 2:
+            self.__error('Expecting: send <destination> <message>')
+        elif not self.transaction_id:
+            self.conn.send(args[0], ' '.join(args[1:]))
+        else:
+            self.conn.send(args[0], ' '.join(args[1:]), transaction=self.transaction_id)
+
+    def complete_send(self, text, line, begidx, endidx):
+        mline = line.split(' ')[1]
+        offs = len(mline) - len(text)
+        return [s[offs:] for s in self.__subscriptions if s.startswith(mline)]
+    complete_unsubscribe = complete_send
+    complete_sendrec = complete_send
+    complete_sendreply = complete_send
+    complete_sendfile = complete_send
+
+    def help_send(self):
+        self.help('send <destination> <message>', 'Sends a message to a destination in the messaging system.',
+                  ['destination - where to send the message', 'message - the content to send'])
+
+    def do_sendrec(self, args):
+        args = args.split()
+        receipt_id = str(uuid.uuid4())
+        if len(args) < 2:
+            self.__error('Expecting: sendrec <destination> <message>')
+        elif not self.transaction_id:
+            self.conn.send(args[0], ' '.join(args[1:]), receipt=receipt_id)
+        else:
+            self.conn.send(args[0], ' '.join(args[1:]), transaction=self.transaction_id, receipt=receipt_id)
+
+    def help_sendrec(self):
+        self.help('sendrec <destination> <message>',
+                  'Sends a message to a destination in the messaging system and blocks for receipt of the message.',
+                  ['destination - where to send the message', 'message - the content to send'])
+
+    def do_sendreply(self, args):
+        args = args.split()
+        if len(args) < 3:
+            self.__error('Expecting: sendreply <destination> <correlation-id> <message>')
+        else:
+            self.conn.send(args[0], "%s\n" % ' '.join(args[2:]), headers={'correlation-id': args[1]})
+
+    def help_sendreply(self):
+        self.help('sendreply <destination> <correlation-id> <message>',
+                  'Sends a reply message to a destination in the messaging system.',
+                  ['destination - where to send the message',
+                   'correlation-id - the correlating identifier to send with the response',
+                   'message - the content to send'])
+
+    def do_sendfile(self, args):
+        args = args.split()
+        if len(args) < 2:
+            self.__error('Expecting: sendfile <destination> <filename> [headers.json]')
+        elif not os.path.exists(args[1]):
+            self.__error('File %s does not exist' % args[1])
+        else:
+            headers = {}
+            if len(args) == 3:
+                if not os.path.exists(args[2]):
+                    self.__error('File %s does not exist' % args[2])
+                    return
+                self.__sysout("Loading %s" % args[2])
+                with open(args[2], mode='rb') as jf:
+                    headers = json.load(jf)
+                    self.__sysout('Using headers %s' % str(headers))
+
+            with open(args[1], mode='rb') as f:
+                s = f.read()
+            msg = base64.b64encode(s).decode()
+            if not self.transaction_id:
+                self.conn.send(args[0], msg, filename=args[1], headers=headers)
+            else:
+                self.conn.send(args[0], msg, filename=args[1], headers=headers, transaction=self.transaction_id)
+
+    def help_sendfile(self):
+        self.help('sendfile <destination> <filename> [headers.json]',
+                  'Sends a file to a destination in the messaging system.',
+                  ['destination - where to send the message', 'filename - the file to send',
+                   'headers.json - json map with headers to send'])
+
+    def do_version(self, args):
+        self.__sysout('%s%s [Protocol version %s]%s' %
+                      (stomp.colors.BOLD, stomppy_version, self.conn.version, stomp.colors.NO_COLOR))
+    do_ver = do_version
+
+    def help_version(self):
+        self.help('version', 'Display the version of the client')
+    help_ver = help_version
+
+    def check_ack_nack(self, cmd, args):
+        if self.nversion >= 1.2 and len(args) < 1:
+            self.__error("Expecting: %s <ack-id>" % cmd)
+            return None
+        elif self.nversion == 1.1 and len(args) < 2:
+            self.__error("Expecting: %s <message-id> <subscription-id>" % cmd)
+            return None
+        elif len(args) < 1:
+            self.__error("Expecting: %s <message-id>" % cmd)
+            return None
+
+        if len(args) == 1:
+            return (args[0], None)
+        else:
+            return (args[0], args[1])
+
+    def do_ack(self, args):
+        args = args.split()
+        hdrs = self.check_ack_nack('ack', args)
+        if hdrs is None:
+            return
+
+        (message_id, subscription_id) = hdrs
+
+        if not self.transaction_id:
+            self.conn.ack(message_id, subscription_id)
+        else:
+            self.conn.ack(message_id, subscription_id, transaction=self.transaction_id)
+
+    def help_ack(self):
+        self.help('ack <message-id> [subscription-id]', '''The command 'ack' is used to acknowledge consumption of a message from a subscription using client
+\tacknowledgment. When a client has issued a 'subscribe' with the ack flag set to client, any messages
+\treceived from that destination will not be considered to have been consumed (by the server) until
+\tthe message has been acknowledged.''', ['message-id - the id of the message being acknowledged'], ['subscription-id the id of the subscription (only required for STOMP 1.1)'])
+
+    def do_nack(self, args):
+        args = args.split()
+        hdrs = self.check_ack_nack('nack', args)
+        if hdrs is None:
+            return
+
+        if not self.transaction_id:
+            self.conn.nack(headers=hdrs)
+        else:
+            self.conn.nack(headers=hdrs, transaction=self.transaction_id)
+
+    def help_nack(self):
+        self.help('nack <message-id> [subscription]', '''The command 'nack' is used to acknowledge the failure of a message from a subscription using client
+\tacknowledgment. When a client has issued a 'subscribe' with the ack flag set to client, any messages
+\treceived from that destination will not be considered to have been consumed (by the server) until
+\tthe message has been acknowledged (ack or nack).''', ['message-id - the id of the message being acknowledged'])
+
+    def do_abort(self, args):
+        if not self.transaction_id:
+            self.__error("Not currently in a transaction")
+        else:
+            self.conn.abort(transaction=self.transaction_id)
+            self.__sysout('Aborted transaction: %s' % self.transaction_id)
+            self.transaction_id = None
+    do_rollback = do_abort
+
+    def help_abort(self):
+        self.help('abort', 'Roll back a transaction in progress.')
+    help_rollback = help_abort
+
+    def do_begin(self, args):
+        if self.transaction_id:
+            self.__error("Currently in a transaction (%s)" % self.transaction_id)
+        else:
+            self.transaction_id = self.conn.begin()
+            self.__sysout('Transaction id: %s' % self.transaction_id)
+
+    def help_begin(self):
+        self.help('begin', '''Start a transaction. Transactions in this case apply to sending and acknowledging -
+\tany messages sent or acknowledged during a transaction will be handled atomically based on the
+\ttransaction.''')
+
+    def do_commit(self, args):
+        if not self.transaction_id:
+            self.__error("Not currently in a transaction")
+        else:
+            self.__sysout('Committing %s' % self.transaction_id)
+            self.conn.commit(transaction=self.transaction_id)
+            self.transaction_id = None
+
+    def help_commit(self):
+        self.help('commit', 'Commit a transaction in progress.')
+
+    def do_stats(self, args):
+        args = args.split()
+        if len(args) < 1:
+            stats = self.conn.get_listener('stats')
+            if stats:
+                self.__sysout(stats)
+            else:
+                self.__error('No stats available')
+        elif args[0] == 'on':
+            self.conn.set_listener('stats', StatsListener())
+        elif args[0] == 'off':
+            self.conn.remove_listener('stats')
+        else:
+            self.__error('Expecting: stats [on|off]')
+
+    def help_stats(self):
+        self.help('stats [on|off]', '''Record statistics on messages sent, received, errors, etc. If no argument (on|off) is specified,
+\tdump the current statistics.''')
+
+    def do_run(self, args):
+        args = args.split()
+        if len(args) == 0:
+            self.__error("Expecting: run <filename>")
+        elif not os.path.exists(args[0]):
+            self.__error("File %s was not found" % args[0])
+        else:
+            with open(args[0]) as f:
+                lines = f.read().split('\n')
+            for line in lines:
+                self.onecmd(line)
+
+    def help_run(self):
+        self.help('run <filename>', 'Execute commands in a specified file')
+
+
+def do_nothing_loop():
+    while 1:
+        time.sleep(1)
+
+
+def optional_arg(arg_default):
+    def func(option, opt_str, value, parser):
+        if parser.rargs and not parser.rargs[0].startswith('-'):
+            val = parser.rargs[0]
+            parser.rargs.pop(0)
+        else:
+            val = arg_default
+        setattr(parser.values, option.dest, val)
+    return func
+
+
+def main():
+    parser = OptionParser(version=stomppy_version)
+
+    parser.add_option('-H', '--host', type='string', dest='host', default='localhost',
+                      help='Hostname or IP to connect to. Defaults to localhost if not specified.')
+    parser.add_option('-P', '--port', type=int, dest='port', default=61613,
+                      help='Port providing stomp protocol connections. Defaults to 61613 if not specified.')
+    parser.add_option('-U', '--user', type='string', dest='user', default=None,
+                      help='Username for the connection')
+    parser.add_option('-W', '--password', type='string', dest='password', default=None,
+                      help='Password for the connection')
+    parser.add_option('-F', '--file', type='string', dest='filename',
+                      help='File containing commands to be executed, instead of prompting from the command prompt.')
+    parser.add_option('-S', '--stomp', type='string', dest='stomp', default='1.1',
+                      help='Set the STOMP protocol version.')
+    parser.add_option('-L', '--listen', type='string', dest='listen', default=None,
+                      help='Listen for messages on a queue/destination')
+    parser.add_option("-V", "--verbose", dest="verbose", default='on',
+                      help='Verbose logging "on" or "off" (if on, full headers from stomp server responses are printed)')
+    parser.add_option('--ssl', action='callback', callback=optional_arg(True), dest='ssl',
+                      help='Enable SSL connection')
+    parser.add_option('--heartbeats', type='string', dest='heartbeats', default="0,0",
+                      help='Heartbeats to request when connecting with protocol >= 1.1, two comma separated integers.')
+
+    parser.set_defaults()
+    (options, _) = parser.parse_args()
+
+    if options.verbose == 'on':
+        verbose = True
+    else:
+        verbose = False
+
+    if options.ssl is None:
+        options.ssl = False
+
+    if options.listen:
+        prompt = ''
+    else:
+        prompt = '> '
+
+    heartbeats = tuple(map(int, options.heartbeats.split(",")))
+
+    st = StompCLI(options.host, options.port, options.user, options.password, options.stomp, prompt, verbose,
+                  options.ssl, heartbeats)
+
+    if options.listen:
+        st.do_subscribe(options.listen)
+        try:
+            while 1:
+                time.sleep(10)
+        except:
+            print("\n")
+    elif options.filename:
+        st.do_run(options.filename)
+    else:
+        # disable CTRL-C, since can't guarantee correct handling of disconnect
+        import signal
+
+        def signal_handler(signal, frame):
+            pass
+        signal.signal(signal.SIGINT, signal_handler)
+
+        try:
+            try:
+                st.cmdloop()
+            except KeyboardInterrupt:
+                st.do_quit()
+        finally:
+            st.conn.disconnect()
+
+
+#
+# command line access
+#
+if __name__ == '__main__':
+    try:
+        main()
+    except:
+        pass

http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-common/src/main/python/ambari_stomp/adapter/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/adapter/__init__.py b/ambari-common/src/main/python/ambari_stomp/adapter/__init__.py
new file mode 100644
index 0000000..7ae47b0
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_stomp/adapter/__init__.py
@@ -0,0 +1,2 @@
+"""Non-standard adapters.
+"""

http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-common/src/main/python/ambari_stomp/adapter/multicast.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/adapter/multicast.py b/ambari-common/src/main/python/ambari_stomp/adapter/multicast.py
new file mode 100644
index 0000000..1d3a517
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_stomp/adapter/multicast.py
@@ -0,0 +1,180 @@
+"""Multicast transport for stomp.py.
+
+Obviously not a typical message broker, but convenient if you don't have a broker, but still want to use stomp.py
+methods.
+"""
+
+import socket
+import struct
+
+from ambari_stomp.connect import BaseConnection
+from ambari_stomp.protocol import *
+from ambari_stomp.transport import *
+from ambari_stomp.utils import *
+
+MCAST_GRP = '224.1.1.1'
+MCAST_PORT = 5000
+
+
+class MulticastTransport(Transport):
+    """
+    Transport over multicast connections rather than using a broker.
+    """
+    def __init__(self):
+        Transport.__init__(self, [], False, False, 0.0, 0.0, 0.0, 0.0, 0, False, None, None, None, None, False,
+                           DEFAULT_SSL_VERSION, None, None, None)
+        self.subscriptions = {}
+        self.current_host_and_port = (MCAST_GRP, MCAST_PORT)
+
+    def attempt_connection(self):
+        """
+        Open the two UDP sockets used for multicast: a sender, and a receiver bound to MCAST_PORT that joins MCAST_GRP.
+        """
+        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+        self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
+
+        self.receiver_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+        self.receiver_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        if hasattr(socket, 'SO_REUSEPORT'):  # the constant is not defined on every platform / Python build
+            self.receiver_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+        self.receiver_socket.bind(('', MCAST_PORT))
+        mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
+        self.receiver_socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
+        if not self.socket or not self.receiver_socket:
+            raise exception.ConnectFailedException()
+
+    def send(self, encoded_frame):
+        """
+        Send an encoded frame through the mcast socket.
+
+        :param bytes encoded_frame:
+        """
+        self.socket.sendto(encoded_frame, (MCAST_GRP, MCAST_PORT))
+
+    def receive(self):
+        """
+        Receive a single datagram (at most 1024 bytes of it) from the multicast receiver socket.
+
+        :rtype: bytes
+        """
+        return self.receiver_socket.recv(1024)
+
+    def process_frame(self, f, frame_str):
+        """
+        :param Frame f: Frame object
+        :param bytes frame_str: Raw frame content
+        """
+        frame_type = f.cmd.lower()
+
+        if frame_type in ['disconnect']:
+            return
+
+        if frame_type == 'send':
+            frame_type = 'message'
+            f.cmd = 'MESSAGE'
+
+        if frame_type in ['connected', 'message', 'receipt', 'error', 'heartbeat']:
+            if frame_type == 'message':
+                if f.headers['destination'] not in self.subscriptions.values():
+                    return
+                (f.headers, f.body) = self.notify('before_message', f.headers, f.body)
+            self.notify(frame_type, f.headers, f.body)
+        if 'receipt' in f.headers:
+            receipt_frame = Frame('RECEIPT', {'receipt-id': f.headers['receipt']})
+            lines = convert_frame_to_lines(receipt_frame)
+            self.send(encode(pack(lines)))
+        log.debug("Received frame: %r, headers=%r, body=%r", f.cmd, f.headers, f.body)
+
+    def stop(self):
+        self.running = False
+        if hasattr(self.receiver_socket, 'SHUT_RDWR'):  # NOTE(review): module-level constant; confirm this is ever true
+            self.receiver_socket.shutdown(socket.SHUT_RDWR)
+        self.receiver_socket.close()
+        self.disconnect_socket()
+        Transport.stop(self)
+
+
+class MulticastConnection(BaseConnection, Protocol12):
+    def __init__(self, wait_on_receipt=False):
+        """
+        :param bool wait_on_receipt: deprecated, ignored
+        """
+        self.transport = MulticastTransport()
+        self.transport.set_listener('mcast-listener', self)
+        self.transactions = {}
+        Protocol12.__init__(self, self.transport, (0, 0))
+
+    def connect(self, username=None, passcode=None, wait=False, headers=None, **keyword_headers):
+        """
+        :param str username:
+        :param str passcode:
+        :param bool wait:
+        :param dict headers:
+        :param keyword_headers:
+        """
+        pass
+
+    def subscribe(self, destination, id, ack='auto', headers=None, **keyword_headers):
+        """
+        :param str destination:
+        :param str id:
+        :param str ack:
+        :param dict headers:
+        :param keyword_headers:
+        """
+        self.transport.subscriptions[id] = destination
+
+    def unsubscribe(self, id, headers=None, **keyword_headers):
+        """
+        :param str id:
+        :param dict headers:
+        :param keyword_headers:
+        """
+        del self.transport.subscriptions[id]
+
+    def disconnect(self, receipt=None, headers=None, **keyword_headers):
+        """
+        :param str receipt:
+        :param dict headers:
+        :param keyword_headers:
+        """
+        Protocol12.disconnect(self, receipt, headers, **keyword_headers)
+        self.transport.stop()
+
+    def send_frame(self, cmd, headers=None, body=''):
+        """
+        Transmit a frame over multicast; frames sent inside a transaction are held back until COMMIT.
+
+        :param str cmd: the STOMP command of the frame
+        :param dict headers: the frame headers (an empty map is used when None)
+        :param body: the frame body
+        """
+        headers = {} if headers is None else headers
+        frame = utils.Frame(cmd, headers, body)
+
+        if cmd == CMD_BEGIN:
+            trans = headers[HDR_TRANSACTION]
+            if trans in self.transactions:
+                self.transport.notify('error', {}, 'Transaction %s already started' % trans)
+            else:
+                self.transactions[trans] = []
+        elif cmd == CMD_COMMIT:
+            trans = headers[HDR_TRANSACTION]
+            if trans not in self.transactions:
+                self.transport.notify('error', {}, 'Transaction %s not started' % trans)
+            else:
+                for f in self.transactions[trans]:
+                    self.transport.transmit(f)
+                del self.transactions[trans]
+        elif cmd == CMD_ABORT:
+            trans = headers[HDR_TRANSACTION]
+            if self.transactions.pop(trans, None) is None:  # unknown transaction: report it instead of a KeyError
+                self.transport.notify('error', {}, 'Transaction %s not started' % trans)
+        elif HDR_TRANSACTION in headers:
+            trans = headers[HDR_TRANSACTION]
+            if trans not in self.transactions:
+                self.transport.notify('error', {}, 'Transaction %s not started' % trans)
+            else:
+                self.transactions[trans].append(frame)  # buffered until the transaction commits
+        else:
+            self.transport.transmit(frame)

http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-common/src/main/python/ambari_stomp/backward.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/backward.py b/ambari-common/src/main/python/ambari_stomp/backward.py
new file mode 100644
index 0000000..0d8a0eb
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_stomp/backward.py
@@ -0,0 +1,30 @@
+import sys
+
+"""Functions to support backwards compatibility.
+
+Basically where we have functions which differ between python 2 and 3, we provide implementations here
+and then Python-specific versions in backward2 and backward3.
+"""
+
+if sys.hexversion >= 0x03000000:  # Python 3+
+    from ambari_stomp.backward3 import *
+else:  # Python 2
+    from ambari_stomp.backward2 import *
+
+
+def get_errno(e):
+    """
+    Return the errno of an exception, or the first argument if errno is not available.
+
+    :param Exception e: the exception object
+    """
+    try:
+        return e.errno
+    except AttributeError:
+        return e.args[0]
+
+
+try:
+    from time import monotonic
+except ImportError:  # Python < 3.3/3.5
+    from time import time as monotonic

http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-common/src/main/python/ambari_stomp/backward2.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/backward2.py b/ambari-common/src/main/python/ambari_stomp/backward2.py
new file mode 100644
index 0000000..aec5f2e
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_stomp/backward2.py
@@ -0,0 +1,61 @@
+"""
+Python2-specific versions of various functions used by stomp.py
+"""
+
+NULL = '\x00'
+
+
+def input_prompt(prompt):
+    """
+    Get user input
+
+    :rtype: str
+    """
+    return raw_input(prompt)
+
+
+def decode(byte_data):
+    """
+    Decode the byte data to a string - in the case of this Py2 version, we can't really do anything (Py3 differs).
+
+    :param bytes byte_data:
+
+    :rtype: str
+    """
+    return byte_data  # no way to know if it's unicode or not, so just pass through unmolested
+
+
+def encode(char_data):
+    """
+    Encode the parameter as a byte string.
+
+    :param char_data:
+
+    :rtype: bytes
+    """
+    if type(char_data) is unicode:
+        return char_data.encode('utf-8')
+    else:
+        return char_data
+
+
+def pack(pieces=()):
+    """
+    Join a sequence of strings together (note: py3 version differs)
+
+    :param list pieces:
+
+    :rtype: bytes
+    """
+    return ''.join(encode(p) for p in pieces)
+
+
+def join(chars=()):
+    """
+    Join a sequence of characters into a string.
+
+    :param bytes chars:
+
+    :rtype: str
+    """
+    return ''.join(chars)

http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-common/src/main/python/ambari_stomp/backward3.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/backward3.py b/ambari-common/src/main/python/ambari_stomp/backward3.py
new file mode 100644
index 0000000..50d59bc
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_stomp/backward3.py
@@ -0,0 +1,68 @@
+"""
+Python3-specific versions of various functions used by stomp.py
+"""
+
+NULL = b'\x00'
+
+
+def input_prompt(prompt):
+    """
+    Get user input
+
+    :param str prompt: the prompt to display to the user
+
+    :rtype: str
+    """
+    return input(prompt)
+
+
+def decode(byte_data):
+    """
+    Decode the byte data to a string if not None.
+
+    :param bytes byte_data: the data to decode
+
+    :rtype: str
+    """
+    if byte_data is None:
+        return None
+    return byte_data.decode()
+
+
+def encode(char_data):
+    """
+    Encode the parameter as a byte string.
+
+    :param char_data: the data to encode
+
+    :rtype: bytes
+    """
+    if type(char_data) is str:
+        return char_data.encode()
+    elif type(char_data) is bytes:
+        return char_data
+    else:
+        raise TypeError('message should be a string or bytes')
+
+
+def pack(pieces=()):
+    """
+    Join a sequence of strings together.
+
+    :param list pieces: list of strings
+
+    :rtype: bytes
+    """
+    encoded_pieces = (encode(piece) for piece in pieces)
+    return b''.join(encoded_pieces)
+
+
+def join(chars=()):
+    """
+    Join a sequence of characters into a string.
+
+    :param bytes chars: list of chars
+
+    :rtype: str
+    """
+    return b''.join(chars).decode()

http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-common/src/main/python/ambari_stomp/backwardsock.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/backwardsock.py b/ambari-common/src/main/python/ambari_stomp/backwardsock.py
new file mode 100644
index 0000000..ee6d4bf
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_stomp/backwardsock.py
@@ -0,0 +1,11 @@
+"""Networking functions to support backwards compatibility.
+
+Distinct from the backward(2/3) functions to handle ipv6 changes between Python versions 2.5 and 2.6.
+"""
+
+import sys
+
+if sys.hexversion < 0x02060000:  # < Python 2.6
+    from ambari_stomp.backwardsock25 import *
+else:  # Python 2.6 onwards
+    from ambari_stomp.backwardsock26 import *

http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-common/src/main/python/ambari_stomp/backwardsock25.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/backwardsock25.py b/ambari-common/src/main/python/ambari_stomp/backwardsock25.py
new file mode 100644
index 0000000..0a9279b
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_stomp/backwardsock25.py
@@ -0,0 +1,32 @@
+"""
+Python2.5 (and lower) specific versions of various networking (ipv6) functions used by stomp.py
+"""
+
+from socket import *
+
+ERRMSG = "getaddrinfo returns an empty list"
+
+
+def get_socket(host, port, timeout=None):
+    """
+    Return a socket.
+
+    :param str host: the hostname to connect to
+    :param int port: the port number to connect to
+    :param timeout: if specified, set the socket timeout
+    """
+    for res in getaddrinfo(host, port, 0, SOCK_STREAM):
+        af, socktype, proto, canonname, sa = res
+        sock = None
+        try:
+            sock = socket(af, socktype, proto)
+            if timeout is not None:
+                sock.settimeout(timeout)
+            sock.connect(sa)
+            return sock
+
+        except error, msg:
+            if sock is not None:
+                sock.close()
+
+    raise error, ERRMSG

http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-common/src/main/python/ambari_stomp/backwardsock26.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/backwardsock26.py b/ambari-common/src/main/python/ambari_stomp/backwardsock26.py
new file mode 100644
index 0000000..14d86d4
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_stomp/backwardsock26.py
@@ -0,0 +1,16 @@
+"""
+Python2.6 (and greater) specific versions of various networking (ipv6) functions used by stomp.py
+"""
+
+import socket
+
+
+def get_socket(host, port, timeout=None):
+    """
+    Return a socket connection.
+
+    :param str host: the hostname to connect to
+    :param int port: the port number to connect to
+    :param timeout: if specified, set the socket timeout
+    """
+    return socket.create_connection((host, port), timeout)

http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-common/src/main/python/ambari_stomp/colors.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/colors.py b/ambari-common/src/main/python/ambari_stomp/colors.py
new file mode 100644
index 0000000..dc57144
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_stomp/colors.py
@@ -0,0 +1,18 @@
+"""Color 'constants' used by the command line client.
+"""
+
+import platform
+
+
+if platform.system().lower() != 'windows':
+    GREEN = "\33[32m"
+    RED = "\33[31m"
+    NO_COLOR = "\33[0m"
+
+    BOLD = "\33[1m"
+else:
+    GREEN = ""
+    NO_COLOR = ""
+    RED = ""
+
+    BOLD = ""

http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-common/src/main/python/ambari_stomp/connect.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/connect.py b/ambari-common/src/main/python/ambari_stomp/connect.py
new file mode 100644
index 0000000..e5c78f9
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_stomp/connect.py
@@ -0,0 +1,203 @@
+"""Main entry point for clients to create a STOMP connection.
+
+Provides connection classes for `1.0 <http://stomp.github.io/stomp-specification-1.0.html>`_,
+`1.1 <http://stomp.github.io/stomp-specification-1.1.html>`_, and
+`1.2 <http://stomp.github.io/stomp-specification-1.2.html>`_ versions of the STOMP protocol.
+"""
+
+from ambari_stomp.listener import *
+from ambari_stomp.protocol import *
+from ambari_stomp.transport import *
+
+
+class BaseConnection(Publisher):
+    """
+    Base class for all connection classes.
+    """
+
+    def __init__(self, transport):
+        """
+        :param Transport transport:
+        """
+        self.transport = transport
+
+    def set_listener(self, name, lstnr):
+        """
+        :param str name:
+        :param ConnectionListener lstnr:
+        """
+        self.transport.set_listener(name, lstnr)
+
+    def remove_listener(self, name):
+        """
+        :param str name:
+        """
+        self.transport.remove_listener(name)
+
+    def get_listener(self, name):
+        """
+        :param str name:
+
+        :rtype: ConnectionListener
+        """
+        return self.transport.get_listener(name)
+
+    def start(self):
+        self.transport.start()
+
+    def stop(self):
+        self.transport.stop()
+
+    def is_connected(self):
+        """
+        :rtype: bool
+        """
+        return self.transport.is_connected()
+
+    def set_receipt(self, receipt_id, value):
+        self.transport.set_receipt(receipt_id, value)
+
+    def set_ssl(self, *args, **kwargs):
+        self.transport.set_ssl(*args, **kwargs)
+
+    def get_ssl(self, *args, **kwargs):
+        return self.transport.get_ssl(*args, **kwargs)  # a getter must hand the transport's answer back
+
+
+class StompConnection10(BaseConnection, Protocol10):
+    """
+    Represents a 1.0 connection (comprising transport plus 1.0 protocol class).
+    See :py:class:`stomp.transport.Transport` for details on the initialisation parameters.
+    """
+    def __init__(self,
+                 host_and_ports=None,
+                 prefer_localhost=True,
+                 try_loopback_connect=True,
+                 reconnect_sleep_initial=0.1,
+                 reconnect_sleep_increase=0.5,
+                 reconnect_sleep_jitter=0.1,
+                 reconnect_sleep_max=60.0,
+                 reconnect_attempts_max=3,
+                 use_ssl=False,
+                 ssl_key_file=None,
+                 ssl_cert_file=None,
+                 ssl_ca_certs=None,
+                 ssl_cert_validator=None,
+                 wait_on_receipt=False,
+                 ssl_version=DEFAULT_SSL_VERSION,
+                 timeout=None,
+                 keepalive=None,
+                 auto_decode=True,
+                 auto_content_length=True):
+        transport = Transport(host_and_ports, prefer_localhost, try_loopback_connect,
+                              reconnect_sleep_initial, reconnect_sleep_increase, reconnect_sleep_jitter,
+                              reconnect_sleep_max, reconnect_attempts_max, use_ssl, ssl_key_file, ssl_cert_file,
+                              ssl_ca_certs, ssl_cert_validator, wait_on_receipt, ssl_version, timeout,
+                              keepalive, None, auto_decode)
+        BaseConnection.__init__(self, transport)
+        Protocol10.__init__(self, transport, auto_content_length)
+
+    def disconnect(self, receipt=None, headers=None, **keyword_headers):
+        """
+        Call the protocol disconnection, and then stop the transport itself.
+
+        :param str receipt: the receipt to use with the disconnect
+        :param dict headers: a map of any additional headers to send with the disconnection
+        :param keyword_headers: any additional headers to send with the disconnection
+        """
+        Protocol10.disconnect(self, receipt, headers, **keyword_headers)
+        self.transport.stop()
+
+
+class StompConnection11(BaseConnection, Protocol11):
+    """
+    Represents a 1.1 connection (comprising transport plus 1.1 protocol class)
+    See :py:class:`stomp.transport.Transport` for details on the initialisation parameters.
+    """
+    def __init__(self,
+                 host_and_ports=None,
+                 prefer_localhost=True,
+                 try_loopback_connect=True,
+                 reconnect_sleep_initial=0.1,
+                 reconnect_sleep_increase=0.5,
+                 reconnect_sleep_jitter=0.1,
+                 reconnect_sleep_max=60.0,
+                 reconnect_attempts_max=3,
+                 use_ssl=False,
+                 ssl_key_file=None,
+                 ssl_cert_file=None,
+                 ssl_ca_certs=None,
+                 ssl_cert_validator=None,
+                 wait_on_receipt=False,
+                 ssl_version=DEFAULT_SSL_VERSION,
+                 timeout=None,
+                 heartbeats=(0, 0),
+                 keepalive=None,
+                 vhost=None,
+                 auto_decode=True,
+                 auto_content_length=True):
+        transport = Transport(host_and_ports, prefer_localhost, try_loopback_connect,
+                              reconnect_sleep_initial, reconnect_sleep_increase, reconnect_sleep_jitter,
+                              reconnect_sleep_max, reconnect_attempts_max, use_ssl, ssl_key_file, ssl_cert_file,
+                              ssl_ca_certs, ssl_cert_validator, wait_on_receipt, ssl_version, timeout,
+                              keepalive, vhost, auto_decode)
+        BaseConnection.__init__(self, transport)
+        Protocol11.__init__(self, transport, heartbeats, auto_content_length)
+
+    def disconnect(self, receipt=None, headers=None, **keyword_headers):
+        """
+        Call the protocol disconnection, and then stop the transport itself.
+
+        :param str receipt: the receipt to use with the disconnect
+        :param dict headers: a map of any additional headers to send with the disconnection
+        :param keyword_headers: any additional headers to send with the disconnection
+        """
+        Protocol11.disconnect(self, receipt, headers, **keyword_headers)
+        self.transport.stop()
+
+
+class StompConnection12(BaseConnection, Protocol12):
+    """
+    Represents a 1.2 connection (comprising transport plus 1.2 protocol class).
+    See :py:class:`stomp.transport.Transport` for details on the initialisation parameters.
+    """
+    def __init__(self,
+                 host_and_ports=None,
+                 prefer_localhost=True,
+                 try_loopback_connect=True,
+                 reconnect_sleep_initial=0.1,
+                 reconnect_sleep_increase=0.5,
+                 reconnect_sleep_jitter=0.1,
+                 reconnect_sleep_max=60.0,
+                 reconnect_attempts_max=3,
+                 use_ssl=False,
+                 ssl_key_file=None,
+                 ssl_cert_file=None,
+                 ssl_ca_certs=None,
+                 ssl_cert_validator=None,
+                 wait_on_receipt=False,
+                 ssl_version=DEFAULT_SSL_VERSION,
+                 timeout=None,
+                 heartbeats=(0, 0),
+                 keepalive=None,
+                 vhost=None,
+                 auto_decode=True,
+                 auto_content_length=True):
+        transport = Transport(host_and_ports, prefer_localhost, try_loopback_connect,
+                              reconnect_sleep_initial, reconnect_sleep_increase, reconnect_sleep_jitter,
+                              reconnect_sleep_max, reconnect_attempts_max, use_ssl, ssl_key_file, ssl_cert_file,
+                              ssl_ca_certs, ssl_cert_validator, wait_on_receipt, ssl_version, timeout,
+                              keepalive, vhost, auto_decode)
+        BaseConnection.__init__(self, transport)
+        Protocol12.__init__(self, transport, heartbeats, auto_content_length)
+
+    def disconnect(self, receipt=None, headers=None, **keyword_headers):
+        """
+        Call the protocol disconnection, and then stop the transport itself.
+
+        :param str receipt: the receipt to use with the disconnect
+        :param dict headers: a map of any additional headers to send with the disconnection
+        :param keyword_headers: any additional headers to send with the disconnection
+        """
+        Protocol12.disconnect(self, receipt, headers, **keyword_headers)
+        self.transport.stop()

http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-common/src/main/python/ambari_stomp/constants.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/constants.py b/ambari-common/src/main/python/ambari_stomp/constants.py
new file mode 100644
index 0000000..87e8acc
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_stomp/constants.py
@@ -0,0 +1,29 @@
+"""The STOMP command and header name strings.
+"""
+
+HDR_ACCEPT_VERSION = 'accept-version'
+HDR_ACK = 'ack'
+HDR_CONTENT_LENGTH = 'content-length'
+HDR_CONTENT_TYPE = 'content-type'
+HDR_DESTINATION = 'destination'
+HDR_HEARTBEAT = 'heart-beat'
+HDR_HOST = 'host'
+HDR_ID = 'id'
+HDR_MESSAGE_ID = 'message-id'
+HDR_LOGIN = 'login'
+HDR_PASSCODE = 'passcode'
+HDR_RECEIPT = 'receipt'
+HDR_SUBSCRIPTION = 'subscription'
+HDR_TRANSACTION = 'transaction'
+
+CMD_ABORT = 'ABORT'
+CMD_ACK = 'ACK'
+CMD_BEGIN = 'BEGIN'
+CMD_COMMIT = 'COMMIT'
+CMD_CONNECT = 'CONNECT'
+CMD_DISCONNECT = 'DISCONNECT'
+CMD_NACK = 'NACK'
+CMD_STOMP = 'STOMP'
+CMD_SEND = 'SEND'
+CMD_SUBSCRIBE = 'SUBSCRIBE'
+CMD_UNSUBSCRIBE = 'UNSUBSCRIBE'

http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-common/src/main/python/ambari_stomp/exception.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/exception.py b/ambari-common/src/main/python/ambari_stomp/exception.py
new file mode 100644
index 0000000..398096b
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_stomp/exception.py
@@ -0,0 +1,35 @@
+"""Errors thrown by stomp.py connections.
+"""
+
+class StompException(Exception):
+    """
+    Common exception class. All specific stomp.py exceptions are subclasses
+    of StompException, allowing the library user to catch all current and
+    future library exceptions.
+    """
+
+
+class ConnectionClosedException(StompException):
+    """
+    Raised in the receiver thread when the connection has been closed
+    by the server.
+    """
+
+
+class NotConnectedException(StompException):
+    """
+    Raised when there is currently no server connection.
+    """
+
+
+class ConnectFailedException(StompException):
+    """
+    Raised by Connection.attempt_connection when reconnection attempts
+    have exceeded Connection.__reconnect_attempts_max.
+    """
+
+
+class InterruptedException(StompException):
+    """
+    Raised by receive when data read is interrupted.
+    """

http://git-wip-us.apache.org/repos/asf/ambari/blob/72b78424/ambari-common/src/main/python/ambari_stomp/listener.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_stomp/listener.py b/ambari-common/src/main/python/ambari_stomp/listener.py
new file mode 100644
index 0000000..9ed040a
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_stomp/listener.py
@@ -0,0 +1,522 @@
+"""Various listeners for using with stomp.py connections.
+"""
+
+import logging
+import sys
+import threading
+import time
+
+from ambari_stomp.backward import monotonic
+from ambari_stomp.constants import *
+import ambari_stomp.exception as exception
+import ambari_stomp.utils as utils
+
+
+log = logging.getLogger('stomp.py')
+
+
+class Publisher(object):
+    """
+    Simply a registry of listeners.
+    """
+
+    def set_listener(self, name, listener):
+        """
+        Set a named listener to use with this connection. See :py:class:`stomp.listener.ConnectionListener`
+
+        :param str name: the name of the listener
+        :param ConnectionListener listener: the listener object
+        """
+        pass
+
+    def remove_listener(self, name):
+        """
+        Remove a listener.
+
+        :param str name: the name of the listener to remove
+        """
+        pass
+
+    def get_listener(self, name):
+        """
+        Return the named listener.
+
+        :param str name: the listener to return
+
+        :rtype: ConnectionListener
+        """
+        return None
+
+
+class ConnectionListener(object):
+    """
+    This class should be used as a base class for objects registered
+    using Connection.set_listener().
+    """
+    def on_connecting(self, host_and_port):
+        """
+        Called by the STOMP connection once a TCP/IP connection to the
+        STOMP server has been established or re-established. Note that
+        at this point, no connection has been established on the STOMP
+        protocol level. For this, you need to invoke the "connect"
+        method on the connection.
+
+        :param (str,int) host_and_port: a tuple containing the host name and port number to which the connection
+            has been established.
+        """
+        pass
+
+    def on_connected(self, headers, body):
+        """
+        Called by the STOMP connection when a CONNECTED frame is
+        received (after a connection has been established or
+        re-established).
+
+        :param dict headers: a dictionary containing all headers sent by the server as key/value pairs.
+        :param body: the frame's payload. This is usually empty for CONNECTED frames.
+        """
+        pass
+
+    def on_disconnected(self):
+        """
+        Called by the STOMP connection when a TCP/IP connection to the
+        STOMP server has been lost.  No messages should be sent via
+        the connection until it has been reestablished.
+        """
+        pass
+
+    def on_heartbeat_timeout(self):
+        """
+        Called by the STOMP connection when a heartbeat message has not been
+        received beyond the specified period.
+        """
+        pass
+
+    def on_before_message(self, headers, body):
+        """
+        Called by the STOMP connection before a message is returned to the client app. Returns a tuple
+        containing the headers and body (so that implementing listeners can pre-process the content).
+
+        :param dict headers: the message headers
+        :param body: the message body
+        """
+        return headers, body
+
+    def on_message(self, headers, body):
+        """
+        Called by the STOMP connection when a MESSAGE frame is received.
+
+        :param dict headers: a dictionary containing all headers sent by the server as key/value pairs.
+        :param body: the frame's payload - the message body.
+        """
+        pass
+
+    def on_receipt(self, headers, body):
+        """
+        Called by the STOMP connection when a RECEIPT frame is
+        received, sent by the server if requested by the client using
+        the 'receipt' header.
+
+        :param dict headers: a dictionary containing all headers sent by the server as key/value pairs.
+        :param body: the frame's payload. This is usually empty for RECEIPT frames.
+        """
+        pass
+
+    def on_error(self, headers, body):
+        """
+        Called by the STOMP connection when an ERROR frame is received.
+
+        :param dict headers: a dictionary containing all headers sent by the server as key/value pairs.
+        :param body: the frame's payload - usually a detailed error description.
+        """
+        pass
+
+    def on_send(self, frame):
+        """
+        Called by the STOMP connection when it is in the process of sending a message
+
+        :param Frame frame: the frame to be sent
+        """
+        pass
+
+    def on_heartbeat(self):
+        """
+        Called on receipt of a heartbeat.
+        """
+        pass
+
+
+class HeartbeatListener(ConnectionListener):
+    """
+    Listener used to handle STOMP heartbeating.
+    """
+    def __init__(self, heartbeats):
+        self.running = False
+        self.heartbeats = heartbeats
+        self.received_heartbeat = None
+        self.heartbeat_thread = None
+        self.next_outbound_heartbeat = None
+
+    def on_connected(self, headers, body):
+        """
+        Once the connection is established, and 'heart-beat' is found in the headers, we calculate the real
+        heartbeat numbers (based on what the server sent and what was specified by the client) - if the heartbeats
+        are not 0, we start up the heartbeat loop accordingly.
+
+        :param dict headers: headers in the connection message
+        :param body: the message body
+        """
+        if 'heart-beat' in headers:
+            self.heartbeats = utils.calculate_heartbeats(
+                headers['heart-beat'].replace(' ', '').split(','), self.heartbeats)
+            if self.heartbeats != (0, 0):
+                self.send_sleep = self.heartbeats[0] / 1000
+
+                # receive gets an additional grace of 50%
+                self.receive_sleep = (self.heartbeats[1] / 1000) * 1.5
+
+                # Give grace of receiving the first heartbeat
+                self.received_heartbeat = monotonic() + self.receive_sleep
+
+                self.running = True
+                if self.heartbeat_thread is None:
+                    self.heartbeat_thread = utils.default_create_thread(
+                        self.__heartbeat_loop)
+                    self.heartbeat_thread.name = "StompHeartbeat%s" % \
+                        getattr(self.heartbeat_thread, "name", "Thread")
+
+    def on_disconnected(self):
+        self.running = False
+
+    def on_message(self, headers, body):
+        """
+        Reset the last received time whenever a message is received.
+
+        :param dict headers: headers in the message
+        :param body: the message content
+        """
+        # reset the heartbeat for any received message
+        self.__update_heartbeat()
+
+    def on_receipt(self, *_):
+        """
+        Reset the last received time whenever a receipt is received.
+        """
+        self.__update_heartbeat()
+
+    def on_error(self, *_):
+        """
+        Reset the last received time whenever an error is received.
+        """
+        self.__update_heartbeat()
+
+    def on_heartbeat(self):
+        """
+        Reset the last received time whenever a heartbeat message is received.
+        """
+        self.__update_heartbeat()
+
+    def on_send(self, frame):
+        """
+        Add the heartbeat header to the frame when connecting, and bump
+        next outbound heartbeat timestamp.
+
+        :param Frame frame: the Frame object
+        """
+        if frame.cmd == CMD_CONNECT or frame.cmd == CMD_STOMP:
+            if self.heartbeats != (0, 0):
+                frame.headers[HDR_HEARTBEAT] = '%s,%s' % self.heartbeats
+        if self.next_outbound_heartbeat is not None:
+            self.next_outbound_heartbeat = monotonic() + self.send_sleep
+
+    def __update_heartbeat(self):
+        # Honour any grace that has been already included
+        if self.received_heartbeat is None:
+            return
+        now = monotonic()
+        if now > self.received_heartbeat:
+            self.received_heartbeat = now
+
+    def __heartbeat_loop(self):
+        """
+        Main loop for sending (and monitoring received) heartbeats.
+        """
+        now = monotonic()
+
+        # Setup the initial due time for the outbound heartbeat
+        if self.send_sleep != 0:
+            self.next_outbound_heartbeat = now + self.send_sleep
+
+        while self.running:
+            now = monotonic()
+
+            next_events = []
+            if self.next_outbound_heartbeat is not None:
+                next_events.append(self.next_outbound_heartbeat - now)
+            if self.receive_sleep != 0:
+                t = self.received_heartbeat + self.receive_sleep - now
+                if t > 0:
+                    next_events.append(t)
+            sleep_time = min(next_events)
+            if sleep_time > 0:
+                time.sleep(sleep_time)
+
+            now = monotonic()
+
+            if not self.transport.is_connected():
+                time.sleep(self.send_sleep)
+                continue
+
+            if self.send_sleep != 0 and now > self.next_outbound_heartbeat:
+                log.debug("Sending a heartbeat message at %s", now)
+                try:
+                    self.transport.transmit(utils.Frame(None, {}, None))
+                except exception.NotConnectedException:
+                    log.debug("Lost connection, unable to send heartbeat")
+                except Exception:
+                    _, e, _ = sys.exc_info()
+                    log.debug("Unable to send heartbeat, due to: %s", e)
+
+            if self.receive_sleep != 0:
+                diff_receive = now - self.received_heartbeat
+
+                if diff_receive > self.receive_sleep:
+                    # heartbeat timeout
+                    log.warning("Heartbeat timeout: diff_receive=%s, time=%s, lastrec=%s",
+                                diff_receive, now, self.received_heartbeat)
+                    self.transport.set_connected(False)
+                    self.transport.disconnect_socket()
+                    self.transport.stop()
+                    for listener in self.transport.listeners.values():
+                        listener.on_heartbeat_timeout()
+        self.heartbeat_thread = None
+
+
+class WaitingListener(ConnectionListener):
+    """
+    A listener which waits for a specific receipt to arrive.
+    """
+    def __init__(self, receipt):
+        """
+        :param str receipt:
+        """
+        self.condition = threading.Condition()
+        self.receipt = receipt
+        self.received = False
+
+    def on_receipt(self, headers, body):
+        """
+        If the receipt id can be found in the headers, then notify the waiting thread.
+
+        :param dict headers: headers in the message
+        :param body: the message content
+        """
+        if 'receipt-id' in headers and headers['receipt-id'] == self.receipt:
+            with self.condition:
+                self.received = True
+                self.condition.notify()
+
+    def wait_on_receipt(self):
+        """
+        Wait until we receive a message receipt.
+        """
+        with self.condition:
+            while not self.received:
+                self.condition.wait()
+        self.received = False
+
+
+class StatsListener(ConnectionListener):
+    """
+    A connection listener for recording statistics on messages sent and received.
+    """
+    def __init__(self):
+        # The number of errors received
+        self.errors = 0
+        # The number of connections established
+        self.connections = 0
+        # The number of disconnections
+        self.disconnects = 0
+        # The number of messages received
+        self.messages = 0
+        # The number of messages sent
+        self.messages_sent = 0
+        # The number of heartbeat timeouts
+        self.heartbeat_timeouts = 0
+        # The number of heartbeats
+        self.heartbeat_count = 0
+
+    def on_disconnected(self):
+        """
+        Increment the disconnect count. See :py:meth:`ConnectionListener.on_disconnected`
+        """
+        self.disconnects += 1
+        log.info("disconnected (x %s)", self.disconnects)
+
+    def on_error(self, headers, body):
+        """
+        Increment the error count. See :py:meth:`ConnectionListener.on_error`
+
+        :param dict headers: headers in the message
+        :param body: the message content
+        """
+        log.info("received an error %s [%s]", body, headers)
+        self.errors += 1
+
+    def on_connecting(self, host_and_port):
+        """
+        Increment the connection count. See :py:meth:`ConnectionListener.on_connecting`
+
+        :param (str,int) host_and_port: the host and port as a tuple
+        """
+        log.info("connecting %s %s (x %s)", host_and_port[0], host_and_port[1], self.connections)
+        self.connections += 1
+
+    def on_message(self, headers, body):
+        """
+        Increment the message received count. See :py:meth:`ConnectionListener.on_message`
+
+        :param dict headers: headers in the message
+        :param body: the message content
+        """
+        self.messages += 1
+
+    def on_send(self, frame):
+        """
+        Increment the send count. See :py:meth:`ConnectionListener.on_send`
+
+        :param Frame frame:
+        """
+        self.messages_sent += 1
+
+    def on_heartbeat_timeout(self):
+        """
+        Increment the heartbeat timeout. See :py:meth:`ConnectionListener.on_heartbeat_timeout`
+        """
+        log.debug("received heartbeat timeout")
+        self.heartbeat_timeouts += 1
+
+    def on_heartbeat(self):
+        """
+        Increment the heartbeat count. See :py:meth:`ConnectionListener.on_heartbeat`
+        """
+        self.heartbeat_count += 1
+
+    def __str__(self):
+        """
+        Return a string containing the current statistics (messages sent and received,
+        errors, etc)
+        """
+        return '''Connections: %s
+Messages sent: %s
+Messages received: %s
+Heartbeats received: %s
+Errors: %s''' % (self.connections, self.messages_sent, self.messages, self.heartbeat_count, self.errors)
+
+
+class PrintingListener(ConnectionListener):
+    def on_connecting(self, host_and_port):
+        """
+        :param (str,int) host_and_port:
+        """
+        print('on_connecting %s %s' % host_and_port)
+
+    def on_connected(self, headers, body):
+        """
+        :param dict headers:
+        :param body:
+        """
+        print('on_connected %s %s' % (headers, body))
+
+    def on_disconnected(self):
+        print('on_disconnected')
+
+    def on_heartbeat_timeout(self):
+        print('on_heartbeat_timeout')
+
+    def on_before_message(self, headers, body):
+        """
+        :param dict headers:
+        :param body:
+        """
+        print('on_before_message %s %s' % (headers, body))
+        return headers, body
+
+    def on_message(self, headers, body):
+        """
+        :param dict headers:
+        :param body:
+        """
+        print('on_message %s %s' % (headers, body))
+
+    def on_receipt(self, headers, body):
+        """
+        :param dict headers:
+        :param body:
+        """
+        print('on_receipt %s %s' % (headers, body))
+
+    def on_error(self, headers, body):
+        """
+        :param dict headers:
+        :param body:
+        """
+        print('on_error %s %s' % (headers, body))
+
+    def on_send(self, frame):
+        """
+        :param Frame frame:
+        """
+        print('on_send %s %s %s' % (frame.cmd, frame.headers, frame.body))
+
+    def on_heartbeat(self):
+        print('on_heartbeat')
+
+
+class TestListener(StatsListener, WaitingListener):
+    """
+    Implementation of StatsListener and WaitingListener. Useful for testing.
+    """
+    def __init__(self, receipt=None):
+        """
+        :param str receipt:
+        """
+        StatsListener.__init__(self)
+        WaitingListener.__init__(self, receipt)
+        self.message_list = []
+        self.message_condition = threading.Condition()
+        self.message_received = False
+        self.heartbeat_condition = threading.Condition()
+        self.heartbeat_received = False
+
+    def on_message(self, headers, message):
+        """
+        :param dict headers:
+        :param message:
+        """
+        StatsListener.on_message(self, headers, message)
+        self.message_list.append((headers, message))
+        with self.message_condition:
+            self.message_received = True
+            self.message_condition.notify()
+
+    def wait_for_message(self):
+        with self.message_condition:
+            while not self.message_received:
+                self.message_condition.wait()
+        self.message_received = False
+
+    def get_latest_message(self):
+        return self.message_list[-1]
+
+    def on_heartbeat(self):
+        StatsListener.on_heartbeat(self)
+        with self.heartbeat_condition:
+            self.heartbeat_received = True
+            self.heartbeat_condition.notify()
+
+    def wait_for_heartbeat(self):
+        with self.heartbeat_condition:
+            while not self.heartbeat_received:
+                self.heartbeat_condition.wait()
+        self.heartbeat_received = False


Mime
View raw message