Subject: Python SocketTransceiver
From: Eric Evans
To: dev@avro.apache.org
Date: Thu, 18 Nov 2010 19:10:42 -0600
Message-ID: <1290129042.25971.9.camel@erebus.lan>

I put together a SocketTransceiver for Python so that I could compare performance against
the HTTP one, but I'm having some trouble getting it to work. I suspect it's something really dumb that I'm just too close to spot, and that someone else might. The patch against 1.4 is attached. It causes the Java server to produce the following exception:

java.io.EOFException
    at org.apache.avro.ipc.ByteBufferInputStream.getBuffer(ByteBufferInputStream.java:84)
    at org.apache.avro.ipc.ByteBufferInputStream.read(ByteBufferInputStream.java:46)
    at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw(BinaryDecoder.java:815)
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:340)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:265)
    at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:99)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:318)
    at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:229)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:117)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:105)
    at org.apache.avro.ipc.Responder.respond(Responder.java:112)
    at org.apache.avro.ipc.SocketServer$Connection.run(SocketServer.java:91)
    at java.lang.Thread.run(Thread.java:636)

Anyone?

-- 
Eric Evans
eevans@rackspace.com

Attachment: py_socket_transceiver.patch

diff --git a/lang/py/src/avro/ipc.py b/lang/py/src/avro/ipc.py
index f302d63..5b84326 100644
--- a/lang/py/src/avro/ipc.py
+++ b/lang/py/src/avro/ipc.py
@@ -16,7 +16,7 @@
 """
 Support for inter-process calls.
 """
-import httplib
+import httplib, socket, struct
 try:
   from cStringIO import StringIO
 except ImportError:
@@ -478,6 +478,72 @@ class HTTPTransceiver(object):
   def close(self):
     self.conn.close()
 
+class SocketTransceiver(object):
+  """
+  A simple socket-based transceiver implementation.
+  Useful for clients but not for servers
+  """
+  def __init__(self, host, port, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
+    self._sock = socket.create_connection((host, port), timeout)
+
+  # read-only properties
+  sock = property(lambda self: self._sock)
+  remote_name = property(lambda self: self.sock.getsockname())
+
+  def transceive(self, request):
+    self.write_framed_message(request)
+    return self.read_framed_message()
+
+  def read_framed_message(self):
+    msg = StringIO()
+    while True:
+      buff = StringIO()
+      size = self.__read_length()
+      if size == 0:
+        return msg.getvalue()
+      while buff.tell() < size:
+        chunk = self._sock.recv(size-buff.tell())
+        if chunk == '':
+          raise ConnectionClosedException("socket read 0 bytes")
+        buff.write(chunk)
+      msg.write(buff.getvalue())
+
+  def write_framed_message(self, msg):
+    totalsize = len(msg)
+    totalsent = 0
+    while (totalsize - totalsent) > 0:
+      if (totalsize - totalsent) > BUFFER_SIZE:
+        batchsize = BUFFER_SIZE
+      else:
+        batchsize = (totalsize - totalsent)
+      self.__write_buffer(msg[totalsent:(totalsent + batchsize)])
+      totalsent += batchsize
+    self.__write_length(0) # null terminate
+
+  def __write_buffer(self, msg):
+    size = len(msg)
+    self.__write_length(size)
+    totalsent = 0
+    while totalsent < size:
+      sent = self._sock.send(msg[totalsent:])
+      if sent == 0:
+        raise ConnectionClosedException("socket sent 0 bytes")
+      totalsent += sent
+
+  def __write_length(self, length):
+    if self._sock.sendall(struct.pack('>i', length)) == 0:
+      raise ConnectionClosedException("socket sent 0 bytes")
+
+  def __read_length(self):
+    length = self._sock.recv(4)
+    if length == '':
+      raise ConnectionClosedException("socket read 0 bytes")
+    return struct.unpack('>i', length)[0]
+
+  def close(self):
+    self._sock.shutdown(socket.SHUT_RDWR)
+    self._sock.close()
+
 #
 # Server Implementations (none yet)
 #
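For reference, the framing that `write_framed_message` and `read_framed_message` implement in the patch is: the request is split into chunks of at most `BUFFER_SIZE` bytes, each chunk is preceded by a 4-byte big-endian signed length (`struct.pack('>i', ...)`), and a zero length ends the message. Below is a minimal standalone sketch of that byte layout, written for Python 3 rather than the patch's Python 2. The names `frame_message`, `recv_exactly` and `read_frames` are hypothetical, and `BUFFER_SIZE = 8192` is an assumed value. The sketch only round-trips bytes over a local socket pair; it does not perform the Avro handshake or talk to the Java server.

```python
import socket
import struct

# Assumption: 8192 stands in for the module-level BUFFER_SIZE the patch reads
# in write_framed_message; any positive chunk size works with this framing.
BUFFER_SIZE = 8192

LENGTH = struct.Struct('>i')  # 4-byte big-endian signed length, as in the patch


def frame_message(msg, buffer_size=BUFFER_SIZE):
    """Split msg into length-prefixed chunks plus a zero-length terminator."""
    out = bytearray()
    for start in range(0, len(msg), buffer_size):
        chunk = msg[start:start + buffer_size]
        out += LENGTH.pack(len(chunk))
        out += chunk
    out += LENGTH.pack(0)  # a zero length marks the end of the message
    return bytes(out)


def recv_exactly(sock, n):
    """Read exactly n bytes; a single recv() may return fewer than requested."""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise EOFError('socket closed after %d of %d bytes' % (len(buf), n))
        buf += chunk
    return bytes(buf)


def read_frames(sock):
    """Reassemble chunks from sock until the zero-length terminator arrives."""
    parts = []
    while True:
        (size,) = LENGTH.unpack(recv_exactly(sock, LENGTH.size))
        if size == 0:
            return b''.join(parts)
        parts.append(recv_exactly(sock, size))


if __name__ == '__main__':
    # Loopback check over a connected socket pair instead of a real Avro server.
    left, right = socket.socketpair()
    try:
        payload = b'hello avro' * 1000  # 10000 bytes -> chunks of 8192 and 1808
        left.sendall(frame_message(payload))
        assert read_frames(right) == payload
    finally:
        left.close()
        right.close()
```

One design difference from the patch: the sketch loops when reading the 4-byte length as well, because `recv(4)` is allowed to return fewer than 4 bytes, whereas the patch's `__read_length` calls `recv(4)` once.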