qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject qpid-proton git commit: PROTON-805: Add dispatch request-response extension to utils.py
Date Fri, 23 Jan 2015 20:51:37 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/master 71d75e953 -> 04d0d01b6


PROTON-805: Add dispatch request-response extension to utils.py

Added SyncRequestResponse class to utils.py.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/04d0d01b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/04d0d01b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/04d0d01b

Branch: refs/heads/master
Commit: 04d0d01b6216ca55c3d4274954d46d874f27a8f2
Parents: 71d75e9
Author: Alan Conway <aconway@redhat.com>
Authored: Fri Jan 23 15:33:08 2015 -0500
Committer: Alan Conway <aconway@redhat.com>
Committed: Fri Jan 23 15:33:08 2015 -0500

----------------------------------------------------------------------
 proton-c/bindings/python/proton/__init__.py |  4 +-
 proton-c/bindings/python/proton/handlers.py |  8 +--
 proton-c/bindings/python/proton/reactors.py | 12 ++--
 proton-c/bindings/python/proton/utils.py    | 78 ++++++++++++++++++++++--
 4 files changed, 86 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/04d0d01b/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 317a8b4..127ddd4 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -33,7 +33,7 @@ The proton APIs consist of the following classes:
 from cproton import *
 from wrapper import Wrapper
 
-import weakref, re, socket, sys
+import weakref, socket, sys
 try:
   import uuid
 except ImportError:
@@ -68,7 +68,7 @@ except ImportError:
       def __hash__(self):
         return self.bytes.__hash__()
 
-  import os, random, socket, time
+  import os, random, time
   rand = random.Random()
   rand.seed((os.getpid(), time.time(), socket.gethostname()))
   def random_uuid():

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/04d0d01b/proton-c/bindings/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py
index b18bda6..e403d26 100644
--- a/proton-c/bindings/python/proton/handlers.py
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -243,7 +243,7 @@ class EndpointStateHandler(Handler):
     def print_error(cls, endpoint, endpoint_type):
         if endpoint.remote_condition:
             logging.error(endpoint.remote_condition.description)
-        elif self.is_local_open(endpoint) and self.is_remote_closed(endpoint):
+        elif cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint):
             logging.error("%s closed by peer" % endpoint_type)
 
     def on_link_remote_close(self, event):
@@ -334,20 +334,20 @@ class EndpointStateHandler(Handler):
         if self.delegate:
             dispatch(self.delegate, 'on_connection_error', event)
         else:
-            self.print_error(event.connection, "connection")
+            self.log_error(event.connection, "connection")
 
     def on_session_error(self, event):
         if self.delegate:
             dispatch(self.delegate, 'on_session_error', event)
         else:
-            self.print_error(event.session, "session")
+            self.log_error(event.session, "session")
             event.connection.close()
 
     def on_link_error(self, event):
         if self.delegate:
             dispatch(self.delegate, 'on_link_error', event)
         else:
-            self.print_error(event.link, "link")
+            self.log_error(event.link, "link")
             event.connection.close()
 
     def on_connection_closed(self, event):

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/04d0d01b/proton-c/bindings/python/proton/reactors.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py
index 4e2664a..41e9543 100644
--- a/proton-c/bindings/python/proton/reactors.py
+++ b/proton-c/bindings/python/proton/reactors.py
@@ -408,9 +408,13 @@ class SelectLoop(object):
     def do_work(self, timeout=None):
         """@return True if some work was done, False if time-out expired"""
         tick = self.events.timer.tick()
-        while self.events.process():
-            if self._abort: return
+
+        if self.events.process():
             tick = self.events.timer.tick()
+            while self.events.process():
+                if self._abort: return
+                tick = self.events.timer.tick()
+            return True # Did work, let caller check their conditions, don't select.
 
         stable = False
         while not stable:
@@ -429,7 +433,7 @@ class SelectLoop(object):
             stable = len(closed) == 0
 
         if self.redundant:
-            return
+            return False
 
         if tick:
             timeout = _min(tick - time.time(), timeout)
@@ -836,7 +840,7 @@ class Container(object):
         return self.loop.do_work(timeout)
 
 import traceback
-from proton import WrappedHandler, _chandler, Connection, secs2millis, millis2secs, Selectable
+from proton import WrappedHandler, _chandler, secs2millis, millis2secs, Selectable
 from wrapper import Wrapper
 from cproton import *
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/04d0d01b/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index f4740b5..7a35362 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -16,10 +16,10 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-import collections, Queue, socket, time
+import collections, Queue, socket, time, threading
 from proton import ConnectionException, Endpoint, Handler, Message, Timeout, Url
 from proton.reactors import AmqpSocket, Container, Events, SelectLoop, send_msg
-from proton.handlers import MessagingHandler, ScopedHandler
+from proton.handlers import MessagingHandler, ScopedHandler, IncomingMessageHandler
 
 class BlockingLink(object):
     def __init__(self, connection, link):
@@ -82,10 +82,7 @@ class BlockingConnection(Handler):
     def __init__(self, url, timeout=None, container=None):
         self.timeout = timeout
         self.container = container or Container()
-        if isinstance(url, basestring):
-            self.url = Url(url)
-        else:
-            self.url = url
+        self.url = Url(url).defaults()
         self.conn = self.container.connect(url=self.url, handler=self)
         self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
                   msg="Opening connection")
@@ -146,3 +143,72 @@ class BlockingConnection(Handler):
         else:
             txt += " by peer"
         raise ConnectionException(txt)
+
+
+def atomic_count(start=0, step=1):
+    """Thread-safe atomic count iterator"""
+    lock = threading.Lock()
+    count = start
+    while True:
+        with lock:
+            count += step;
+            yield count
+
+
+class SyncRequestResponse(IncomingMessageHandler):
+    """
+    Implementation of the synchronous request-response (aka RPC) pattern.
+    """
+
+    correlation_id = atomic_count()
+
+    def __init__(self, connection, address=None):
+        """
+        Send requests and receive responses. A single instance can send many requests
+        to the same or different addresses.
+
+        @param connection: A L{BlockingConnection}
+        @param address: Address for all requests.
+            If not specified, each request must have the address property set.
+            Successive messages may have different addresses.
+
+        @ivar address: Address for all requests, may be None.
+        @ivar connection: Connection for requests and responses.
+        """
+        super(SyncRequestResponse, self).__init__()
+        self.connection = connection
+        self.address = address
+        self.sender = self.connection.create_sender(self.address)
+        # dynamic=true generates a unique address dynamically for this receiver.
+        # credit=1 because we want to receive 1 response message initially.
+        self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)
+        self.response = None
+
+    def call(self, request):
+        """
+        Send a request message, wait for and return the response message.
+
+        @param request: A L{proton.Message}. If L{self.address} is not set the 
+            L{request.address} must be set and will be used.
+        """
+        if not self.address and not request.address:
+            raise ValueError("Request message has no address: %s" % request)
+        request.reply_to = self.reply_to
+        request.correlation_id = correlation_id = self.correlation_id.next()
+        self.sender.send_msg(request)
+        def wakeup():
+            return self.response and (self.response.correlation_id == correlation_id)
+        self.connection.wait(wakeup, msg="Waiting for response")
+        response = self.response
+        self.response = None    # Ready for next response.
+        self.receiver.flow(1)   # Set up credit for the next response.
+        return response
+
+    @property
+    def reply_to(self):
+        """Return the dynamic address of our receiver."""
+        return self.receiver.remote_source.address
+
+    def on_message(self, event):
+        """Called when we receive a message for our receiver."""
+        self.response = event.message


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message