qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1626029 - in /qpid/proton/branches/examples: proton-c/bindings/python/proton.py tutorial/proton_events.py tutorial/sync_client.py
Date Thu, 18 Sep 2014 17:11:12 GMT
Author: aconway
Date: Thu Sep 18 17:11:12 2014
New Revision: 1626029

URL: http://svn.apache.org/r1626029
Log:
NO-JIRA: Added tutorial/sync_client.py to demonstrate a synchronous request-response client.

This client uses the familiar paradigm of making blocking calls that send a
request and return the response.

Made some improvements to BlockingThread error handling and timeouts.

Added:
    qpid/proton/branches/examples/tutorial/sync_client.py   (with props)
Modified:
    qpid/proton/branches/examples/proton-c/bindings/python/proton.py
    qpid/proton/branches/examples/tutorial/proton_events.py

Modified: qpid/proton/branches/examples/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/bindings/python/proton.py?rev=1626029&r1=1626028&r2=1626029&view=diff
==============================================================================
--- qpid/proton/branches/examples/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/branches/examples/proton-c/bindings/python/proton.py Thu Sep 18 17:11:12 2014
@@ -31,7 +31,7 @@ The proton APIs consist of the following
 """
 
 from cproton import *
-import weakref
+import weakref, re, socket
 try:
   import uuid
 except ImportError:
@@ -2215,6 +2215,9 @@ class Condition:
                                         (self.name, self.description, self.info)
                                         if x])
 
+  def __str__(self):
+    return ": ".join(filter(None, [self.name, self.description, self.info]))
+
   def __eq__(self, o):
     if not isinstance(o, Condition): return False
     return self.name == o.name and \

Modified: qpid/proton/branches/examples/tutorial/proton_events.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1626029&r1=1626028&r2=1626029&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_events.py (original)
+++ qpid/proton/branches/examples/tutorial/proton_events.py Thu Sep 18 17:11:12 2014
@@ -17,8 +17,8 @@
 # under the License.
 #
 import heapq, os, Queue, re, socket, time, types
-from proton import Collector, Connection, Delivery, Endpoint, Event
-from proton import Message, ProtonException, Transport, TransportException
+from proton import Collector, Connection, Delivery, Endpoint, Event, Timeout
+from proton import Message, ProtonException, Transport, TransportException, ConnectionException
 from select import select
 
 class EventDispatcher(object):
@@ -81,7 +81,10 @@ class Selectable(object):
         if username and password:
             sasl = self.transport.sasl()
             sasl.plain(username, password)
-        self.socket.connect_ex((host, port or 5672))
+        try:
+            self.socket.connect_ex((host, port or 5672))
+        except socket.gaierror, e:
+            raise ConnectionException("Cannot resolve '%s': %s" % (host, e))
         return self
 
     def _closed_cleanly(self):
@@ -358,11 +361,16 @@ class SelectLoop(object):
     def redundant(self):
         return self.events.empty and not self.selectables
 
+    @property
+    def aborted(self):
+        return self._abort
+
     def run(self):
         while not (self._abort or self.redundant):
             self.do_work()
 
-    def do_work(self, timeout = 3):
+    def do_work(self, timeout=None):
+        """@return True if some work was done, False if time-out expired"""
         self.events.process()
         if self._abort: return
 
@@ -384,7 +392,9 @@ class SelectLoop(object):
         if self.redundant:
             return
 
-        if self.events.next_interval and self.events.next_interval < timeout:
+        if timeout and timeout < 0:
+            timeout = 0
+        if self.events.next_interval and (timeout is None or self.events.next_interval < timeout):
             timeout = self.events.next_interval
         readable, writable, _ = select(reading, writing, [], timeout)
 
@@ -393,6 +403,9 @@ class SelectLoop(object):
         for s in writable:
             s.writable()
 
+        return bool(readable or writable)
+
+
 class Handshaker(EventDispatcher):
 
     def on_connection_remote_open(self, event):
@@ -678,7 +691,7 @@ class Url(object):
     AMQP = "amqp"
 
     def __init__(self, value):
-        match = Url.RE.match(value)
+        match = Url.RE.match(str(value))
         if match is None:
             raise ValueError(value)
         self.scheme, self.user, self.password, host4, host6, port = match.groups()
@@ -787,17 +800,23 @@ class EventLoop(object):
     def stop(self):
         self.loop.abort()
 
+    def do_work(self, timeout=None):
+        return self.loop.do_work(timeout)
+
 
 class BlockingLink(object):
     def __init__(self, connection, link):
         self.connection = connection
         self.link = link
-        while self.link.state & Endpoint.REMOTE_UNINIT:
-            self.connection.loop.do_work()
+        self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT),
+                             msg="Opening link %s" % link.name)
 
     def close(self):
-        while self.link.state & Endpoint.REMOTE_ACTIVE:
-            self.connection.loop.do_work()
+        self.connection.wait(not (self.link.state & Endpoint.REMOTE_ACTIVE),
+                             msg="Closing link %s" % link.name)
+
+    # Access to other link attributes.
+    def __getattr__(self, name): return getattr(self.link, name)
 
 class BlockingSender(BlockingLink):
     def __init__(self, connection, sender):
@@ -805,39 +824,52 @@ class BlockingSender(BlockingLink):
 
     def send_msg(self, msg):
         delivery = send_msg(self.link, msg)
-        while not delivery.settled:
-            self.connection.loop.do_work()
+        self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name)
 
 class BlockingReceiver(BlockingLink):
-    def __init__(self, connection, receiver):
+    def __init__(self, connection, receiver, credit=1):
         super(BlockingReceiver, self).__init__(connection, receiver)
-        receiver.flow(1)
+        if credit: receiver.flow(credit)
 
 class BlockingConnection(EventDispatcher):
-    def __init__(self, url):
+    def __init__(self, url, timeout=None):
+        self.timeout = timeout
         self.events = Events(ScopedDispatcher())
         self.loop = SelectLoop(self.events)
         self.context = MessagingContext(self.loop.events.connection(), handler=self)
-        host, port = url.split(":")
-        if port: port = int(port)
-        self.loop.add(Selectable(self.context.conn, socket.socket(), self.events).connect(host, port))
+        self.url = url
+        self.loop.add(
+            Selectable(self.context.conn, socket.socket(), self.events).connect(self.url.host, self.url.port))
         self.context.conn.open()
-        while self.context.conn.state & Endpoint.REMOTE_UNINIT:
-            self.loop.do_work()
+        self.wait(lambda: not (self.context.conn.state & Endpoint.REMOTE_UNINIT),
+                  msg="Opening connection")
 
     def sender(self, address, handler=None):
         return BlockingSender(self, self.context.sender(address, handler=handler))
 
-    def receiver(self, address, handler=None):
-        return BlockingReceiver(self, self.context.receiver(address, handler=handler))
+    def receiver(self, address, credit=1, dynamic=False, handler=None):
+        return BlockingReceiver(
+            self, self.context.receiver(address, dynamic=dynamic, handler=handler), credit=credit)
 
     def close(self):
         self.context.conn.close()
-        while self.context.conn.state & Endpoint.REMOTE_ACTIVE:
-            self.loop.do_work()
+        self.wait(lambda: not (self.context.conn.state & Endpoint.REMOTE_ACTIVE),
+                  msg="Closing connection")
 
-    def run(self):
-        self.loop.run()
+    def wait(self, condition, timeout=False, msg=None):
+        """Call do_work until condition() is true"""
+        if timeout is False:
+            timeout = self.timeout
+        if timeout is None:
+            while not condition():
+                self.loop.do_work()
+        else:
+            deadline = time.time() + timeout
+            while not condition():
+                if not self.loop.do_work(deadline - time.time()):
+                    txt = "Connection %s timed out" % self.url
+                    if msg: txt += ": " + msg
+                    raise Timeout(txt)
 
     def on_link_remote_close(self, event):
         if event.link.state & Endpoint.LOCAL_ACTIVE:
@@ -848,11 +880,12 @@ class BlockingConnection(EventDispatcher
             self.closed(event.connection.remote_condition)
 
     def on_disconnected(self, event):
-        raise Exception("Disconnected");
+        raise ConnectionException("Connection %s disconnected" % self.url);
 
     def closed(self, error=None):
+        txt = "Connection %s closed" % self.url
         if error:
-            txt = "Closed due to %s" % error
+            txt += " due to: %s" % error
         else:
-            txt = "Closed by peer"
-        raise Exception(txt)
+            txt += " by peer"
+        raise ConnectionException(txt)

Added: qpid/proton/branches/examples/tutorial/sync_client.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/sync_client.py?rev=1626029&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/sync_client.py (added)
+++ qpid/proton/branches/examples/tutorial/sync_client.py Thu Sep 18 17:11:12 2014
@@ -0,0 +1,86 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Demonstrates the client side of the synchronous request-response pattern
+(also known as RPC or Remote Procedure Call) using proton.
+
+"""
+
+from proton import Message, Url, ConnectionException, Timeout
+from proton_events import BlockingConnection, IncomingMessageHandler
+import sys
+
+class SyncRequestClient(IncomingMessageHandler):
+    """
+    Implementation of the synchronous request-response (aka RPC) pattern.
+    Create an instance and call invoke() to send a request and wait for a response.
+    """
+
+    def __init__(self, url, timeout=None):
+        """
+        @param url: a proton.Url or a URL string of the form 'host:port/path'
+            host:port is used to connect, path is used to identify the remote messaging endpoint.
+        """
+        self.connection = BlockingConnection(Url(url).defaults(), timeout=timeout)
+        self.sender = self.connection.sender(url.path)
+        # dynamic=true generates a unique address dynamically for this receiver.
+        # credit=1 because we want to receive 1 response message initially.
+        self.receiver = self.connection.receiver(None, dynamic=True, credit=1, handler=self)
+        self.response = None
+
+    def invoke(self, request):
+        """Send a request, wait for and return the response"""
+        request.reply_to = self.reply_to
+        self.sender.send_msg(request)
+        self.connection.wait(lambda: self.response, msg="Waiting for response")
+        response = self.response
+        self.response = None    # Ready for next response.
+        self.receiver.flow(1)   # Set up credit for the next response.
+        return response
+
+    @property
+    def reply_to(self):
+        """Return the dynamic address of our receiver."""
+        return self.receiver.remote_source.address
+
+    def on_message(self, event):
+        """Called when we receive a message for our receiver."""
+        self.response = event.message # Store the response
+
+    def close(self):
+        self.connection.close()
+
+
+if __name__ == '__main__':
+    url = Url("0.0.0.0/examples")
+    if len(sys.argv) > 1: url = Url(sys.argv[1])
+
+    invoker = SyncRequestClient(url, timeout=2)
+    try:
+        REQUESTS= ["Twas brillig, and the slithy toves",
+                   "Did gire and gymble in the wabe.",
+                   "All mimsy were the borogroves,",
+                   "And the mome raths outgrabe."]
+        for request in REQUESTS:
+            response = invoker.invoke(Message(body=request))
+            print "%s => %s" % (request, response.body)
+    finally:
+        invoker.close()

Propchange: qpid/proton/branches/examples/tutorial/sync_client.py
------------------------------------------------------------------------------
    svn:executable = *



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message