avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ham...@apache.org
Subject svn commit: r906394 - in /hadoop/avro/trunk: CHANGES.txt lang/py/src/avro/ipc.py lang/py/test/sample_http_client.py lang/py/test/sample_http_server.py lang/py/test/sample_ipc_client.py lang/py/test/sample_ipc_server.py
Date Thu, 04 Feb 2010 08:15:02 GMT
Author: hammer
Date: Thu Feb  4 08:15:02 2010
New Revision: 906394

URL: http://svn.apache.org/viewvc?rev=906394&view=rev
Log:
AVRO-322: Add a working client and server to the Python implementation
using HTTP as a transport (hammer)


Added:
    hadoop/avro/trunk/lang/py/test/sample_http_client.py
    hadoop/avro/trunk/lang/py/test/sample_http_server.py
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/lang/py/src/avro/ipc.py
    hadoop/avro/trunk/lang/py/test/sample_ipc_client.py
    hadoop/avro/trunk/lang/py/test/sample_ipc_server.py

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=906394&r1=906393&r2=906394&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Feb  4 08:15:02 2010
@@ -87,6 +87,9 @@
     AVRO-380. Avro Container File format change: add block size to block
     descriptor.  (Scott Carey via philz)
 
+    AVRO-322. Add a working client and server to the Python
+    implementation using HTTP as a transport (hammer)
+
   IMPROVEMENTS
 
     AVRO-157. Changes from code review comments for C++. (sbanacho)

