Return-Path: Delivered-To: apmail-hadoop-avro-commits-archive@minotaur.apache.org Received: (qmail 4733 invoked from network); 6 Apr 2010 05:52:33 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 6 Apr 2010 05:52:33 -0000 Received: (qmail 21443 invoked by uid 500); 6 Apr 2010 05:52:33 -0000 Delivered-To: apmail-hadoop-avro-commits-archive@hadoop.apache.org Received: (qmail 21413 invoked by uid 500); 6 Apr 2010 05:52:32 -0000 Mailing-List: contact avro-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: avro-dev@hadoop.apache.org Delivered-To: mailing list avro-commits@hadoop.apache.org Received: (qmail 21404 invoked by uid 99); 6 Apr 2010 05:52:31 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Apr 2010 05:52:31 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Apr 2010 05:52:29 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A72062388900; Tue, 6 Apr 2010 05:52:09 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r931026 - in /hadoop/avro/trunk: CHANGES.txt lang/ruby/lib/avro/ipc.rb lang/ruby/test/sample_ipc_http_client.rb lang/ruby/test/sample_ipc_http_server.rb Date: Tue, 06 Apr 2010 05:52:09 -0000 To: avro-commits@hadoop.apache.org From: jmhodges@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100406055209.A72062388900@eris.apache.org> Author: jmhodges Date: Tue Apr 6 05:52:09 2010 New Revision: 931026 URL: http://svn.apache.org/viewvc?rev=931026&view=rev Log: AVRO-450. HTTP IPC for ruby. 
Added: hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_client.rb hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_server.rb Modified: hadoop/avro/trunk/CHANGES.txt hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb Modified: hadoop/avro/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=931026&r1=931025&r2=931026&view=diff ============================================================================== --- hadoop/avro/trunk/CHANGES.txt (original) +++ hadoop/avro/trunk/CHANGES.txt Tue Apr 6 05:52:09 2010 @@ -18,6 +18,8 @@ Avro 1.4.0 (unreleased) AVRO-497. Minor changes to C++ autotools, makefiles, and code generator. (sbanacho) + AVRO-450. HTTP IPC for ruby. (jmhodges) + BUG FIXES AVRO-461. Skipping primitives in the ruby side (jmhodges) Modified: hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb?rev=931026&r1=931025&r2=931026&view=diff ============================================================================== --- hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb (original) +++ hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb Tue Apr 6 05:52:09 2010 @@ -168,13 +168,13 @@ module Avro::IPC true when 'CLIENT' raise AvroError.new('Handshake failure. match == CLIENT') if send_protocol - self.remote_protocol = handshake_response['serverProtocol'] + self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol']) self.remote_hash = handshake_response['serverHash'] self.send_protocol = false false when 'NONE' raise AvroError.new('Handshake failure. 
match == NONE') if send_protocol - self.remote_protocol = handshake_response['serverProtocol'] + self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol']) self.remote_hash = handshake_response['serverHash'] self.send_protocol = true false @@ -236,11 +236,10 @@ module Avro::IPC protocol_cache[local_hash] = local_protocol end - def respond(transport) - # Called by a server to deserialize a request, compute and serialize - # a response or error. Compare to 'handle()' in Thrift. - - call_request = transport.read_framed_message + # Called by a server to deserialize a request, compute and serialize + # a response or error. Compare to 'handle()' in Thrift. + def respond(call_request) + buffer_reader = StringIO.new(call_request) buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_request)) buffer_writer = StringIO.new('', 'w+') buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer) @@ -248,7 +247,7 @@ module Avro::IPC response_metadata = {} begin - remote_protocol = process_handshake(transport, buffer_decoder, buffer_encoder) + remote_protocol = process_handshake(buffer_decoder, buffer_encoder) # handshake failure unless remote_protocol return buffer_writer.string @@ -300,7 +299,7 @@ module Avro::IPC buffer_writer.string end - def process_handshake(transport, decoder, encoder) + def process_handshake(decoder, encoder) handshake_request = HANDSHAKE_RESPONDER_READER.read(decoder) handshake_response = {} @@ -308,6 +307,7 @@ module Avro::IPC client_hash = handshake_request['clientHash'] client_protocol = handshake_request['clientProtocol'] remote_protocol = protocol_cache[client_hash] + if !remote_protocol && client_protocol remote_protocol = protocol.parse(client_protocol) protocol_cache[client_hash] = remote_protocol @@ -440,4 +440,95 @@ module Avro::IPC sock.close end end + + class ConnectionClosedError < StandardError; end + + class FramedWriter + attr_reader :writer + def initialize(writer) + @writer = writer + end + + def 
write_framed_message(message) + message_size = message.size + total_bytes_sent = 0 + while message_size - total_bytes_sent > 0 + if message_size - total_bytes_sent > BUFFER_SIZE + buffer_size = BUFFER_SIZE + else + buffer_size = message_size - total_bytes_sent + end + write_buffer(message[total_bytes_sent, buffer_size]) + total_bytes_sent += buffer_size + end + write_buffer_size(0) + end + + def to_s; writer.string; end + + private + def write_buffer(chunk) + buffer_size = chunk.size + write_buffer_size(buffer_size) + writer << chunk + end + + def write_buffer_size(n) + writer.write([n].pack('N')) + end + end + + class FramedReader + attr_reader :reader + + def initialize(reader) + @reader = reader + end + + def read_framed_message + message = [] + loop do + buffer = "" + buffer_size = read_buffer_size + + return message.join if buffer_size == 0 + + while buffer.size < buffer_size + chunk = reader.read(buffer_size - buffer.size) + chunk_error?(chunk) + buffer << chunk + end + message << buffer + end + end + + private + def read_buffer_size + header = reader.read(BUFFER_HEADER_LENGTH) + chunk_error?(header) + header.unpack('N')[0] + end + + def chunk_error?(chunk) + raise ConnectionClosedError.new("Reader read 0 bytes") if chunk == '' + end + end + + # Only works for clients. Sigh. 
+ class HTTPTransceiver + attr_reader :remote_name, :host, :port + def initialize(host, port) + @host, @port = host, port + @remote_name = "#{host}:#{port}" + end + + def transceive(message) + writer = FramedWriter.new(StringIO.new) + writer.write_framed_message(message) + resp = Net::HTTP.start(host, port) do |http| + http.post('/', writer.to_s, {'Content-Type' => 'avro/binary'}) + end + FramedReader.new(StringIO.new(resp.body)).read_framed_message + end + end end Added: hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_client.rb URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_client.rb?rev=931026&view=auto ============================================================================== --- hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_client.rb (added) +++ hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_client.rb Tue Apr 6 05:52:09 2010 @@ -0,0 +1,84 @@ +#!/usr/bin/env ruby +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +require 'socket' +require 'avro' + +MAIL_PROTOCOL_JSON = <<-JSON +{"namespace": "example.proto", + "protocol": "Mail", + + "types": [ + {"name": "Message", "type": "record", + "fields": [ + {"name": "to", "type": "string"}, + {"name": "from", "type": "string"}, + {"name": "body", "type": "string"} + ] + } + ], + + "messages": { + "send": { + "request": [{"name": "message", "type": "Message"}], + "response": "string" + }, + "replay": { + "request": [], + "response": "string" + } + } +} +JSON + +MAIL_PROTOCOL = Avro::Protocol.parse(MAIL_PROTOCOL_JSON) + +def make_requestor(server_address, port, protocol) + transport = Avro::IPC::HTTPTransceiver.new(server_address, port) + Avro::IPC::Requestor.new(protocol, transport) +end + +if $0 == __FILE__ + if ![3, 4].include?(ARGV.length) + raise "Usage: <to> <from> <body> [<count>]" + end + + # client code - attach to the server and send a message + # fill in the Message record + message = { + 'to' => ARGV[0], + 'from' => ARGV[1], + 'body' => ARGV[2] + } + + num_messages = (ARGV[3] || 1).to_i + + # build the parameters for the request + params = {'message' => message} + # send the requests and print the result + + num_messages.times do + requestor = make_requestor('localhost', 9090, MAIL_PROTOCOL) + result = requestor.request('send', params) + puts("Result: " + result) + end + + # try out a replay message + requestor = make_requestor('localhost', 9090, MAIL_PROTOCOL) + result = requestor.request('replay', {}) + puts("Replay Result: " + result) +end Added: hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_server.rb URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_server.rb?rev=931026&view=auto ============================================================================== --- hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_server.rb (added) +++ hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_server.rb Tue Apr 6 05:52:09 2010 @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more 
contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +require 'avro' +require 'webrick' + +MAIL_PROTOCOL_JSON = <<-JSON +{"namespace": "example.proto", + "protocol": "Mail", + + "types": [ + {"name": "Message", "type": "record", + "fields": [ + {"name": "to", "type": "string"}, + {"name": "from", "type": "string"}, + {"name": "body", "type": "string"} + ] + } + ], + + "messages": { + "send": { + "request": [{"name": "message", "type": "Message"}], + "response": "string" + }, + "replay": { + "request": [], + "response": "string" + } + } +} +JSON + +MAIL_PROTOCOL = Avro::Protocol.parse(MAIL_PROTOCOL_JSON) + +class MailResponder < Avro::IPC::Responder + def initialize + super(MAIL_PROTOCOL) + end + + def call(message, request) + if message.name == 'send' + request_content = request['message'] + "Sent message to #{request_content['to']} from #{request_content['from']} with body #{request_content['body']}" + elsif message.name == 'replay' + 'replay' + end + end +end + +class MailHandler < WEBrick::HTTPServlet::AbstractServlet + def do_POST(req, resp) + responder = MailResponder.new + call_request = Avro::IPC::FramedReader.new(StringIO.new(req.body)).read_framed_message + unframed_resp = responder.respond(call_request) + writer = Avro::IPC::FramedWriter.new(StringIO.new) + writer.write_framed_message(unframed_resp) + 
resp.body = writer.to_s + raise WEBrick::HTTPStatus::OK + end +end + +if $0 == __FILE__ + server = WEBrick::HTTPServer.new(:Host => 'localhost', :Port => 9090) + server.mount '/', MailHandler + trap("INT") { server.shutdown } + server.start +end