qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1644684 [1/3] - in /qpid/dispatch/trunk: python/qpid_dispatch/management/ python/qpid_dispatch_internal/proton_future/ tests/ tools/
Date Thu, 11 Dec 2014 15:51:02 GMT
Author: aconway
Date: Thu Dec 11 15:51:01 2014
New Revision: 1644684

URL: http://svn.apache.org/r1644684
Log:
DISPATCH-82: Poor error handling by qdmanage and qdstat tools.

Fixed error handling for qd tools as follows:

- Forked future proton reactor work into qpid_dispatch_internal.proton_future
- Added proton_future.utils.SyncRequestResponse to implement request-response pattern
- Updated qpid_dispatch.management.client to use SyncRequestResponse

When the reactor work has made it into a proton release, the proton_future package
should be removed.

Added:
    qpid/dispatch/trunk/python/qpid_dispatch_internal/proton_future/
    qpid/dispatch/trunk/python/qpid_dispatch_internal/proton_future/__init__.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/proton_future/handlers.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/proton_future/reactors.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/proton_future/utils.py
Modified:
    qpid/dispatch/trunk/python/qpid_dispatch/management/client.py
    qpid/dispatch/trunk/tests/system_test.py
    qpid/dispatch/trunk/tests/system_tests_management.py
    qpid/dispatch/trunk/tools/qdstat

Modified: qpid/dispatch/trunk/python/qpid_dispatch/management/client.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch/management/client.py?rev=1644684&r1=1644683&r2=1644684&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch/management/client.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch/management/client.py Thu Dec 11 15:51:01 2014
@@ -21,49 +21,12 @@
 AMQP management client for Qpid dispatch.
 """
 
+import qpid_dispatch_site
 import proton, threading
 from proton import Url
 from .error import *
 from .entity import EntityBase, clean_dict
-
-class MessengerImpl(object):
-    """
-    Messaging implementation for L{Node} based on proton.Messenger
-    @ivar reply_to: address for replies to the node.
-    """
-
-    def __init__(self, address, timeout=None):
-        self.messenger = proton.Messenger()
-        self.messenger.start()
-        self.messenger.timeout = timeout
-        subscribe_address = Url(address)
-        subscribe_address.path = "#"
-        self.subscription = self.messenger.subscribe(str(subscribe_address))
-        self._flush()
-        self.reply_to = self.subscription.address
-        if not self.reply_to:
-            raise ValueError("Failed to subscribe to %s"%subscribe_address)
-
-    def send(self, request):
-        """Send a message"""
-        self.messenger.put(request)
-        self.messenger.send()
-        self._flush()
-
-    def fetch(self):
-        """Wait for a single message."""
-        self.messenger.recv(1)
-        response = proton.Message()
-        self.messenger.get(response)
-        return response
-
-    def _flush(self):
-        """Call self.messenger.work() till there is no work left."""
-        while self.messenger.work(0.1): pass
-
-    def stop(self):
-        """Stop the messaging implementation"""
-        self.messenger.stop()
+from qpid_dispatch_internal.proton_future.utils import SyncRequestResponse, BlockingConnection
 
 
 class Entity(EntityBase):
@@ -109,50 +72,36 @@ class Entity(EntityBase):
 class Node(object):
     """Client proxy for an AMQP management node"""
 
-    def __init__(self, address=None, router=None, locales=None, timeout=10, message_impl=None):
+    def __init__(self, url=None, router=None, locales=None, timeout=10, connection=None):
         """
-        @param address: AMQP address of the management node.
+        @param url: URL of the management node.
         @param router: If address does not contain a path, use the management node for this router ID.
             If not specified and address does not contain a path, use the default management node.
         @param locales: Default list of locales for management operations.
+        @param client: a L{BlockingConnection}
         """
         self.name = self.identity = 'self'
         self.type = 'org.amqp.management' # AMQP management node type
-
-        self.address = Url(address).defaults()
+        self.url = Url(url).defaults()
         self.locales = locales
-        if self.address.path is None:
+        if self.url.path is None:
             if router:
-                self.address.path = '_topo/0/%s/$management' % router
+                self.url.path = '_topo/0/%s/$management' % router
             else:
-                self.address.path = '$management'
-        self.responses = {}
-        self.message_impl = message_impl or MessengerImpl(self.address, timeout=timeout)
-        self.reply_to = self.message_impl.reply_to
+                self.url.path = '$management'
+        connection=connection or BlockingConnection(url, timeout)
+        self.client = SyncRequestResponse(connection, self.url.path)
+        self.reply_to = self.client.reply_to
 
-    def stop(self):
+    def close(self):
         """Shut down the node"""
-        if not self.message_impl: return
-        self.message_impl.stop()
-        self.message_impl = None
-
-    def __del__(self):
-        if hasattr(self, 'message_impl'):
-            try: self.stop()
-            except: pass
+        if self.client:
+            self.client.close()
+            self.client = None
 
     def __repr__(self):
         return "%s(%s)"%(self.__class__.__name__, self.address)
 
-    CORRELATION_ID = 0
-    CORRELATION_LOCK = threading.Lock()
-
-    def correlation_id(self):
-        """Get the next correlation ID. Thread safe."""
-        with self.CORRELATION_LOCK:
-            Node.CORRELATION_ID += 1
-            return Node.CORRELATION_ID
-
     @staticmethod
     def check_response(response, expect=OK):
         """
@@ -176,9 +125,6 @@ class Node(object):
         """
         if self.locales: properties.setdefault('locales', self.locales)
         request = proton.Message()
-        request.address = str(self.address)
-        request.reply_to = self.reply_to
-        request.correlation_id = self.correlation_id()
         request.properties = clean_dict(properties)
         request.body = body or {}
         return request
@@ -192,15 +138,7 @@ class Node(object):
         Send a management request message, wait for a response.
         @return: Response message.
         """
-        if not request.address:
-            raise ValueError("Message must have an address")
-        if not request.reply_to:
-            raise ValueError("Message must have reply_to %s", request)
-        self.message_impl.send(request)
-        while True:
-            response = self.message_impl.fetch()
-            # Ignore mismatched correlation IDs, responses to earlier requests that timed out.
-            if response.correlation_id == request.correlation_id: break
+        response = self.client.call(request)
         self.check_response(response, expect=expect)
         return response
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message