qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1602494 - in /qpid/dispatch/trunk: ./ python/qpid_dispatch_internal/management/ tests/ tools/
Date Fri, 13 Jun 2014 19:08:16 GMT
Author: aconway
Date: Fri Jun 13 19:08:16 2014
New Revision: 1602494

URL: http://svn.apache.org/r1602494
Log:
DISPATCH-56: Refactor qdstat tool and system_test framework to use management.amqp.Node.

Added:
    qpid/dispatch/trunk/tools/qdstat
      - copied, changed from r1602493, qpid/dispatch/trunk/tools/qdstat.in
Removed:
    qpid/dispatch/trunk/tools/qdstat.in
Modified:
    qpid/dispatch/trunk/CMakeLists.txt
    qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/management/entity.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py
    qpid/dispatch/trunk/tests/system_test.py
    qpid/dispatch/trunk/tests/system_tests_broker.py
    qpid/dispatch/trunk/tests/system_tests_management.py

Modified: qpid/dispatch/trunk/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/CMakeLists.txt?rev=1602494&r1=1602493&r2=1602494&view=diff
==============================================================================
--- qpid/dispatch/trunk/CMakeLists.txt (original)
+++ qpid/dispatch/trunk/CMakeLists.txt Fri Jun 13 19:08:16 2014
@@ -97,9 +97,6 @@ set(CATCH_UNDEFINED "-Wl,--no-undefined"
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/conditionals.h.in
                ${CMAKE_CURRENT_BINARY_DIR}/conditionals.h)
 
-configure_file(${CMAKE_CURRENT_SOURCE_DIR}/tools/qdstat.in
-               ${CMAKE_CURRENT_BINARY_DIR}/tools/qdstat)
-
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/build_env.py.in
                ${CMAKE_CURRENT_BINARY_DIR}/build_env.py)
 
