Return-Path: Delivered-To: apmail-avro-commits-archive@www.apache.org Received: (qmail 79826 invoked from network); 26 Aug 2010 21:16:42 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 26 Aug 2010 21:16:42 -0000 Received: (qmail 39854 invoked by uid 500); 26 Aug 2010 21:16:42 -0000 Delivered-To: apmail-avro-commits-archive@avro.apache.org Received: (qmail 39813 invoked by uid 500); 26 Aug 2010 21:16:42 -0000 Mailing-List: contact commits-help@avro.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@avro.apache.org Delivered-To: mailing list commits@avro.apache.org Received: (qmail 39805 invoked by uid 99); 26 Aug 2010 21:16:42 -0000 Received: from Unknown (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Aug 2010 21:16:42 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Aug 2010 21:16:22 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 102E22388A1C; Thu, 26 Aug 2010 21:15:04 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r989927 - in /avro/trunk: CHANGES.txt lang/py/src/avro/ipc.py lang/py/src/avro/txipc.py lang/py/test/txsample_http_client.py lang/py/test/txsample_http_server.py Date: Thu, 26 Aug 2010 21:15:03 -0000 To: commits@avro.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100826211504.102E22388A1C@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cutting Date: Thu Aug 26 21:15:02 2010 New Revision: 989927 URL: http://svn.apache.org/viewvc?rev=989927&view=rev Log: AVRO-528. Python: Add support for Twisted. Contributed by Esteve Fernandez. Added: avro/trunk/lang/py/src/avro/txipc.py avro/trunk/lang/py/test/txsample_http_client.py avro/trunk/lang/py/test/txsample_http_server.py Modified: avro/trunk/CHANGES.txt avro/trunk/lang/py/src/avro/ipc.py Modified: avro/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=989927&r1=989926&r2=989927&view=diff ============================================================================== --- avro/trunk/CHANGES.txt (original) +++ avro/trunk/CHANGES.txt Thu Aug 26 21:15:02 2010 @@ -60,6 +60,8 @@ Avro 1.4.0 (unreleased) AVRO-611. IDL: Add support for one-way messages. (cutting) + AVRO-528. Python: Add support for Twisted. (Esteve Fernandez via cutting) + IMPROVEMENTS AVRO-629. Prefer the JSON module of python's stdlib over simplejson. Modified: avro/trunk/lang/py/src/avro/ipc.py URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ipc.py?rev=989927&r1=989926&r2=989927&view=diff ============================================================================== --- avro/trunk/lang/py/src/avro/ipc.py (original) +++ avro/trunk/lang/py/src/avro/ipc.py Thu Aug 26 21:15:02 2010 @@ -75,7 +75,7 @@ class ConnectionClosedException(schema.A # Base IPC Classes (Requestor/Responder) # -class Requestor(object): +class BaseRequestor(object): """Base class for the client side of a protocol interaction.""" def __init__(self, local_protocol, transceiver): self._local_protocol = local_protocol @@ -116,15 +116,7 @@ class Requestor(object): # send the handshake and call request; block until call response call_request = buffer_writer.getvalue() - call_response = self.transceiver.transceive(call_request) - - # process the handshake and call response - buffer_decoder = io.BinaryDecoder(StringIO(call_response)) - call_response_exists = self.read_handshake_response(buffer_decoder) - if call_response_exists: - return self.read_call_response(message_name, buffer_decoder) - else: - return self.request(message_name, request_datum) + return self.issue_request(call_request, message_name, request_datum) def write_handshake_request(self, encoder): local_hash = self.local_protocol.md5 @@ -232,6 +224,19 @@ class Requestor(object): datum_reader = io.DatumReader(writers_schema, readers_schema) return AvroRemoteException(datum_reader.read(decoder)) +class Requestor(BaseRequestor): + + def issue_request(self, call_request, message_name, request_datum): + call_response = self.transceiver.transceive(call_request) + + # process the handshake and call response + buffer_decoder = io.BinaryDecoder(StringIO(call_response)) + call_response_exists = self.read_handshake_response(buffer_decoder) + if call_response_exists: + return self.read_call_response(message_name, buffer_decoder) + else: + return self.request(message_name, request_datum) + class Responder(object): """Base class for the server side of a protocol interaction.""" def __init__(self, local_protocol): Added: avro/trunk/lang/py/src/avro/txipc.py URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/txipc.py?rev=989927&view=auto ============================================================================== --- avro/trunk/lang/py/src/avro/txipc.py (added) +++ avro/trunk/lang/py/src/avro/txipc.py Thu Aug 26 21:15:02 2010 @@ -0,0 +1,222 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +try: + from cStringIO import StringIO +except ImportError: + from StringIO import StringIO +from avro import ipc +from avro import io + +from zope.interface import implements + +from twisted.web.client import Agent +from twisted.web.http_headers import Headers +from twisted.internet.defer import maybeDeferred, Deferred +from twisted.web.iweb import IBodyProducer +from twisted.web import resource, server +from twisted.internet.protocol import Protocol + +class TwistedRequestor(ipc.BaseRequestor): + """A Twisted-compatible requestor. Returns a Deferred that will fire with the + returning value, instead of blocking until the request completes.""" + def _process_handshake(self, call_response, message_name, request_datum): + # process the handshake and call response + buffer_decoder = io.BinaryDecoder(StringIO(call_response)) + call_response_exists = self.read_handshake_response(buffer_decoder) + if call_response_exists: + return self.read_call_response(message_name, buffer_decoder) + else: + return self.request(message_name, request_datum) + + def issue_request(self, call_request, message_name, request_datum): + d = self.transceiver.transceive(call_request) + d.addCallback(self._process_handshake, message_name, request_datum) + return d + +class RequestStreamingProducer(object): + """A streaming producer for issuing requests with the Twisted.web Agent.""" + implements(IBodyProducer) + + paused = False + stopped = False + started = False + + def __init__(self, message): + self._message = message + self._length = len(message) + # We need a buffer length header for every buffer and an additional + # zero-length buffer as the message terminator + self._length += (self._length / ipc.BUFFER_SIZE + 2) \ + * ipc.BUFFER_HEADER_LENGTH + self._total_bytes_sent = 0 + self._deferred = Deferred() + + # read-only properties + message = property(lambda self: self._message) + length = property(lambda self: self._length) + consumer = property(lambda self: self._consumer) + deferred = property(lambda self: self._deferred) + + def _get_total_bytes_sent(self): + return self._total_bytes_sent + + def _set_total_bytes_sent(self, bytes_sent): + self._total_bytes_sent = bytes_sent + + total_bytes_sent = property(_get_total_bytes_sent, _set_total_bytes_sent) + + def startProducing(self, consumer): + if self.started: + return + + self.started = True + self._consumer = consumer + # Keep writing data to the consumer until we're finished, + # paused (pauseProducing()) or stopped (stopProducing()) + while self.length - self.total_bytes_sent > 0 and \ + not self.paused and not self.stopped: + self.write() + # self.write will fire this deferred once it has written + # the entire message to the consumer + return self.deferred + + def resumeProducing(self): + self.paused = False + self.write(self) + + def pauseProducing(self): + self.paused = True + + def stopProducing(self): + self.stopped = True + + def write(self): + if self.length - self.total_bytes_sent > ipc.BUFFER_SIZE: + buffer_length = ipc.BUFFER_SIZE + else: + buffer_length = self.length - self.total_bytes_sent + self.write_buffer(self.message[self.total_bytes_sent: + (self.total_bytes_sent + buffer_length)]) + self.total_bytes_sent += buffer_length + # Make sure we wrote the entire message + if self.total_bytes_sent == self.length and not self.stopped: + self.stopProducing() + # A message is always terminated by a zero-length buffer. + self.write_buffer_length(0) + self.deferred.callback(None) + + def write_buffer(self, chunk): + buffer_length = len(chunk) + self.write_buffer_length(buffer_length) + self.consumer.write(chunk) + + def write_buffer_length(self, n): + self.consumer.write(ipc.BIG_ENDIAN_INT_STRUCT.pack(n)) + +class AvroProtocol(Protocol): + + recvd = '' + done = False + + def __init__(self, finished): + self.finished = finished + self.message = [] + + def dataReceived(self, data): + self.recvd = self.recvd + data + while len(self.recvd) >= ipc.BUFFER_HEADER_LENGTH: + buffer_length ,= ipc.BIG_ENDIAN_INT_STRUCT.unpack( + self.recvd[:ipc.BUFFER_HEADER_LENGTH]) + if buffer_length == 0: + response = ''.join(self.message) + self.done = True + self.finished.callback(response) + break + if len(self.recvd) < buffer_length + ipc.BUFFER_HEADER_LENGTH: + break + buffer = self.recvd[ipc.BUFFER_HEADER_LENGTH:buffer_length + ipc.BUFFER_HEADER_LENGTH] + self.recvd = self.recvd[buffer_length + ipc.BUFFER_HEADER_LENGTH:] + self.message.append(buffer) + + def connectionLost(self, reason): + if not self.done: + self.finished.errback(ipc.ConnectionClosedException("Reader read 0 bytes.")) + +class TwistedHTTPTransceiver(object): + """This transceiver uses the Agent class present in Twisted.web >= 9.0 + for issuing requests to the remote endpoint.""" + def __init__(self, host, port, remote_name=None, reactor=None): + self.url = "http://%s:%d/" % (host, port) + + if remote_name is None: + # There's no easy way to get this peer's remote address + # in Twisted so I use a random UUID to identify ourselves + import uuid + self.remote_name = uuid.uuid4() + + if reactor is None: + from twisted.internet import reactor + self.agent = Agent(reactor) + + def read_framed_message(self, response): + finished = Deferred() + response.deliverBody(AvroProtocol(finished)) + return finished + + def transceive(self, request): + req_method = 'POST' + req_headers = { + 'Content-Type': ['avro/binary'], + 'Accept-Encoding': ['identity'], + } + + body_producer = RequestStreamingProducer(request) + d = self.agent.request( + req_method, + self.url, + headers=Headers(req_headers), + bodyProducer=body_producer) + return d.addCallback(self.read_framed_message) + +class AvroResponderResource(resource.Resource): + """This Twisted.web resource can be placed anywhere in a URL hierarchy + to provide an Avro endpoint. Different Avro protocols can be served + by the same web server as long as they are in different resources in + a URL hierarchy.""" + isLeaf = True + + def __init__(self, responder): + resource.Resource.__init__(self) + self.responder = responder + + def cb_render_POST(self, resp_body, request): + request.setResponseCode(200) + request.setHeader('Content-Type', 'avro/binary') + resp_writer = ipc.FramedWriter(request) + resp_writer.write_framed_message(resp_body) + request.finish() + + def render_POST(self, request): + # Unfortunately, Twisted.web doesn't support incoming + # streamed input yet, the whole payload must be kept in-memory + request.content.seek(0, 0) + call_request_reader = ipc.FramedReader(request.content) + call_request = call_request_reader.read_framed_message() + d = maybeDeferred(self.responder.respond, call_request) + d.addCallback(self.cb_render_POST, request) + return server.NOT_DONE_YET Added: avro/trunk/lang/py/test/txsample_http_client.py URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/txsample_http_client.py?rev=989927&view=auto ============================================================================== --- avro/trunk/lang/py/test/txsample_http_client.py (added) +++ avro/trunk/lang/py/test/txsample_http_client.py Thu Aug 26 21:15:02 2010 @@ -0,0 +1,106 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import sys + +from twisted.internet import reactor, defer +from twisted.python.util import println + +from avro import protocol +from avro import txipc + +MAIL_PROTOCOL_JSON = """\ +{"namespace": "example.proto", + "protocol": "Mail", + + "types": [ + {"name": "Message", "type": "record", + "fields": [ + {"name": "to", "type": "string"}, + {"name": "from", "type": "string"}, + {"name": "body", "type": "string"} + ] + } + ], + + "messages": { + "send": { + "request": [{"name": "message", "type": "Message"}], + "response": "string" + }, + "replay": { + "request": [], + "response": "string" + } + } +} +""" +MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON) +SERVER_HOST = 'localhost' +SERVER_PORT = 9090 + +class UsageError(Exception): + def __init__(self, value): + self.value = value + def __str__(self): + return repr(self.value) + +def make_requestor(server_host, server_port, protocol): + client = txipc.TwistedHTTPTransceiver(SERVER_HOST, SERVER_PORT) + return txipc.TwistedRequestor(protocol, client) + +if __name__ == '__main__': + if len(sys.argv) not in [4, 5]: + raise UsageError("Usage: []") + + # client code - attach to the server and send a message + # fill in the Message record + message = dict() + message['to'] = sys.argv[1] + message['from'] = sys.argv[2] + message['body'] = sys.argv[3] + + try: + num_messages = int(sys.argv[4]) + except: + num_messages = 1 + + # build the parameters for the request + params = {} + params['message'] = message + + requests = [] + # send the requests and print the result + for msg_count in range(num_messages): + requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL) + d = requestor.request('send', params) + d.addCallback(lambda result: println("Result: " + result)) + requests.append(d) + results = defer.gatherResults(requests) + + def replay_cb(result): + print("Replay Result: " + result) + reactor.stop() + + def replay(_): + # try out a replay message + requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL) + d = requestor.request('replay', dict()) + d.addCallback(replay_cb) + + results.addCallback(replay) + reactor.run() Added: avro/trunk/lang/py/test/txsample_http_server.py URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/txsample_http_server.py?rev=989927&view=auto ============================================================================== --- avro/trunk/lang/py/test/txsample_http_server.py (added) +++ avro/trunk/lang/py/test/txsample_http_server.py Thu Aug 26 21:15:02 2010 @@ -0,0 +1,70 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from twisted.web import server +from twisted.internet import reactor + +from avro import ipc +from avro import protocol +from avro import txipc + +MAIL_PROTOCOL_JSON = """\ +{"namespace": "example.proto", + "protocol": "Mail", + + "types": [ + {"name": "Message", "type": "record", + "fields": [ + {"name": "to", "type": "string"}, + {"name": "from", "type": "string"}, + {"name": "body", "type": "string"} + ] + } + ], + + "messages": { + "send": { + "request": [{"name": "message", "type": "Message"}], + "response": "string" + }, + "replay": { + "request": [], + "response": "string" + } + } +} +""" +MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON) +SERVER_ADDRESS = ('localhost', 9090) + +class MailResponder(ipc.Responder): + def __init__(self): + ipc.Responder.__init__(self, MAIL_PROTOCOL) + + def invoke(self, message, request): + if message.name == 'send': + request_content = request['message'] + response = "Sent message to %(to)s from %(from)s with body %(body)s" % \ + request_content + return response + elif message.name == 'replay': + return 'replay' + +if __name__ == '__main__': + root = server.Site(txipc.AvroResponderResource(MailResponder())) + reactor.listenTCP(9090, root) + reactor.run()