avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmhod...@apache.org
Subject svn commit: r951273 - in /avro/branches/branch-1.3: ./ CHANGES.txt lang/ruby/lib/avro/ipc.rb lang/ruby/test/sample_ipc_http_client.rb lang/ruby/test/sample_ipc_http_server.rb
Date Fri, 04 Jun 2010 05:31:46 GMT
Author: jmhodges
Date: Fri Jun  4 05:31:46 2010
New Revision: 951273

URL: http://svn.apache.org/viewvc?rev=951273&view=rev
Log:
AVRO-450. HTTP IPC for ruby. merge from trunk

Added:
    avro/branches/branch-1.3/lang/ruby/test/sample_ipc_http_client.rb
      - copied unchanged from r931026, hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_client.rb
    avro/branches/branch-1.3/lang/ruby/test/sample_ipc_http_server.rb
      - copied unchanged from r931026, hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_server.rb
Modified:
    avro/branches/branch-1.3/   (props changed)
    avro/branches/branch-1.3/CHANGES.txt
    avro/branches/branch-1.3/lang/ruby/lib/avro/ipc.rb

Propchange: avro/branches/branch-1.3/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun  4 05:31:46 2010
@@ -1,2 +1,2 @@
 /avro/trunk:944035,944049
-/hadoop/avro/trunk:930458-930459,930461-930462,930599,935526,938347
+/hadoop/avro/trunk:930458-930459,930461-930462,930599,931026,935526,938347

Modified: avro/branches/branch-1.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.3/CHANGES.txt?rev=951273&r1=951272&r2=951273&view=diff
==============================================================================
--- avro/branches/branch-1.3/CHANGES.txt (original)
+++ avro/branches/branch-1.3/CHANGES.txt Fri Jun  4 05:31:46 2010
@@ -17,6 +17,8 @@ Avro 1.3.3 (Unreleased)
 
     AVRO-491. Doing doubles and floats better in the ruby impl. (jmhodges)
 
+    AVRO-450. HTTP IPC for ruby. (jmhodges)
+
   BUG FIXES
     AVRO-461. Skipping primitives in the ruby side (jmhodges)
 

Modified: avro/branches/branch-1.3/lang/ruby/lib/avro/ipc.rb
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.3/lang/ruby/lib/avro/ipc.rb?rev=951273&r1=951272&r2=951273&view=diff
==============================================================================
--- avro/branches/branch-1.3/lang/ruby/lib/avro/ipc.rb (original)
+++ avro/branches/branch-1.3/lang/ruby/lib/avro/ipc.rb Fri Jun  4 05:31:46 2010
@@ -168,13 +168,13 @@ module Avro::IPC
         true
       when 'CLIENT'
         raise AvroError.new('Handshake failure. match == CLIENT') if send_protocol
-        self.remote_protocol = handshake_response['serverProtocol']
+        self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol'])
         self.remote_hash = handshake_response['serverHash']
         self.send_protocol = false
         false
       when 'NONE'
         raise AvroError.new('Handshake failure. match == NONE') if send_protocol
-        self.remote_protocol = handshake_response['serverProtocol']
+        self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol'])
         self.remote_hash = handshake_response['serverHash']
         self.send_protocol = true
         false
@@ -236,11 +236,10 @@ module Avro::IPC
       protocol_cache[local_hash] = local_protocol
     end
 
-    def respond(transport)
-      # Called by a server to deserialize a request, compute and serialize
-      # a response or error. Compare to 'handle()' in Thrift.
-
-      call_request = transport.read_framed_message
+    # Called by a server to deserialize a request, compute and serialize
+    # a response or error. Compare to 'handle()' in Thrift.
+    def respond(call_request)
+      buffer_reader = StringIO.new(call_request)
       buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_request))
       buffer_writer = StringIO.new('', 'w+')
       buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer)
@@ -248,7 +247,7 @@ module Avro::IPC
       response_metadata = {}
 
       begin
-        remote_protocol = process_handshake(transport, buffer_decoder, buffer_encoder)
+        remote_protocol = process_handshake(buffer_decoder, buffer_encoder)
         # handshake failure
         unless remote_protocol
           return buffer_writer.string
@@ -300,7 +299,7 @@ module Avro::IPC
       buffer_writer.string
     end
 
-    def process_handshake(transport, decoder, encoder)
+    def process_handshake(decoder, encoder)
       handshake_request = HANDSHAKE_RESPONDER_READER.read(decoder)
       handshake_response = {}
 
@@ -308,6 +307,7 @@ module Avro::IPC
       client_hash = handshake_request['clientHash']
       client_protocol = handshake_request['clientProtocol']
       remote_protocol = protocol_cache[client_hash]
+
       if !remote_protocol && client_protocol
         remote_protocol = protocol.parse(client_protocol)
         protocol_cache[client_hash] = remote_protocol
@@ -440,4 +440,95 @@ module Avro::IPC
       sock.close
     end
   end
+
+  class ConnectionClosedError < StandardError; end
+
+  class FramedWriter
+    attr_reader :writer
+    def initialize(writer)
+      @writer = writer
+    end
+
+    def write_framed_message(message)
+      message_size = message.size
+      total_bytes_sent = 0
+      while message_size - total_bytes_sent > 0
+        if message_size - total_bytes_sent > BUFFER_SIZE
+          buffer_size = BUFFER_SIZE
+        else
+          buffer_size = message_size - total_bytes_sent
+        end
+        write_buffer(message[total_bytes_sent, buffer_size])
+        total_bytes_sent += buffer_size
+      end
+      write_buffer_size(0)
+    end
+
+    def to_s; writer.string; end
+
+    private
+    def write_buffer(chunk)
+      buffer_size = chunk.size
+      write_buffer_size(buffer_size)
+      writer << chunk
+    end
+
+    def write_buffer_size(n)
+      writer.write([n].pack('N'))
+    end
+  end
+
+  class FramedReader
+    attr_reader :reader
+
+    def initialize(reader)
+      @reader = reader
+    end
+
+    def read_framed_message
+      message = []
+      loop do
+        buffer = ""
+        buffer_size = read_buffer_size
+
+        return message.join if buffer_size == 0
+
+        while buffer.size < buffer_size
+          chunk = reader.read(buffer_size - buffer.size)
+          chunk_error?(chunk)
+          buffer << chunk
+        end
+        message << buffer
+      end
+    end
+
+    private
+    def read_buffer_size
+      header = reader.read(BUFFER_HEADER_LENGTH)
+      chunk_error?(header)
+      header.unpack('N')[0]
+    end
+
+    def chunk_error?(chunk)
+      raise ConnectionClosedError.new("Reader read 0 bytes") if chunk == ''
+    end
+  end
+
+  # Only works for clients. Sigh.
+  class HTTPTransceiver
+    attr_reader :remote_name, :host, :port
+    def initialize(host, port)
+      @host, @port = host, port
+      @remote_name = "#{host}:#{port}"
+    end
+
+    def transceive(message)
+      writer = FramedWriter.new(StringIO.new)
+      writer.write_framed_message(message)
+      resp = Net::HTTP.start(host, port) do |http|
+        http.post('/', writer.to_s, {'Content-Type' => 'avro/binary'})
+      end
+      FramedReader.new(StringIO.new(resp.body)).read_framed_message
+    end
+  end
 end



Mime
View raw message