Return-Path: Delivered-To: apmail-avro-commits-archive@www.apache.org Received: (qmail 38578 invoked from network); 4 Jun 2010 05:32:07 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 4 Jun 2010 05:32:07 -0000 Received: (qmail 30864 invoked by uid 500); 4 Jun 2010 05:32:07 -0000 Delivered-To: apmail-avro-commits-archive@avro.apache.org Received: (qmail 30841 invoked by uid 500); 4 Jun 2010 05:32:07 -0000 Mailing-List: contact commits-help@avro.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@avro.apache.org Delivered-To: mailing list commits@avro.apache.org Received: (qmail 30833 invoked by uid 99); 4 Jun 2010 05:32:07 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Jun 2010 05:32:07 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Jun 2010 05:32:06 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id DCA7E23889CB; Fri, 4 Jun 2010 05:31:46 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r951273 - in /avro/branches/branch-1.3: ./ CHANGES.txt lang/ruby/lib/avro/ipc.rb lang/ruby/test/sample_ipc_http_client.rb lang/ruby/test/sample_ipc_http_server.rb Date: Fri, 04 Jun 2010 05:31:46 -0000 To: commits@avro.apache.org From: jmhodges@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100604053146.DCA7E23889CB@eris.apache.org> Author: jmhodges Date: Fri Jun 4 05:31:46 2010 New Revision: 951273 URL: http://svn.apache.org/viewvc?rev=951273&view=rev Log: AVRO-450. HTTP IPC for ruby. merge from trunk Added: avro/branches/branch-1.3/lang/ruby/test/sample_ipc_http_client.rb - copied unchanged from r931026, hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_client.rb avro/branches/branch-1.3/lang/ruby/test/sample_ipc_http_server.rb - copied unchanged from r931026, hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_server.rb Modified: avro/branches/branch-1.3/ (props changed) avro/branches/branch-1.3/CHANGES.txt avro/branches/branch-1.3/lang/ruby/lib/avro/ipc.rb Propchange: avro/branches/branch-1.3/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Jun 4 05:31:46 2010 @@ -1,2 +1,2 @@ /avro/trunk:944035,944049 -/hadoop/avro/trunk:930458-930459,930461-930462,930599,935526,938347 +/hadoop/avro/trunk:930458-930459,930461-930462,930599,931026,935526,938347 Modified: avro/branches/branch-1.3/CHANGES.txt URL: http://svn.apache.org/viewvc/avro/branches/branch-1.3/CHANGES.txt?rev=951273&r1=951272&r2=951273&view=diff ============================================================================== --- avro/branches/branch-1.3/CHANGES.txt (original) +++ avro/branches/branch-1.3/CHANGES.txt Fri Jun 4 05:31:46 2010 @@ -17,6 +17,8 @@ Avro 1.3.3 (Unreleased) AVRO-491. Doing doubles and floats better in the ruby impl. (jmhodges) + AVRO-450. HTTP IPC for ruby. (jmhodges) + BUG FIXES AVRO-461. Skipping primitives in the ruby side (jmhodges) Modified: avro/branches/branch-1.3/lang/ruby/lib/avro/ipc.rb URL: http://svn.apache.org/viewvc/avro/branches/branch-1.3/lang/ruby/lib/avro/ipc.rb?rev=951273&r1=951272&r2=951273&view=diff ============================================================================== --- avro/branches/branch-1.3/lang/ruby/lib/avro/ipc.rb (original) +++ avro/branches/branch-1.3/lang/ruby/lib/avro/ipc.rb Fri Jun 4 05:31:46 2010 @@ -168,13 +168,13 @@ module Avro::IPC true when 'CLIENT' raise AvroError.new('Handshake failure. match == CLIENT') if send_protocol - self.remote_protocol = handshake_response['serverProtocol'] + self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol']) self.remote_hash = handshake_response['serverHash'] self.send_protocol = false false when 'NONE' raise AvroError.new('Handshake failure. match == NONE') if send_protocol - self.remote_protocol = handshake_response['serverProtocol'] + self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol']) self.remote_hash = handshake_response['serverHash'] self.send_protocol = true false @@ -236,11 +236,10 @@ module Avro::IPC protocol_cache[local_hash] = local_protocol end - def respond(transport) - # Called by a server to deserialize a request, compute and serialize - # a response or error. Compare to 'handle()' in Thrift. - - call_request = transport.read_framed_message + # Called by a server to deserialize a request, compute and serialize + # a response or error. Compare to 'handle()' in Thrift. + def respond(call_request) + buffer_reader = StringIO.new(call_request) buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_request)) buffer_writer = StringIO.new('', 'w+') buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer) @@ -248,7 +247,7 @@ module Avro::IPC response_metadata = {} begin - remote_protocol = process_handshake(transport, buffer_decoder, buffer_encoder) + remote_protocol = process_handshake(buffer_decoder, buffer_encoder) # handshake failure unless remote_protocol return buffer_writer.string @@ -300,7 +299,7 @@ module Avro::IPC buffer_writer.string end - def process_handshake(transport, decoder, encoder) + def process_handshake(decoder, encoder) handshake_request = HANDSHAKE_RESPONDER_READER.read(decoder) handshake_response = {} @@ -308,6 +307,7 @@ module Avro::IPC client_hash = handshake_request['clientHash'] client_protocol = handshake_request['clientProtocol'] remote_protocol = protocol_cache[client_hash] + if !remote_protocol && client_protocol remote_protocol = protocol.parse(client_protocol) protocol_cache[client_hash] = remote_protocol @@ -440,4 +440,95 @@ module Avro::IPC sock.close end end + + class ConnectionClosedError < StandardError; end + + class FramedWriter + attr_reader :writer + def initialize(writer) + @writer = writer + end + + def write_framed_message(message) + message_size = message.size + total_bytes_sent = 0 + while message_size - total_bytes_sent > 0 + if message_size - total_bytes_sent > BUFFER_SIZE + buffer_size = BUFFER_SIZE + else + buffer_size = message_size - total_bytes_sent + end + write_buffer(message[total_bytes_sent, buffer_size]) + total_bytes_sent += buffer_size + end + write_buffer_size(0) + end + + def to_s; writer.string; end + + private + def write_buffer(chunk) + buffer_size = chunk.size + write_buffer_size(buffer_size) + writer << chunk + end + + def write_buffer_size(n) + writer.write([n].pack('N')) + end + end + + class FramedReader + attr_reader :reader + + def initialize(reader) + @reader = reader + end + + def read_framed_message + message = [] + loop do + buffer = "" + buffer_size = read_buffer_size + + return message.join if buffer_size == 0 + + while buffer.size < buffer_size + chunk = reader.read(buffer_size - buffer.size) + chunk_error?(chunk) + buffer << chunk + end + message << buffer + end + end + + private + def read_buffer_size + header = reader.read(BUFFER_HEADER_LENGTH) + chunk_error?(header) + header.unpack('N')[0] + end + + def chunk_error?(chunk) + raise ConnectionClosedError.new("Reader read 0 bytes") if chunk == '' + end + end + + # Only works for clients. Sigh. + class HTTPTransceiver + attr_reader :remote_name, :host, :port + def initialize(host, port) + @host, @port = host, port + @remote_name = "#{host}:#{port}" + end + + def transceive(message) + writer = FramedWriter.new(StringIO.new) + writer.write_framed_message(message) + resp = Net::HTTP.start(host, port) do |http| + http.post('/', writer.to_s, {'Content-Type' => 'avro/binary'}) + end + FramedReader.new(StringIO.new(resp.body)).read_framed_message + end + end end