@@ -157,7 +154,7 @@ install(FILES etc/qdrouterd.conf DESTINA
 ## Python modules installation
 ##
 set(TOOLS_EXECUTABLES
-    ${CMAKE_CURRENT_BINARY_DIR}/tools/qdstat
+    ${CMAKE_CURRENT_SOURCE_DIR}/tools/qdstat
 )
 
 set(DOC_FILES

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py?rev=1602494&r1=1602493&r2=1602494&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/amqp.py Fri Jun 13 19:08:16
2014
@@ -23,6 +23,7 @@ AMQP management tools for Qpid dispatch.
 
 import proton, re, threading, httplib
 from collections import namedtuple
+from entity import Entity
 
 class Error(Exception): pass
 
@@ -67,8 +68,6 @@ class Url:
             self.password = kwargs.get('password')
             self.host = kwargs.get('host')
             self.port = kwargs.get('port')
-            if self.host is None:
-                raise ValueError('Host required for url')
             self.path = kwargs.get('path')
         elif isinstance(s, Url):
             self.scheme = s.scheme
@@ -97,10 +96,10 @@ class Url:
             s += self.user
         if self.password:
             s += ":%s@" % self.password
-        if ':' not in self.host:
-            s += self.host
-        else:
+        if self.host and ':' in self.host:
             s += "[%s]" % self.host
+        else:
+            s += self.host or '0.0.0.0'
         if self.port:
             s += ":%s" % self.port
         if self.path:
@@ -140,7 +139,7 @@ class Node(object):
     NODE_TYPE='org.amqp.management' # AMQP management node type
     NODE_PROPERTIES={'name':SELF, 'type':NODE_TYPE}
 
-    def __init__(self, address, router=None, locales=None):
+    def __init__(self, address=None, router=None, locales=None):
         """
         @param address: AMQP address of the management node.
         @param router: If address does not contain a path, use the management node for this
router ID.
@@ -171,7 +170,8 @@ class Node(object):
         self.messenger = None
 
     def __del__(self):
-        self.stop()
+        if hasattr(self, 'messenger'):
+            self.stop()
 
     def _flush(self):
         """Call self.messenger.work() till there is no work left."""
@@ -235,14 +235,18 @@ class Node(object):
         self.check_response(response)
         return response
 
-    class QueryResult(namedtuple('QueryResult', ['attribute_names', 'results'])):
+    class QueryResponse(list):
         """
-        Result returned by L{query}
+        Result returned by L{query}. Behaves as a list of L{Entity}.
         @ivar attribute_names: List of attribute names for the results.
-        @ivar results: List of lists. Each entry is a list of attribute values
-            corresponding to the attribute_names.
         """
-        pass
+        def __init__(self, response):
+            """
+            @param response: the respose message to a query.
+            """
+            self.attribute_names = response.body['attributeNames']
+            for r in response.body['results']:
+                self.append(Entity(attributes=dict(zip(self.attribute_names, r))))
 
     def query(self, entity_type=None, attribute_names=None, offset=None, count=None):
         """
@@ -252,9 +256,10 @@ class Node(object):
         @keyword attribute_names: A list of attribute names to query.
         @keyword offset: An integer offset into the list of results to return.
         @keyword count: A count of the maximum number of results to return.
-        @return: A L{QueryResult}
+        @return: A L{QueryResponse}
         """
+        attribute_names = attribute_names or []
         response = self.call(self.node_request(
             operation='QUERY', entityType=entity_type, offset=offset, count=count,
-            body={'attributeNames':attribute_names or []}))
-        return Node.QueryResult(response.body['attributeNames'], response.body['results'])
+            body={'attributeNames':attribute_names}))
+        return Node.QueryResponse(response)

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/entity.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/entity.py?rev=1602494&r1=1602493&r2=1602494&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/entity.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/entity.py Fri Jun 13 19:08:16
2014
@@ -26,7 +26,7 @@ An entity has a set of named attributes 
 from schema import EntityType
 from copy import copy
 
-class Entity(dict):
+class Entity(object):
     """
     A management entity: a set of attributes with an associated entity-type.
 
@@ -37,18 +37,19 @@ class Entity(dict):
     @ivar I{attribute-name}: Access an entity attribute as a python attribute.
     """
 
-    def __init__(self, entity_type, attributes=None, schema=None, **kw_attributes):
+    def __init__(self, entity_type=None, attributes=None, schema=None, **kw_attributes):
         """
         @param entity_type: An L{EntityType} or the name of an entity type in the schema.
-        @param schema: The L{Schema} defining entity_type.
         @param attributes: An attribute mapping.
+        @param schema: The L{Schema} defining entity_type.
         @param kw_attributes: Attributes as keyword arguments.
         """
         super(Entity, self).__init__()
         if schema and entity_type in schema.entity_types:
             self.entity_type = schema.entity_types[entity_type]
         else:
-            assert isinstance(entity_type, EntityType), "'%s' is not an entity type"%entity_type
+            assert entity_type is None or \
+                isinstance(entity_type, EntityType), "'%s' is not an entity type"%entity_type
             self.entity_type = entity_type
         self.attributes = attributes or {}
         self.attributes.update(kw_attributes)
@@ -76,6 +77,10 @@ class Entity(dict):
         else:
             return (self.entity_type.name, self.attributes)
 
+    def __str__(self):
+        return str(self.dump)
+
+
 class EntityList(list):
     """
     A list of entities with some convenience methods for finding entities
@@ -150,6 +155,9 @@ class EntityList(list):
         """
         return [e.dump(as_map) for e in self]
 
+    def __str__(self):
+        return str(self.dump())
+
     def replace(self, contents):
         """
         Replace the contents of the list.

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py?rev=1602494&r1=1602493&r2=1602494&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/management/schema.py Fri Jun 13 19:08:16
2014
@@ -274,6 +274,9 @@ class AttributeTypeHolder(object):
             ('description', self.description or None)
         ])
 
+    def __str__(self):
+        print self.name
+
 class IncludeType(AttributeTypeHolder):
 
     def __init__(self, name, schema, attributes=None, description=""):

Modified: qpid/dispatch/trunk/tests/system_test.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_test.py?rev=1602494&r1=1602493&r2=1602494&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_test.py (original)
+++ qpid/dispatch/trunk/tests/system_test.py Fri Jun 13 19:08:16 2014
@@ -56,6 +56,7 @@ import os, time, socket, random, subproc
 from copy import copy
 import proton
 from proton import Message
+from qpid_dispatch_internal.management import amqp
 
 # Optional modules
 MISSING_MODULES = []
