Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ECAE111085 for ; Thu, 18 Sep 2014 17:11:34 +0000 (UTC) Received: (qmail 8511 invoked by uid 500); 18 Sep 2014 17:11:34 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 8487 invoked by uid 500); 18 Sep 2014 17:11:34 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 8469 invoked by uid 99); 18 Sep 2014 17:11:34 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Sep 2014 17:11:34 +0000 X-ASF-Spam-Status: No, hits=-1999.7 required=5.0 tests=ALL_TRUSTED,URIBL_RHS_DOB X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Sep 2014 17:11:32 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id A2ED42388C22; Thu, 18 Sep 2014 17:11:12 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1626029 - in /qpid/proton/branches/examples: proton-c/bindings/python/proton.py tutorial/proton_events.py tutorial/sync_client.py Date: Thu, 18 Sep 2014 17:11:12 -0000 To: commits@qpid.apache.org From: aconway@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140918171112.A2ED42388C22@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: aconway Date: Thu Sep 18 17:11:12 2014 New Revision: 1626029 URL: http://svn.apache.org/r1626029 Log: NO-JIRA: Added tutorial/sync_client.py to demonstrate a synchronous request-response client. 
This client uses the familiar paradigm of making blocking calls that send a request and return the response. Made some improvements to BlockingThread error handling and timeouts. Added: qpid/proton/branches/examples/tutorial/sync_client.py (with props) Modified: qpid/proton/branches/examples/proton-c/bindings/python/proton.py qpid/proton/branches/examples/tutorial/proton_events.py Modified: qpid/proton/branches/examples/proton-c/bindings/python/proton.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/bindings/python/proton.py?rev=1626029&r1=1626028&r2=1626029&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/bindings/python/proton.py (original) +++ qpid/proton/branches/examples/proton-c/bindings/python/proton.py Thu Sep 18 17:11:12 2014 @@ -31,7 +31,7 @@ The proton APIs consist of the following """ from cproton import * -import weakref +import weakref, re, socket try: import uuid except ImportError: @@ -2215,6 +2215,9 @@ class Condition: (self.name, self.description, self.info) if x]) + def __str__(self): + return ": ".join(filter(None, [self.name, self.description, self.info])) + def __eq__(self, o): if not isinstance(o, Condition): return False return self.name == o.name and \ Modified: qpid/proton/branches/examples/tutorial/proton_events.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1626029&r1=1626028&r2=1626029&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/proton_events.py (original) +++ qpid/proton/branches/examples/tutorial/proton_events.py Thu Sep 18 17:11:12 2014 @@ -17,8 +17,8 @@ # under the License. 
# import heapq, os, Queue, re, socket, time, types -from proton import Collector, Connection, Delivery, Endpoint, Event -from proton import Message, ProtonException, Transport, TransportException +from proton import Collector, Connection, Delivery, Endpoint, Event, Timeout +from proton import Message, ProtonException, Transport, TransportException, ConnectionException from select import select class EventDispatcher(object): @@ -81,7 +81,10 @@ class Selectable(object): if username and password: sasl = self.transport.sasl() sasl.plain(username, password) - self.socket.connect_ex((host, port or 5672)) + try: + self.socket.connect_ex((host, port or 5672)) + except socket.gaierror, e: + raise ConnectionException("Cannot resolve '%s': %s" % (host, e)) return self def _closed_cleanly(self): @@ -358,11 +361,16 @@ class SelectLoop(object): def redundant(self): return self.events.empty and not self.selectables + @property + def aborted(self): + return self._abort + def run(self): while not (self._abort or self.redundant): self.do_work() - def do_work(self, timeout = 3): + def do_work(self, timeout=None): + """@return True if some work was done, False if time-out expired""" self.events.process() if self._abort: return @@ -384,7 +392,9 @@ class SelectLoop(object): if self.redundant: return - if self.events.next_interval and self.events.next_interval < timeout: + if timeout and timeout < 0: + timeout = 0 + if self.events.next_interval and (timeout is None or self.events.next_interval < timeout): timeout = self.events.next_interval readable, writable, _ = select(reading, writing, [], timeout) @@ -393,6 +403,9 @@ class SelectLoop(object): for s in writable: s.writable() + return bool(readable or writable) + + class Handshaker(EventDispatcher): def on_connection_remote_open(self, event): @@ -678,7 +691,7 @@ class Url(object): AMQP = "amqp" def __init__(self, value): - match = Url.RE.match(value) + match = Url.RE.match(str(value)) if match is None: raise ValueError(value) 
self.scheme, self.user, self.password, host4, host6, port = match.groups() @@ -787,17 +800,23 @@ class EventLoop(object): def stop(self): self.loop.abort() + def do_work(self, timeout=None): + return self.loop.do_work(timeout) + class BlockingLink(object): def __init__(self, connection, link): self.connection = connection self.link = link - while self.link.state & Endpoint.REMOTE_UNINIT: - self.connection.loop.do_work() + self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT), + msg="Opening link %s" % link.name) def close(self): - while self.link.state & Endpoint.REMOTE_ACTIVE: - self.connection.loop.do_work() + self.connection.wait(not (self.link.state & Endpoint.REMOTE_ACTIVE), + msg="Closing link %s" % link.name) + + # Access to other link attributes. + def __getattr__(self, name): return getattr(self.link, name) class BlockingSender(BlockingLink): def __init__(self, connection, sender): @@ -805,39 +824,52 @@ class BlockingSender(BlockingLink): def send_msg(self, msg): delivery = send_msg(self.link, msg) - while not delivery.settled: - self.connection.loop.do_work() + self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name) class BlockingReceiver(BlockingLink): - def __init__(self, connection, receiver): + def __init__(self, connection, receiver, credit=1): super(BlockingReceiver, self).__init__(connection, receiver) - receiver.flow(1) + if credit: receiver.flow(credit) class BlockingConnection(EventDispatcher): - def __init__(self, url): + def __init__(self, url, timeout=None): + self.timeout = timeout self.events = Events(ScopedDispatcher()) self.loop = SelectLoop(self.events) self.context = MessagingContext(self.loop.events.connection(), handler=self) - host, port = url.split(":") - if port: port = int(port) - self.loop.add(Selectable(self.context.conn, socket.socket(), self.events).connect(host, port)) + self.url = url + self.loop.add( + Selectable(self.context.conn, socket.socket(), 
self.events).connect(self.url.host, self.url.port)) self.context.conn.open() - while self.context.conn.state & Endpoint.REMOTE_UNINIT: - self.loop.do_work() + self.wait(lambda: not (self.context.conn.state & Endpoint.REMOTE_UNINIT), + msg="Opening connection") def sender(self, address, handler=None): return BlockingSender(self, self.context.sender(address, handler=handler)) - def receiver(self, address, handler=None): - return BlockingReceiver(self, self.context.receiver(address, handler=handler)) + def receiver(self, address, credit=1, dynamic=False, handler=None): + return BlockingReceiver( + self, self.context.receiver(address, dynamic=dynamic, handler=handler), credit=credit) def close(self): self.context.conn.close() - while self.context.conn.state & Endpoint.REMOTE_ACTIVE: - self.loop.do_work() + self.wait(lambda: not (self.context.conn.state & Endpoint.REMOTE_ACTIVE), + msg="Closing connection") - def run(self): - self.loop.run() + def wait(self, condition, timeout=False, msg=None): + """Call do_work until condition() is true""" + if timeout is False: + timeout = self.timeout + if timeout is None: + while not condition(): + self.loop.do_work() + else: + deadline = time.time() + timeout + while not condition(): + if not self.loop.do_work(deadline - time.time()): + txt = "Connection %s timed out" % self.url + if msg: txt += ": " + msg + raise Timeout(txt) def on_link_remote_close(self, event): if event.link.state & Endpoint.LOCAL_ACTIVE: @@ -848,11 +880,12 @@ class BlockingConnection(EventDispatcher self.closed(event.connection.remote_condition) def on_disconnected(self, event): - raise Exception("Disconnected"); + raise ConnectionException("Connection %s disconnected" % self.url); def closed(self, error=None): + txt = "Connection %s closed" % self.url if error: - txt = "Closed due to %s" % error + txt += " due to: %s" % error else: - txt = "Closed by peer" - raise Exception(txt) + txt += " by peer" + raise ConnectionException(txt) Added: 
qpid/proton/branches/examples/tutorial/sync_client.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/sync_client.py?rev=1626029&view=auto ============================================================================== --- qpid/proton/branches/examples/tutorial/sync_client.py (added) +++ qpid/proton/branches/examples/tutorial/sync_client.py Thu Sep 18 17:11:12 2014 @@ -0,0 +1,86 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Demonstrates the client side of the synchronous request-response pattern +(also known as RPC or Remote Procedure Call) using proton. + +""" + +from proton import Message, Url, ConnectionException, Timeout +from proton_events import BlockingConnection, IncomingMessageHandler +import sys + +class SyncRequestClient(IncomingMessageHandler): + """ + Implementation of the synchronous request-response (aka RPC) pattern. + Create an instance and call invoke() to send a request and wait for a response. + """ + + def __init__(self, url, timeout=None): + """ + @param url: a proton.Url or a URL string of the form 'host:port/path' + host:port is used to connect, path is used to identify the remote messaging endpoint. 
+ """ + self.connection = BlockingConnection(Url(url).defaults(), timeout=timeout) + self.sender = self.connection.sender(url.path) + # dynamic=true generates a unique address dynamically for this receiver. + # credit=1 because we want to receive 1 response message initially. + self.receiver = self.connection.receiver(None, dynamic=True, credit=1, handler=self) + self.response = None + + def invoke(self, request): + """Send a request, wait for and return the response""" + request.reply_to = self.reply_to + self.sender.send_msg(request) + self.connection.wait(lambda: self.response, msg="Waiting for response") + response = self.response + self.response = None # Ready for next response. + self.receiver.flow(1) # Set up credit for the next response. + return response + + @property + def reply_to(self): + """Return the dynamic address of our receiver.""" + return self.receiver.remote_source.address + + def on_message(self, event): + """Called when we receive a message for our receiver.""" + self.response = event.message # Store the response + + def close(self): + self.connection.close() + + +if __name__ == '__main__': + url = Url("0.0.0.0/examples") + if len(sys.argv) > 1: url = Url(sys.argv[1]) + + invoker = SyncRequestClient(url, timeout=2) + try: + REQUESTS= ["Twas brillig, and the slithy toves", + "Did gire and gymble in the wabe.", + "All mimsy were the borogroves,", + "And the mome raths outgrabe."] + for request in REQUESTS: + response = invoker.invoke(Message(body=request)) + print "%s => %s" % (request, response.body) + finally: + invoker.close() Propchange: qpid/proton/branches/examples/tutorial/sync_client.py ------------------------------------------------------------------------------ svn:executable = * --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org