Modified: hadoop/avro/trunk/lang/py/src/avro/ipc.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/src/avro/ipc.py?rev=906394&r1=906393&r2=906394&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/py/src/avro/ipc.py (original)
+++ hadoop/avro/trunk/lang/py/src/avro/ipc.py Thu Feb  4 08:15:02 2010
@@ -18,7 +18,6 @@
 """
 import cStringIO
 import struct
-import socket
 from avro import io
 from avro import protocol
 from avro import schema
@@ -96,26 +95,26 @@
 
 class Requestor(object):
   """Base class for the client side of a protocol interaction."""
-  def __init__(self, local_protocol, transport):
+  def __init__(self, local_protocol, transceiver):
     self._local_protocol = local_protocol
-    self._transport = transport
+    self._transceiver = transceiver
     self._remote_protocol = None
     self._remote_hash = None
     self._send_protocol = None
 
   # read-only properties
   local_protocol = property(lambda self: self._local_protocol)
-  transport = property(lambda self: self._transport)
+  transceiver = property(lambda self: self._transceiver)
 
   # read/write properties
   def set_remote_protocol(self, new_remote_protocol):
     self._remote_protocol = new_remote_protocol
-    REMOTE_PROTOCOLS[self.transport.remote_name] = self.remote_protocol
+    REMOTE_PROTOCOLS[self.transceiver.remote_name] = self.remote_protocol
   remote_protocol = property(lambda self: self._remote_protocol,
                              set_remote_protocol)
   def set_remote_hash(self, new_remote_hash):
     self._remote_hash = new_remote_hash
-    REMOTE_HASHES[self.transport.remote_name] = self.remote_hash
+    REMOTE_HASHES[self.transceiver.remote_name] = self.remote_hash
   remote_hash = property(lambda self: self._remote_hash, set_remote_hash)
   def set_send_protocol(self, new_send_protocol):
     self._send_protocol = new_send_protocol
@@ -131,9 +130,9 @@
     self.write_handshake_request(buffer_encoder)
     self.write_call_request(message_name, request_datum, buffer_encoder)
 
-    # send the handshake and call request;  block until call response
+    # send the handshake and call request; block until call response
     call_request = buffer_writer.getvalue()
-    call_response = self.transport.transceive(call_request)
+    call_response = self.transceiver.transceive(call_request)
 
     # process the handshake and call response
     buffer_decoder = io.BinaryDecoder(cStringIO.StringIO(call_response))
@@ -145,7 +144,7 @@
 
   def write_handshake_request(self, encoder):
     local_hash = self.local_protocol.md5
-    remote_name = self.transport.remote_name
+    remote_name = self.transceiver.remote_name
     remote_hash = REMOTE_HASHES.get(remote_name)
     if remote_hash is None:
       remote_hash = local_hash
@@ -265,12 +264,11 @@
   def set_protocol_cache(self, hash, protocol):
     self.protocol_cache[hash] = protocol
 
-  def respond(self, transport):
+  def respond(self, call_request):
     """
     Called by a server to deserialize a request, compute and serialize
     a response or error. Compare to 'handle()' in Thrift.
     """
-    call_request = transport.read_framed_message()
     buffer_reader = cStringIO.StringIO(call_request)
     buffer_decoder = io.BinaryDecoder(buffer_reader)
     buffer_writer = cStringIO.StringIO()
@@ -279,8 +277,7 @@
     response_metadata = {}
     
     try:
-      remote_protocol = self.process_handshake(transport, buffer_decoder,
-                                               buffer_encoder)
+      remote_protocol = self.process_handshake(buffer_decoder, buffer_encoder)
       # handshake failure
       if remote_protocol is None:  
         return buffer_writer.getvalue()
@@ -329,7 +326,7 @@
       self.write_error(SYSTEM_ERROR_SCHEMA, error, buffer_encoder)
     return buffer_writer.getvalue()
 
-  def process_handshake(self, transport, decoder, encoder):
+  def process_handshake(self, decoder, encoder):
     handshake_request = HANDSHAKE_RESPONDER_READER.read(decoder)
     handshake_response = {}
 
@@ -380,36 +377,45 @@
     datum_writer.write(str(error_exception), encoder)
 
 #
-# Transport Implementations
+# Utility classes
 #
 
-class SocketTransport(object):
-  """A simple socket-based Transport implementation."""
-  def __init__(self, sock):
-    self._sock = sock
+class FramedReader(object):
+  """Wrapper around a file-like object to read framed data."""
+  def __init__(self, reader):
+    self._reader = reader
 
   # read-only properties
-  sock = property(lambda self: self._sock)
-  remote_name = property(lambda self: self.sock.getsockname())
-
-  def transceive(self, request):
-    self.write_framed_message(request)
-    return self.read_framed_message()
+  reader = property(lambda self: self._reader)
 
   def read_framed_message(self):
     message = []
     while True:
       buffer = cStringIO.StringIO()
-      buffer_length = self.read_buffer_length()
+      buffer_length = self._read_buffer_length()
       if buffer_length == 0:
         return ''.join(message)
       while buffer.tell() < buffer_length:
-        chunk = self.sock.recv(buffer_length - buffer.tell())
+        chunk = self.reader.read(buffer_length - buffer.tell())
         if chunk == '':
-          raise ConnectionClosedException("Socket read 0 bytes.")
+          raise ConnectionClosedException("Reader read 0 bytes.")
         buffer.write(chunk)
       message.append(buffer.getvalue())
 
+  def _read_buffer_length(self):
+    read = self.reader.read(BUFFER_HEADER_LENGTH)
+    if read == '':
+      raise ConnectionClosedException("Reader read 0 bytes.")
+    return BIG_ENDIAN_INT_STRUCT.unpack(read)[0]
+
+class FramedWriter(object):
+  """Wrapper around a file-like object to write framed data."""
+  def __init__(self, writer):
+    self._writer = writer
+
+  # read-only properties
+  writer = property(lambda self: self._writer)
+
   def write_framed_message(self, message):
     message_length = len(message)
     total_bytes_sent = 0
@@ -427,26 +433,49 @@
   def write_buffer(self, chunk):
     buffer_length = len(chunk)
     self.write_buffer_length(buffer_length)
-    total_bytes_sent = 0
-    while total_bytes_sent < buffer_length:
-      bytes_sent = self.sock.send(chunk[total_bytes_sent:])
-      if bytes_sent == 0:
-        raise ConnectionClosedException("Socket sent 0 bytes.")
-      total_bytes_sent += bytes_sent
+    self.writer.write(chunk)
 
   def write_buffer_length(self, n):
-    bytes_sent = self.sock.sendall(BIG_ENDIAN_INT_STRUCT.pack(n))
-    if bytes_sent == 0:
-      raise ConnectionClosedException("socket sent 0 bytes")
+    self.writer.write(BIG_ENDIAN_INT_STRUCT.pack(n))
 
-  def read_buffer_length(self):
-    read = self.sock.recv(BUFFER_HEADER_LENGTH)
-    if read == '':
-      raise ConnectionClosedException("Socket read 0 bytes.")
-    return BIG_ENDIAN_INT_STRUCT.unpack(read)[0]
+#
+# Transceiver Implementations
+#
+
+class HTTPTransceiver(object):
+  """
+  A simple HTTP-based transceiver implementation.
+  Useful for clients but not for servers
+  """
+  def __init__(self, conn):
+    self._conn = conn
+
+  # read-only properties
+  conn = property(lambda self: self._conn)
+  sock = property(lambda self: self.conn.sock)
+  remote_name = property(lambda self: self.sock.getsockname())
+
+  def transceive(self, request):
+    self.write_framed_message(request)
+    return self.read_framed_message()
+
+  def read_framed_message(self):
+    response_reader = FramedReader(self.conn.getresponse())
+    return response_reader.read_framed_message()
+
+  def write_framed_message(self, message):
+    req_method = 'POST'
+    req_resource = '/'
+    req_headers = {'Content-Type': 'avro/binary'}
+
+    req_body_buffer = FramedWriter(cStringIO.StringIO())
+    req_body_buffer.write_framed_message(message)
+    req_body = req_body_buffer.writer.getvalue()
+
+    self.conn.request(req_method, req_resource, req_body, req_headers)
 
   def close(self):
-    self.sock.close()
+    self.conn.close()
 
 #
 # Server Implementations (none yet)

Added: hadoop/avro/trunk/lang/py/test/sample_http_client.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/test/sample_http_client.py?rev=906394&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/py/test/sample_http_client.py (added)
+++ hadoop/avro/trunk/lang/py/test/sample_http_client.py Thu Feb  4 08:15:02 2010
@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import httplib
+import sys
+
+from avro import ipc
+from avro import protocol
+from avro import schema
+
+MAIL_PROTOCOL_JSON = """\
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+     {"name": "Message", "type": "record",
+      "fields": [
+          {"name": "to",   "type": "string"},
+          {"name": "from", "type": "string"},
+          {"name": "body", "type": "string"}
+      ]
+     }
+ ],
+
+ "messages": {
+     "send": {
+         "request": [{"name": "message", "type": "Message"}],
+         "response": "string"
+     },
+     "replay": {
+         "request": [],
+         "response": "string"
+     }
+ }
+}
+"""
+MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON)
+SERVER_HOST = 'localhost'
+SERVER_PORT = 9090
+
+class UsageError(Exception):
+  def __init__(self, value):
+    self.value = value
+  def __str__(self):
+    return repr(self.value)
+
+def make_requestor(server_host, server_port, protocol):
+  conn = httplib.HTTPConnection(SERVER_HOST, SERVER_PORT)
+  conn.connect()
+  client = ipc.HTTPTransceiver(conn)
+  return ipc.Requestor(protocol, client)
+
+if __name__ == '__main__':
+  if len(sys.argv) not in [4, 5]:
+    raise UsageError("Usage: <to> <from> <body> [<count>]")
+
+  # client code - attach to the server and send a message
+  # fill in the Message record
+  message = dict()
+  message['to'] = sys.argv[1]
+  message['from'] = sys.argv[2]
+  message['body'] = sys.argv[3]
+
+  try:
+    num_messages = int(sys.argv[4])
+  except:
+    num_messages = 1
+
+  # build the parameters for the request
+  params = {}
+  params['message'] = message
+   
+  # send the requests and print the result
+  for msg_count in range(num_messages):
+    requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
+    result = requestor.request('send', params)
+    print("Result: " + result)
+
+  # try out a replay message
+  requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
+  result = requestor.request('replay', dict())
+  print("Replay Result: " + result)

Added: hadoop/avro/trunk/lang/py/test/sample_http_server.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/test/sample_http_server.py?rev=906394&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/py/test/sample_http_server.py (added)
+++ hadoop/avro/trunk/lang/py/test/sample_http_server.py Thu Feb  4 08:15:02 2010
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+from avro import ipc
+from avro import protocol
+from avro import schema
+
+MAIL_PROTOCOL_JSON = """\
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+     {"name": "Message", "type": "record",
+      "fields": [
+          {"name": "to",   "type": "string"},
+          {"name": "from", "type": "string"},
+          {"name": "body", "type": "string"}
+      ]
+     }
+ ],
+
+ "messages": {
+     "send": {
+         "request": [{"name": "message", "type": "Message"}],
+         "response": "string"
+     },
+     "replay": {
+         "request": [],
+         "response": "string"
+     }
+ }
+}
+"""
+MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON)
+SERVER_ADDRESS = ('localhost', 9090)
+
+class MailResponder(ipc.Responder):
+  def __init__(self):
+    ipc.Responder.__init__(self, MAIL_PROTOCOL)
+
+  def invoke(self, message, request):
+    if message.name == 'send':
+      request_content = request['message']
+      response = "Sent message to %(to)s from %(from)s with body %(body)s" % \
+                 request_content
+      return response
+    elif message.name == 'replay':
+      return 'replay'
+
+class MailHandler(BaseHTTPRequestHandler):
+  def do_POST(self):
+    self.responder = MailResponder()
+    call_request_reader = ipc.FramedReader(self.rfile)
+    call_request = call_request_reader.read_framed_message()
+    resp_body = self.responder.respond(call_request)
+    self.send_response(200)
+    self.send_header('Content-Type', 'avro/binary')
+    self.end_headers()
+    resp_writer = ipc.FramedWriter(self.wfile)
+    resp_writer.write_framed_message(resp_body)
+
+if __name__ == '__main__':
+  mail_server = HTTPServer(SERVER_ADDRESS, MailHandler)
+  mail_server.allow_reuse_address = True
+  mail_server.serve_forever()

Modified: hadoop/avro/trunk/lang/py/test/sample_ipc_client.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/test/sample_ipc_client.py?rev=906394&r1=906393&r2=906394&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/py/test/sample_ipc_client.py (original)
+++ hadoop/avro/trunk/lang/py/test/sample_ipc_client.py Thu Feb  4 08:15:02 2010
@@ -1,95 +0,0 @@
-#!/usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import socket
-import sys
-
-from avro import ipc
-from avro import protocol
-from avro import schema
-
-MAIL_PROTOCOL_JSON = """\
-{"namespace": "example.proto",
- "protocol": "Mail",
-
- "types": [
-     {"name": "Message", "type": "record",
-      "fields": [
-          {"name": "to",   "type": "string"},
-          {"name": "from", "type": "string"},
-          {"name": "body", "type": "string"}
-      ]
-     }
- ],
-
- "messages": {
-     "send": {
-         "request": [{"name": "message", "type": "Message"}],
-         "response": "string"
-     },
-     "replay": {
-         "request": [],
-         "response": "string"
-     }
- }
-}
-"""
-MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON)
-SERVER_ADDRESS = ('localhost', 9090)
-
-class UsageError(Exception):
-  def __init__(self, value):
-    self.value = value
-  def __str__(self):
-    return repr(self.value)
-
-def make_requestor(server_address, protocol):
-  sock = socket.socket()
-  sock.connect(server_address)
-  client = ipc.SocketTransport(sock)
-  return ipc.Requestor(protocol, client)
-
-if __name__ == '__main__':
-  if len(sys.argv) not in [4, 5]:
-    raise UsageError("Usage: <to> <from> <body> [<count>]")
-
-  # client code - attach to the server and send a message
-  # fill in the Message record
-  message = dict()
-  message['to'] = sys.argv[1]
-  message['from'] = sys.argv[2]
-  message['body'] = sys.argv[3]
-
-  try:
-    num_messages = int(sys.argv[4])
-  except:
-    num_messages = 1
-
-  # build the parameters for the request
-  params = {}
-  params['message'] = message
-   
-  # send the requests and print the result
-  for msg_count in range(num_messages):
-    requestor = make_requestor(SERVER_ADDRESS, MAIL_PROTOCOL)
-    result = requestor.request('send', params)
-    print("Result: " + result)
-
-  # try out a replay message
-  requestor = make_requestor(SERVER_ADDRESS, MAIL_PROTOCOL)
-  result = requestor.request('replay', dict())
-  print("Replay Result: " + result)

Modified: hadoop/avro/trunk/lang/py/test/sample_ipc_server.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/test/sample_ipc_server.py?rev=906394&r1=906393&r2=906394&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/py/test/sample_ipc_server.py (original)
+++ hadoop/avro/trunk/lang/py/test/sample_ipc_server.py Thu Feb  4 08:15:02 2010
@@ -1,74 +0,0 @@
-#!/usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from SocketServer import BaseRequestHandler, TCPServer
-from avro import ipc
-from avro import protocol
-from avro import schema
-
-MAIL_PROTOCOL_JSON = """\
-{"namespace": "example.proto",
- "protocol": "Mail",
-
- "types": [
-     {"name": "Message", "type": "record",
-      "fields": [
-          {"name": "to",   "type": "string"},
-          {"name": "from", "type": "string"},
-          {"name": "body", "type": "string"}
-      ]
-     }
- ],
-
- "messages": {
-     "send": {
-         "request": [{"name": "message", "type": "Message"}],
-         "response": "string"
-     },
-     "replay": {
-         "request": [],
-         "response": "string"
-     }
- }
-}
-"""
-MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON)
-SERVER_ADDRESS = ('localhost', 9090)
-
-class MailResponder(ipc.Responder):
-  def __init__(self):
-    ipc.Responder.__init__(self, MAIL_PROTOCOL)
-
-  def invoke(self, message, request):
-    if message.name == 'send':
-      request_content = request['message']
-      response = "Sent message to %(to)s from %(from)s with body %(body)s" % \
-                 request_content
-      return response
-    elif message.name == 'replay':
-      return 'replay'
-
-class MailHandler(BaseRequestHandler):
-  def handle(self):
-    self.responder = MailResponder()
-    self.transport = ipc.SocketTransport(self.request)
-    self.transport.write_framed_message(self.responder.respond(self.transport))
-
-if __name__ == '__main__':
-  mail_server = TCPServer(SERVER_ADDRESS, MailHandler)
-  mail_server.allow_reuse_address = True
-  mail_server.serve_forever()



Mime
View raw message