@@ -296,37 +297,6 @@ class Qdrouterd(Process):
             self.defaults()
             return "".join(["%s {\n%s}\n"%(n, props(p)) for n, p in self])
 
-    class Agent(object):
-        """Management agent"""
-        def __init__(self, router):
-            self.router = router
-            self.messenger = Messenger()
-            self.messenger.route("amqp:/*", "amqp://0.0.0.0:%s/$1"%router.ports[0])
-            self.address = "amqp:/$management"
-            self.subscription = self.messenger.subscribe("amqp:/#")
-            self.reply_to = self.subscription.address
-
-        def stop(self):
-            """Stop the agent's messenger"""
-            self.messenger.stop()
-
-        def get(self, entity_type):
-            """Return a list of attribute dicts for each instance of entity_type"""
-            request = message(
-                address=self.address, reply_to=self.reply_to,
-                correlation_id=1,
-                properties={u'operation':u'QUERY', u'entityType':entity_type},
-                body={'attributeNames':[]})
-            self.messenger.put(request)
-            response = self.messenger.fetch()
-            if response.properties['statusCode'] != 200:
-                raise Exception("Agent error: %d %s" % (
-                    response.properties['statusCode'],
-                    response.properties['statusDescription']))
-            attrs = response.body['attributeNames']
-            return [dict(zip(attrs, values)) for values in response.body['results']]
-
-
     def __init__(self, name, config=Config(), wait=True):
         """
         @param name: name used for for output files.
@@ -337,13 +307,14 @@ class Qdrouterd(Process):
         super(Qdrouterd, self).__init__(
             name, ['qdrouterd', '-c', config.write(name)], expect=Process.RUNNING)
         self._agent = None
-        if wait: self.wait_ready()
+        if wait:
+            self.wait_ready()
 
     @property
     def agent(self):
         """Return an management Agent for this router"""
         if not self._agent:
-            self._agent = self.Agent(self)
+            self._agent = amqp.Node(self.addresses[0])
         return self._agent
 
     def teardown(self):
@@ -369,9 +340,9 @@ class Qdrouterd(Process):
     def is_connected(self, port, host='0.0.0.0'):
         """If router has a connection to host:port return the management info.
         Otherwise return None"""
-        connections = self.agent.get('org.apache.qpid.dispatch.connection')
+        connections = self.agent.query('org.apache.qpid.dispatch.connection')
         for c in connections:
-            if c['name'] == '%s:%s'%(host, port):
+            if c.name == '%s:%s'%(host, port):
                 return c
         return None
 
@@ -393,7 +364,7 @@ class Qpidd(Process):
         def __str__(self):
             return "".join(["%s=%s\n"%(k, v) for k, v in self.iteritems()])
 
-    def __init__(self, name, config=Config(), port=None):
+    def __init__(self, name, config=Config(), port=None, wait=True):
         self.config = Qpidd.Config(
             {'auth':'no',
              'log-to-stderr':'false', 'log-to-file':name+".log",
@@ -406,6 +377,8 @@ class Qpidd(Process):
         self.port = self.config['port'] or 5672
         self.address = "127.0.0.1:%s"%self.port
         self._agent = None
+        if wait:
+            self.wait_ready()
 
     def qm_connect(self):
         """Make a qpid_messaging connection to the broker"""
@@ -422,7 +395,8 @@ class Qpidd(Process):
             self._agent = qpidtoollibs.BrokerAgent(self.qm_connect(), **kwargs)
         return self._agent
 
-
+    def wait_ready(self):
+        wait_port(self.port)
 
 # Decorator to add an optional flush argument to a method, defaulting to
 # the _flush value for the messenger.

Modified: qpid/dispatch/trunk/tests/system_tests_broker.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_broker.py?rev=1602494&r1=1602493&r2=1602494&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_broker.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_broker.py Fri Jun 13 19:08:16 2014
@@ -32,10 +32,10 @@ class DistributedQueueTest(system_test.T
     def setUpClass(cls):
         """Start 3 qpidd brokers, wait for them to be ready."""
         super(DistributedQueueTest, cls).setUpClass()
-        cls.qpidds = [cls.tester.qpidd('qpidd%s'%i, port=cls.get_port())
+        cls.qpidds = [cls.tester.qpidd('qpidd%s'%i, port=cls.get_port(), wait=False)
                     for i in xrange(3)]
         for q in cls.qpidds:
-            wait_port(q.port)
+            q.wait_ready()
 
     @classmethod
     def tearDownClass(cls):

Modified: qpid/dispatch/trunk/tests/system_tests_management.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_management.py?rev=1602494&r1=1602493&r2=1602494&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_management.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_management.py Fri Jun 13 19:08:16 2014
@@ -68,9 +68,9 @@ class ManagementTest(system_test.TestCas
         address = 'org.apache.qpid.dispatch.router.address'
         response = self.node.query(entity_type=address)
         self.assertEqual(response.attribute_names[0:3], ['type', 'name', 'identity'])
-        for r in response.results:  # Check types
-            self.assertEqual(r[0], address)
-        names = [r[1] for r in response.results]
+        for r in response:  # Check types
+            self.assertEqual(r.type, address)
+        names = [r.name for r in response]
         self.assertTrue('L$management' in names)
         self.assertTrue('M0$management' in names)
 
@@ -79,9 +79,9 @@ class ManagementTest(system_test.TestCas
             # Try offset, count
             self.assertGreater(len(names), 2)
             response0 = self.node.query(entity_type=address, count=1)
-            self.assertEqual(names[0:1], [r[1] for r in response0.results])
+            self.assertEqual(names[0:1], [r[1] for r in response0])
             response1_2 = self.node.query(entity_type=address, count=2, offset=1)
-            self.assertEqual(names[1:3], [r[1] for r in response1_2.results])
+            self.assertEqual(names[1:3], [r[1] for r in response1_2])
             self.fail("Negative test passed!")
         except: pass
 
@@ -90,7 +90,7 @@ class ManagementTest(system_test.TestCas
         # FIXME aconway 2014-06-05: negative test: attribute_names query doesn't work.
         # Need a better test.
         try:
-            self.assertNotEqual([], response.results)
+            self.assertNotEqual([], response)
             self.fail("Negative test passed!")
         except: pass
 

Copied: qpid/dispatch/trunk/tools/qdstat (from r1602493, qpid/dispatch/trunk/tools/qdstat.in)
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tools/qdstat?p2=qpid/dispatch/trunk/tools/qdstat&p1=qpid/dispatch/trunk/tools/qdstat.in&r1=1602493&r2=1602494&rev=1602494&view=diff
==============================================================================
--- qpid/dispatch/trunk/tools/qdstat.in (original)
+++ qpid/dispatch/trunk/tools/qdstat Fri Jun 13 19:08:16 2014
@@ -26,8 +26,10 @@ import locale
 import socket
 import re
 from proton import Messenger, Message, Timeout
+from qpid_dispatch_internal.management.amqp import Url, Node
+from qpid_dispatch_internal.management.entity import Entity
 
-home = os.environ.get("QPID_DISPATCH_HOME", os.path.normpath("${QPID_DISPATCH_HOME_INSTALLED}"))
+home = os.environ.get("QPID_DISPATCH_HOME", os.path.normpath(os.path.dirname(__file__)))
 sys.path.append(os.path.join(home, "python"))
 
 from qpid_dispatch_internal.tools import Display, Header, Sorter, YN, Commas, TimeLong
@@ -35,7 +37,6 @@ from qpid_dispatch_internal.tools import
 
 class Config:
     def __init__(self):
-        self._host = "0.0.0.0"
         self._connTimeout = 5
         self._types = ""
         self._limit = 50
@@ -88,75 +89,15 @@ def OptionsAndArguments(argv):
         parser.error("You must specify one of these options: -g, -c, -l, -n, -a, or -m. For
details, try $ qdstat --help")
 
     config._types = opts.show
-    config._host = opts.bus
+    config._address = opts.bus
     config._router = opts.router
     config._connTimeout = opts.timeout
 
     return args
 
 
-class AmqpEntity(object):
-    def __init__(self, types, values):
-        if len(types) != len(values):
-            raise Exception("Mismatched types and values for entity")
-        self.values = {}
-        for idx in range(len(types)):
-            self.values[types[idx]] = values[idx]
-
-    def __getattr__(self, attr):
-        if attr in self.values:
-            return self.values[attr]
-        raise Exception("Unknown attribute: %s" % attr)
 
-    def __repr__(self):
-        return "%r" % self.values
-
-
-class BusManager:
-    def __init__(self):
-        pass
-
-    def SetHost(self, host, router):
-        self.M = Messenger()
-        self.M.start()
-        self.M.timeout = config._connTimeout
-        self.M.route("amqp:/*", "amqp://%s/$1" % host)
-        if router:
-            self.address = "amqp:/_topo/0/%s/$management" % router
-        else:
-            self.address = "amqp:/$management"
-        self.subscription = self.M.subscribe("amqp:/#")
-        self.reply = self.subscription.address
-
-    def Disconnect(self):
-        self.M.stop()
-
-    def _get_object(self, cls):
-        request = Message()
-        response = Message()
-
-        request.address = self.address
-        request.reply_to = self.reply
-        request.correlation_id = 1
-        request.properties = {u'operation':u'QUERY', u'entityType':cls}
-        request.body = {'attributeNames': []}
-
-        self.M.put(request)
-        self.M.send()
-        self.M.recv()
-        self.M.get(response)
-
-        if response.properties['statusCode'] != 200:
-            raise Exception("Agent reports: %d %s" % (response.properties['statusCode'],
response.properties['statusDescription']))
-
-        entities = []
-        anames = response.body['attributeNames']
-        results = response.body['results']
-        for e in results:
-            entities.append(AmqpEntity(anames, e))
-
-        return entities
-        
+class BusManager(Node):
 
     def displayConnections(self):
         disp = Display(prefix="  ")
@@ -170,7 +111,7 @@ class BusManager:
 
         rows = []
 
-        objects = self._get_object('org.apache.qpid.dispatch.connection')
+        objects = self.query('org.apache.qpid.dispatch.connection')
 
         for conn in objects:
             row = []
@@ -216,7 +157,7 @@ class BusManager:
         heads.append(Header("value"))
         rows = []
 
-        objects = self._get_object('org.apache.qpid.dispatch.router')
+        objects = self.query('org.apache.qpid.dispatch.router')
 
         router = objects[0]
         rows.append(('Mode',          router.mode))
@@ -243,7 +184,7 @@ class BusManager:
         heads.append(Header("msg-fifo"))
         rows = []
 
-        objects = self._get_object('org.apache.qpid.dispatch.router.link')
+        objects = self.query('org.apache.qpid.dispatch.router.link')
 
         for link in objects:
             row = []
@@ -275,8 +216,8 @@ class BusManager:
         heads.append(Header("valid-origins"))
         rows = []
 
-        objects  = self._get_object('org.apache.qpid.dispatch.router.node')
-        attached = self._get_object('org.apache.qpid.dispatch.router')[0]
+        objects  = self.query('org.apache.qpid.dispatch.router.node')
+        attached = self.query('org.apache.qpid.dispatch.router')[0]
 
         nodes = {}
         for node in objects:
@@ -325,7 +266,7 @@ class BusManager:
         heads.append(Header("from-proc", Header.COMMAS))
         rows = []
 
-        objects = self._get_object('org.apache.qpid.dispatch.router.address')
+        objects = self.query('org.apache.qpid.dispatch.router.address')
 
         for addr in objects:
             row = []
@@ -359,7 +300,7 @@ class BusManager:
         heads.append(Header("rebal-out", Header.COMMAS))
         rows = []
 
-        objects = self._get_object('org.apache.qpid.dispatch.allocator')
+        objects = self.query('org.apache.qpid.dispatch.allocator')
 
         for t in objects:
             row = []
@@ -392,12 +333,10 @@ class BusManager:
 def main(argv=None):
 
     args = OptionsAndArguments(argv)
-    bm   = BusManager()
-
     try:
-        bm.SetHost(config._host, config._router)
+        bm = BusManager(config._address, config._router)
         bm.display(args)
-        bm.Disconnect()
+        bm.stop()
         return 0
     except KeyboardInterrupt:
         print
@@ -409,7 +348,7 @@ def main(argv=None):
     except Exception,e:
         print "Failed: %s - %s" % (e.__class__.__name__, e)
 
-    bm.Disconnect()
+    bm.stop()
     return 1
 
 if __name__ == "__main__":



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message