avro-commits mailing list archives

From cutt...@apache.org
Subject svn commit: r901024 [1/2] - in /hadoop/avro/trunk: ./ lang/ruby/ lang/ruby/lib/ lang/ruby/lib/avro/ lang/ruby/test/ share/
Date Wed, 20 Jan 2010 01:05:07 GMT
Author: cutting
Date: Wed Jan 20 01:05:06 2010
New Revision: 901024

URL: http://svn.apache.org/viewvc?rev=901024&view=rev
Log:
AVRO-306.  Add ruby implementation.  Contributed by Jeff Hodges.

Added:
    hadoop/avro/trunk/lang/ruby/   (with props)
    hadoop/avro/trunk/lang/ruby/.gitignore
    hadoop/avro/trunk/lang/ruby/CHANGELOG
    hadoop/avro/trunk/lang/ruby/Manifest
    hadoop/avro/trunk/lang/ruby/Rakefile
    hadoop/avro/trunk/lang/ruby/lib/
    hadoop/avro/trunk/lang/ruby/lib/avro/
    hadoop/avro/trunk/lang/ruby/lib/avro.rb
    hadoop/avro/trunk/lang/ruby/lib/avro/collect_hash.rb
    hadoop/avro/trunk/lang/ruby/lib/avro/data_file.rb
    hadoop/avro/trunk/lang/ruby/lib/avro/io.rb
    hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb
    hadoop/avro/trunk/lang/ruby/lib/avro/protocol.rb
    hadoop/avro/trunk/lang/ruby/lib/avro/schema.rb
    hadoop/avro/trunk/lang/ruby/test/
    hadoop/avro/trunk/lang/ruby/test/sample_ipc_client.rb
    hadoop/avro/trunk/lang/ruby/test/sample_ipc_server.rb
    hadoop/avro/trunk/lang/ruby/test/test_help.rb
    hadoop/avro/trunk/lang/ruby/test/test_io.rb
    hadoop/avro/trunk/lang/ruby/test/test_protocol.rb
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/README.txt
    hadoop/avro/trunk/build.sh
    hadoop/avro/trunk/share/rat-excludes.txt

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=901024&r1=901023&r2=901024&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Wed Jan 20 01:05:06 2010
@@ -68,6 +68,8 @@
   
     AVRO-346. Add function to validate a datum against a schema. (massie)
 
+    AVRO-306. Add Ruby implementation. (Jeff Hodges via cutting)
+
   IMPROVEMENTS
 
     AVRO-157. Changes from code review comments for C++. (sbanacho)

Modified: hadoop/avro/trunk/README.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/README.txt?rev=901024&r1=901023&r2=901024&view=diff
==============================================================================
--- hadoop/avro/trunk/README.txt (original)
+++ hadoop/avro/trunk/README.txt Wed Jan 20 01:05:06 2010
@@ -12,6 +12,7 @@
  - Python: 2.5 or greater
  - C: gcc, autoconf, automake, libtool, asciidoc 
  - C++: g++, flex, bison, libboost-dev
+ - Ruby: ruby, gem, rake, echoe, yajl-ruby
  - Apache Ant 1.7
  - Apache Forrest 0.8 (for documentation, requires Java 1.5)
 

Modified: hadoop/avro/trunk/build.sh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/build.sh?rev=901024&r1=901023&r2=901024&view=diff
==============================================================================
--- hadoop/avro/trunk/build.sh (original)
+++ hadoop/avro/trunk/build.sh Wed Jan 20 01:05:06 2010
@@ -44,6 +44,7 @@
 	(cd lang/py; ant test)
 	(cd lang/c; ./build.sh test)
 	# (cd lang/c++; make test)
+	# (cd lang/ruby; rake test)
 
 	# create interop test data
 	(cd lang/java; ant interop-data-generate)

Propchange: hadoop/avro/trunk/lang/ruby/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Jan 20 01:05:06 2010
@@ -0,0 +1 @@
+tmp

Added: hadoop/avro/trunk/lang/ruby/.gitignore
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/.gitignore?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/.gitignore (added)
+++ hadoop/avro/trunk/lang/ruby/.gitignore Wed Jan 20 01:05:06 2010
@@ -0,0 +1 @@
+tmp
\ No newline at end of file

Added: hadoop/avro/trunk/lang/ruby/CHANGELOG
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/CHANGELOG?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/CHANGELOG (added)
+++ hadoop/avro/trunk/lang/ruby/CHANGELOG Wed Jan 20 01:05:06 2010
@@ -0,0 +1 @@
+v0.0.1 stuff
\ No newline at end of file

Added: hadoop/avro/trunk/lang/ruby/Manifest
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/Manifest?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/Manifest (added)
+++ hadoop/avro/trunk/lang/ruby/Manifest Wed Jan 20 01:05:06 2010
@@ -0,0 +1,15 @@
+CHANGELOG
+Rakefile
+lib/avro.rb
+lib/avro/collect_hash.rb
+lib/avro/data_file.rb
+lib/avro/io.rb
+lib/avro/ipc.rb
+lib/avro/protocol.rb
+lib/avro/schema.rb
+test/sample_ipc_client.rb
+test/sample_ipc_server.rb
+test/test_help.rb
+test/test_io.rb
+test/test_protocol.rb
+Manifest

Added: hadoop/avro/trunk/lang/ruby/Rakefile
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/Rakefile?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/Rakefile (added)
+++ hadoop/avro/trunk/lang/ruby/Rakefile Wed Jan 20 01:05:06 2010
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'echoe'
+Echoe.new('avro') do |p|
+  # A second p.author= assignment would overwrite the first, so list both names.
+  p.author = ["Jeff Hodges", "Ryan King"]
+  p.summary = "Apache Avro for Ruby"
+  p.url = "http://hadoop.apache.org/avro/"
+  p.runtime_dependencies = %w[yajl-ruby]
+end

Added: hadoop/avro/trunk/lang/ruby/lib/avro.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/lib/avro.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/lib/avro.rb (added)
+++ hadoop/avro/trunk/lang/ruby/lib/avro.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'yajl'
+require 'set'
+require 'md5'
+
+module Avro
+  VERSION = "FIXME"
+
+  class AvroError < StandardError; end
+
+  class AvroTypeError < Avro::AvroError
+    def initialize(schm=nil, datum=nil, msg=nil)
+      msg ||= "Not a #{schm.to_s}: #{datum}"
+      super(msg)
+    end
+  end
+end
+
+require 'avro/collect_hash'
+require 'avro/schema'
+require 'avro/io'
+require 'avro/data_file'
+require 'avro/protocol'
+require 'avro/ipc'

Added: hadoop/avro/trunk/lang/ruby/lib/avro/collect_hash.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/lib/avro/collect_hash.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/lib/avro/collect_hash.rb (added)
+++ hadoop/avro/trunk/lang/ruby/lib/avro/collect_hash.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+module Enumerable
+  def collect_hash
+    inject(Hash.new) do |memo, i|
+      k, v = yield(i)
+      memo[k] = v if k
+      memo
+    end
+  end
+end
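
A minimal usage sketch of the collect_hash helper added above. The helper is repeated so the snippet runs on its own; the sample type names and the block body are invented for illustration and are not taken from the commit. The block returns a [key, value] pair for each element, and an element is skipped when the block returns nil (or a pair whose key is nil).

```ruby
# Same helper as in lib/avro/collect_hash.rb, repeated so this runs standalone.
module Enumerable
  def collect_hash
    inject(Hash.new) do |memo, i|
      k, v = yield(i)
      memo[k] = v if k
      memo
    end
  end
end

# Hypothetical sample data: map each type name to its length,
# returning nil for 'null' so that element is left out of the hash.
lengths = %w[null boolean int].collect_hash do |t|
  [t, t.length] unless t == 'null'
end

p lengths
```

Here lengths ends up with entries for "boolean" and "int" only, because the block yields nil for "null".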

Added: hadoop/avro/trunk/lang/ruby/lib/avro/data_file.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/lib/avro/data_file.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/lib/avro/data_file.rb (added)
+++ hadoop/avro/trunk/lang/ruby/lib/avro/data_file.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,243 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'openssl'
+
+module Avro
+  module DataFile
+    VERSION = 0
+    MAGIC = "Obj" + [VERSION].pack('c')
+    MAGIC_SIZE = MAGIC.size
+    SYNC_SIZE = 16
+    SYNC_INTERVAL = 1000 * SYNC_SIZE
+    META_SCHEMA = Schema.parse('{"type": "map", "values": "bytes"}')
+    VALID_CODECS = ['null']
+    VALID_ENCODINGS = ['binary'] # not used yet
+
+    class DataFileError < AvroError; end
+
+    class Writer
+      def self.generate_sync_marker
+        OpenSSL::Random.random_bytes(16)
+      end
+
+      attr_reader :writer, :encoder, :datum_writer, :buffer_writer, :buffer_encoder, :sync_marker, :meta
+      attr_accessor :block_count
+
+      def initialize(writer, datum_writer, writers_schema=nil)
+        # If writers_schema is not present, presume we're appending
+        @writer = writer
+        @encoder = IO::BinaryEncoder.new(@writer)
+        @datum_writer = datum_writer
+        @buffer_writer = StringIO.new('', 'w')
+        @buffer_encoder = IO::BinaryEncoder.new(@buffer_writer)
+        @block_count = 0
+
+        @meta = {}
+
+        if writers_schema
+          @sync_marker = Writer.generate_sync_marker
+          meta['codec'] = 'null'
+          meta['schema'] = writers_schema.to_s
+          datum_writer.writers_schema = writers_schema
+          write_header
+        else
+          # open writer for reading to collect metadata
+          dfr = Reader.new(writer, Avro::IO::DatumReader.new)
+
+          # FIXME(jmhodges): collect arbitrary metadata
+          # collect metadata
+          @sync_marker = dfr.sync_marker
+          meta['codec'] = dfr.meta['codec']
+
+          # get schema used to write existing file
+          schema_from_file = dfr.meta['schema']
+          meta['schema'] = schema_from_file
+          datum_writer.writers_schema = Schema.parse(schema_from_file)
+
+          # seek to the end of the file and prepare for writing
+          writer.seek(0,2)
+        end
+      end
+
+      # Append a datum to the file
+      def <<(datum)
+        datum_writer.write(datum, buffer_encoder)
+        self.block_count += 1
+
+        # if the data to write is larger than the sync interval, write
+        # the block
+        if buffer_writer.tell >= SYNC_INTERVAL
+          write_block
+        end
+      end
+
+      # Return the current position as a value that may be passed to
+      # DataFileReader.seek(long). Forces the end of the current block,
+      # emitting a synchronization marker.
+      def sync
+        write_block
+        writer.tell
+      end
+
+      # Flush the current state of the file, including metadata
+      def flush
+        write_block
+        writer.flush
+      end
+
+      def close
+        flush
+        writer.close
+      end
+
+      private
+
+      def write_header
+        # write magic
+        writer.write(MAGIC)
+
+        # write metadata
+        datum_writer.write_data(META_SCHEMA, meta, encoder)
+
+        # write sync marker
+        writer.write(sync_marker)
+      end
+
+      # TODO(jmhodges): make a schema for blocks and use datum_writer
+      # TODO(jmhodges): do we really need the number of items in the block?
+      # TODO(jmhodges): use codec when writing the block contents
+      def write_block
+        if block_count > 0
+          # write number of items in block and block size in bytes
+          encoder.write_long(block_count)
+          to_write = buffer_writer.string
+          encoder.write_long(to_write.size)
+
+          # write block contents
+          if meta['codec'] == 'null'
+            writer.write(to_write)
+          else
+            msg = "#{meta['codec'].inspect} codec is not supported"
+            raise DataFileError, msg
+          end
+
+          # write sync marker
+          writer.write(sync_marker)
+          # reset buffer; truncate keeps the old write position, so rewind as well
+          buffer_writer.truncate(0)
+          buffer_writer.rewind
+          self.block_count = 0
+        end
+      end
+    end
+
+    # Read files written by Writer
+    class Reader
+      include ::Enumerable
+
+      attr_reader :reader, :decoder, :datum_reader, :sync_marker, :meta, :file_length
+      attr_accessor :block_count
+
+      def initialize(reader, datum_reader)
+        @reader = reader
+        @decoder = IO::BinaryDecoder.new(reader)
+        @datum_reader = datum_reader
+
+        # read the header: magic, meta, sync
+        read_header
+
+        # ensure the codec is valid
+        codec_from_file = meta['codec']
+        if codec_from_file && ! VALID_CODECS.include?(codec_from_file)
+          raise DataFileError, "Unknown codec: #{codec_from_file}"
+        end
+
+        # get ready to read
+        @block_count = 0
+        datum_reader.writers_schema = Schema.parse meta['schema']
+      end
+
+      # Iterates through each datum in this file
+      # TODO(jmhodges): handle block of length zero
+      def each
+        loop do
+          if block_count == 0
+            case
+            when eof?; break
+            when skip_sync
+              break if eof?
+              read_block_header
+            else
+              read_block_header
+            end
+          end
+
+          datum = datum_reader.read(decoder)
+          self.block_count -= 1
+          yield(datum)
+        end
+      end
+
+      def eof?; reader.eof?; end
+
+      def close
+        reader.close
+      end
+
+      private
+      def read_header
+        # seek to the beginning of the file to get magic block
+        reader.seek(0, 0)
+
+        # check magic number
+        magic_in_file = reader.read(MAGIC_SIZE)
+        if magic_in_file.size < MAGIC_SIZE
+          msg = 'Not an Avro data file: shorter than the Avro magic block'
+          raise DataFileError, msg
+        elsif magic_in_file != MAGIC
+          msg = "Not an Avro data file: #{magic_in_file} doesn't match #{MAGIC}"
+          raise DataFileError, msg
+        end
+
+        # read metadata
+        @meta = datum_reader.read_data(META_SCHEMA,
+                                       META_SCHEMA,
+                                       decoder)
+        # read sync marker
+        @sync_marker = reader.read(SYNC_SIZE)
+      end
+
+      def read_block_header
+        self.block_count = decoder.read_long
+        decoder.read_long # not doing anything with length in bytes
+      end
+
+      # read SYNC_SIZE bytes; if they match the sync marker, return
+      # true. Otherwise, seek back to where we started and return
+      # false
+      def skip_sync
+        proposed_sync_marker = reader.read(SYNC_SIZE)
+        if proposed_sync_marker != sync_marker
+          reader.seek(-SYNC_SIZE, 1)
+          false
+        else
+          true
+        end
+      end
+    end
+  end
+end
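
The block layout that Writer#write_block emits (item count, byte size, payload, sync marker) can be sketched without the rest of the library. This is a rough illustration rather than the committed code: encode_long, decode_long and frame_block are hypothetical helper names, SecureRandom stands in for OpenSSL::Random, and the payload is just three zig-zag encoded longs instead of schema-driven data.

```ruby
require 'stringio'
require 'securerandom'

# Zig-zag plus variable-length encoding of a signed 64-bit integer,
# following the same steps as BinaryEncoder#write_long in io.rb.
def encode_long(n)
  n = (n << 1) ^ (n >> 63)
  bytes = []
  while (n & ~0x7F) != 0
    bytes << ((n & 0x7F) | 0x80) # low 7 bits, continuation bit set
    n >>= 7
  end
  bytes << n
  bytes.pack('C*')
end

# Inverse of encode_long, reading from any object that responds to readbyte.
def decode_long(io)
  b = io.readbyte
  n = b & 0x7F
  shift = 7
  while (b & 0x80) != 0
    b = io.readbyte
    n |= (b & 0x7F) << shift
    shift += 7
  end
  (n >> 1) ^ -(n & 1)
end

# Frame one block: item count, payload size in bytes, payload, sync marker.
def frame_block(item_count, payload, sync_marker)
  encode_long(item_count) + encode_long(payload.bytesize) + payload + sync_marker
end

sync = SecureRandom.random_bytes(16) # 16 bytes, like SYNC_SIZE
payload = [1, -2, 300].map { |v| encode_long(v) }.join
block = frame_block(3, payload, sync)

# Read the block back the way Reader#each walks it.
io = StringIO.new(block)
count = decode_long(io)
size = decode_long(io)
values = Array.new(count) { decode_long(io) }
trailer = io.read(16)
```

Reading back yields the original three values followed by the same 16-byte marker, which is what lets a reader detect block boundaries.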

Added: hadoop/avro/trunk/lang/ruby/lib/avro/io.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/lib/avro/io.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/lib/avro/io.rb (added)
+++ hadoop/avro/trunk/lang/ruby/lib/avro/io.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,571 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+module Avro
+  module IO
+    # Raised when datum is not an example of schema
+    class AvroTypeError < AvroError
+      def initialize(expected_schema, datum)
+        super("The datum #{datum.inspect} is not an example of schema #{expected_schema}")
+      end
+    end
+
+    # Raised when writer's and reader's schema do not match
+    class SchemaMatchException < AvroError
+      def initialize(writers_schema, readers_schema)
+        super("Writer's schema #{writers_schema} and Reader's schema " +
+              "#{readers_schema} do not match.")
+      end
+    end
+
+    # FIXME(jmhodges) move validate to this module?
+
+    class BinaryDecoder
+      # Read leaf values
+
+      # reader is an object on which we can call read, seek and tell.
+      attr_reader :reader
+      def initialize(reader)
+        @reader = reader
+      end
+
+      def byte!
+        @reader.read(1)[0]
+      end
+
+      def read_null
+        # null is written as zero bytes
+        nil
+      end
+
+      def read_boolean
+        byte! == 1
+      end
+
+      def read_int; read_long; end
+
+      def read_long
+        # int and long values are written using variable-length,
+        # zig-zag coding.
+        b = byte!
+        n = b & 0x7F
+        shift = 7
+        while (b & 0x80) != 0
+          b = byte!
+          n |= (b & 0x7F) << shift
+          shift += 7
+        end
+        (n >> 1) ^ -(n & 1)
+      end
+
+      def read_float
+        # A float is written as 4 bytes.
+        # The float is converted into a 32-bit integer using a method
+        # equivalent to Java's floatToIntBits and then encoded in
+        # little-endian format.
+
+        bits = (byte! & 0xFF) |
+          ((byte! & 0xff) <<  8) |
+          ((byte! & 0xff) << 16) |
+          ((byte! & 0xff) << 24)
+        [bits].pack('V').unpack('e')[0]
+      end
+
+      def read_double
+        # A double is written as 8 bytes.
+        # The double is converted into a 64-bit integer using a method
+        # equivalent to Java's doubleToLongBits and then encoded in
+        # little-endian format.
+
+        bits = (byte! & 0xFF) |
+          ((byte! & 0xff) <<  8) |
+          ((byte! & 0xff) << 16) |
+          ((byte! & 0xff) << 24) |
+          ((byte! & 0xff) << 32) |
+          ((byte! & 0xff) << 40) |
+          ((byte! & 0xff) << 48) |
+          ((byte! & 0xff) << 56)
+        [bits].pack('Q').unpack('d')[0]
+      end
+
+      def read_bytes
+        # Bytes are encoded as a long followed by that many bytes of
+        # data.
+        read(read_long)
+      end
+
+      def read_string
+        # A string is encoded as a long followed by that many bytes of
+        # UTF-8 encoded character data.
+        # FIXME utf-8 encode this in 1.9
+        read_bytes
+      end
+
+      def read(len)
+        # Read len bytes
+        @reader.read(len)
+      end
+
+      def skip_null
+        nil
+      end
+
+      def skip_boolean
+        skip(1)
+      end
+
+      def skip_int
+        skip_long
+      end
+
+      def skip_long
+        b = byte!
+        while (b & 0x80) != 0
+          b = byte!
+        end
+      end
+
+      def skip_float
+        skip(4)
+      end
+
+      def skip_double
+        skip(8)
+      end
+
+      def skip_bytes
+        skip(read_long)
+      end
+
+      def skip_string
+        skip_bytes
+      end
+
+      def skip(n)
+        reader.seek(reader.tell() + n)
+      end
+    end
+
+    # Write leaf values
+    class BinaryEncoder
+      attr_reader :writer
+
+      def initialize(writer)
+        @writer = writer
+      end
+
+      # null is written as zero bytes
+      def write_null(datum)
+        nil
+      end
+
+      # a boolean is written as a single byte 
+      # whose value is either 0 (false) or 1 (true).
+      def write_boolean(datum)
+        on_disk = datum ? 1.chr : 0.chr
+        writer.write(on_disk)
+      end
+
+      # int and long values are written using variable-length,
+      # zig-zag coding.
+      def write_int(n)
+        write_long(n)
+      end
+
+      # int and long values are written using variable-length,
+      # zig-zag coding.
+      def write_long(n)
+        # zig-zag: fold the sign into the low bit so small magnitudes stay short
+        n = (n << 1) ^ (n >> 63)
+        while (n & ~0x7F) != 0
+          @writer.write(((n & 0x7f) | 0x80).chr)
+          n >>= 7
+        end
+        @writer.write(n.chr)
+      end
+
+      # A float is written as 4 bytes.
+      # The float is converted into a 32-bit integer using a method
+      # equivalent to Java's floatToIntBits and then encoded in
+      # little-endian format.
+      def write_float(datum)
+        bits = [datum].pack('e').unpack('V')[0]
+        @writer.write(((bits      ) & 0xFF).chr)
+        @writer.write(((bits >> 8 ) & 0xFF).chr)
+        @writer.write(((bits >> 16) & 0xFF).chr)
+        @writer.write(((bits >> 24) & 0xFF).chr)
+      end
+
+      # A double is written as 8 bytes.
+      # The double is converted into a 64-bit integer using a method
+      # equivalent to Java's doubleToLongBits and then encoded in
+      # little-endian format.
+      def write_double(datum)
+        bits = [datum].pack('d').unpack('Q')[0]
+        @writer.write(((bits      ) & 0xFF).chr)
+        @writer.write(((bits >> 8 ) & 0xFF).chr)
+        @writer.write(((bits >> 16) & 0xFF).chr)
+        @writer.write(((bits >> 24) & 0xFF).chr)
+        @writer.write(((bits >> 32) & 0xFF).chr)
+        @writer.write(((bits >> 40) & 0xFF).chr)
+        @writer.write(((bits >> 48) & 0xFF).chr)
+        @writer.write(((bits >> 56) & 0xFF).chr)
+      end
+
+      # Bytes are encoded as a long followed by that many bytes of data.
+      def write_bytes(datum)
+        write_long(datum.size)
+        @writer.write(datum)
+      end
+
+      # A string is encoded as a long followed by that many bytes of
+      # UTF-8 encoded character data
+      def write_string(datum)
+        # FIXME utf-8 encode this in 1.9
+        write_bytes(datum)
+      end
+
+      # Write an arbitrary datum.
+      def write(datum)
+        writer.write(datum)
+      end
+    end
+
+    class DatumReader
+      def self.check_props(schema_one, schema_two, prop_list)
+        prop_list.all? do |prop|
+          schema_one.to_hash[prop] == schema_two.to_hash[prop]
+        end
+      end
+
+      def self.match_schemas(writers_schema, readers_schema)
+        w_type = writers_schema.type
+        r_type = readers_schema.type
+
+        # This conditional is begging for some OO love.
+        if [w_type, r_type].include? 'union'
+          return true
+        elsif Schema::PRIMITIVE_TYPES.include?(w_type) &&
+              Schema::PRIMITIVE_TYPES.include?(r_type) &&
+            w_type == r_type
+          return true
+        elsif (w_type == r_type) && (r_type == 'record') &&
+            check_props(writers_schema, readers_schema, ['fullname'])
+          return true
+        elsif w_type == r_type && r_type == 'error' && check_props(writers_schema, readers_schema, ['fullname'])
+          return true
+        elsif w_type == r_type && r_type == 'request'
+          return true
+        elsif (w_type == r_type) && (r_type == 'fixed') &&
+            check_props(writers_schema, readers_schema, ['fullname', 'size'])
+          return true
+        elsif (w_type == r_type) && (r_type == 'enum') &&
+            check_props(writers_schema, readers_schema, ['fullname'])
+          return true
+        elsif (w_type == r_type) && (r_type == 'map') &&
+            check_props(writers_schema.values, readers_schema.values, ['type'])
+          return true
+        elsif (w_type == r_type) && (r_type == 'array') &&
+            check_props(writers_schema.items, readers_schema.items, ['type'])
+          return true
+        end
+
+        # Handle schema promotion
+        if w_type == 'int' && ['long', 'float', 'double'].include?(r_type)
+          return true
+        elsif w_type == 'long' && ['float', 'double'].include?(r_type)
+          return true
+        elsif w_type == 'float' && r_type == 'double'
+          return true
+        end
+
+        return false
+      end
+
+      attr_accessor :writers_schema, :readers_schema
+
+      def initialize(writers_schema=nil, readers_schema=nil)
+        @writers_schema = writers_schema
+        @readers_schema = readers_schema
+      end
+
+      def read(decoder)
+        self.readers_schema = writers_schema unless readers_schema
+        read_data(writers_schema, readers_schema, decoder)
+      end
+
+      def read_data(writers_schema, readers_schema, decoder)
+        # schema matching
+        unless self.class.match_schemas(writers_schema, readers_schema)
+          raise SchemaMatchException.new(writers_schema, readers_schema)
+        end
+
+        # schema resolution: reader's schema is a union, writer's
+        # schema is not
+        if writers_schema.type != 'union' && readers_schema.type == 'union'
+          rs = readers_schema.schemas.find{|s|
+            self.class.match_schemas(writers_schema, s)
+          }
+          return read_data(writers_schema, rs, decoder) if rs
+          raise SchemaMatchException.new(writers_schema, readers_schema)
+        end
+
+        # function dispatch for reading data based on type of writer's
+        # schema
+        case writers_schema.type
+        when 'null';    decoder.read_null
+        when 'boolean'; decoder.read_boolean
+        when 'string';  decoder.read_string
+        when 'int';     decoder.read_int
+        when 'long';    decoder.read_long
+        when 'float';   decoder.read_float
+        when 'double';  decoder.read_double
+        when 'bytes';   decoder.read_bytes
+        when 'fixed';   read_fixed(writers_schema, readers_schema, decoder)
+        when 'enum';    read_enum(writers_schema, readers_schema, decoder)
+        when 'array';   read_array(writers_schema, readers_schema, decoder)
+        when 'map';     read_map(writers_schema, readers_schema, decoder)
+        when 'union';   read_union(writers_schema, readers_schema, decoder)
+        when 'record', 'error', 'request';  read_record(writers_schema, readers_schema, decoder)
+        else
+          raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
+        end
+      end
+
+      def read_fixed(writers_schema, readers_schema, decoder)
+        decoder.read(writers_schema.size)
+      end
+
+      def read_enum(writers_schema, readers_schema, decoder)
+        index_of_symbol = decoder.read_int
+        read_symbol = writers_schema.symbols[index_of_symbol]
+
+        # TODO(jmhodges): figure out what unset means for resolution
+        # schema resolution
+        unless readers_schema.symbols.include?(read_symbol)
+          # 'unset' here
+        end
+
+        read_symbol
+      end
+
+      def read_array(writers_schema, readers_schema, decoder)
+        read_items = []
+        block_count = decoder.read_long
+        while block_count != 0
+          if block_count < 0
+            block_count = -block_count
+            block_size = decoder.read_long
+          end
+          block_count.times do
+            read_items << read_data(writers_schema.items,
+                                    readers_schema.items,
+                                    decoder)
+          end
+          block_count = decoder.read_long
+        end
+
+        read_items
+      end
+
+      def read_map(writers_schema, readers_schema, decoder)
+        read_items = {}
+        block_count = decoder.read_long
+        while block_count != 0
+          if block_count < 0
+            block_count = -block_count
+            block_size = decoder.read_long
+          end
+          block_count.times do
+            key = decoder.read_string
+            read_items[key] = read_data(writers_schema.values,
+                                        readers_schema.values,
+                                        decoder)
+          end
+          block_count = decoder.read_long
+        end
+
+        read_items
+      end
+
+      def read_union(writers_schema, readers_schema, decoder)
+        index_of_schema = decoder.read_long
+        selected_writers_schema = writers_schema.schemas[index_of_schema]
+
+        read_data(selected_writers_schema, readers_schema, decoder)
+      end
+
+      def read_record(writers_schema, readers_schema, decoder)
+        readers_fields_hash = readers_schema.fields_hash
+        read_record = {}
+        writers_schema.fields.each do |field|
+          if readers_field = readers_fields_hash[field.name]
+            field_val = read_data(field.type, readers_field.type, decoder)
+            read_record[field.name] = field_val
+          else
+            skip_data(field.type, decoder)
+          end
+        end
+
+        # fill in the default values
+        if readers_fields_hash.size > read_record.size
+          writers_fields_hash = writers_schema.fields_hash
+          readers_fields_hash.each do |field_name, field|
+            
+            unless writers_fields_hash.has_key? field_name
+              if !field.default.nil?
+                field_val = read_default_value(field.type, field.default)
+                read_record[field.name] = field_val
+              else
+                # FIXME(jmhodges) another 'unset' here
+              end
+            end
+          end
+        end
+
+        read_record
+      end
+
+      def read_default_value(field_schema, default_value)
+        # Basically a JSON Decoder?
+        case field_schema.type
+        when 'null'
+          return nil
+        when 'boolean'
+          return default_value
+        when 'int', 'long'
+          return Integer(default_value)
+        when 'float', 'double'
+          return Float(default_value)
+        when 'enum', 'fixed', 'string', 'bytes'
+          return default_value
+        when 'array'
+          read_array = []
+          default_value.each do |json_val|
+            item_val = read_default_value(field_schema.items, json_val)
+            read_array << item_val
+          end
+          return read_array
+        when 'map'
+          read_map = {}
+          default_value.each do |key, json_val|
+            map_val = read_default_value(field_schema.values, json_val)
+            read_map[key] = map_val
+          end
+          return read_map
+        when 'union'
+          return read_default_value(field_schema.schemas[0], default_value)
+        when 'record'
+          read_record = {}
+          field_schema.fields.each do |field|
+            json_val = default_value[field.name]
+            json_val = field.default unless json_val
+            field_val = read_default_value(field.type, json_val)
+            read_record[field.name] = field_val
+          end
+          return read_record
+        else
+          fail_msg = "Unknown type: #{field_schema.type}"
+          raise AvroError.new(fail_msg)
+        end
+      end
+    end # DatumReader
+
+    # DatumWriter for generic ruby objects
+    class DatumWriter
+      attr_accessor :writers_schema
+      def initialize(writers_schema=nil)
+        @writers_schema = writers_schema
+      end
+
+      def write(datum, encoder)
+        write_data(writers_schema, datum, encoder)
+      end
+
+      def write_data(writers_schema, datum, encoder)
+        unless Schema.validate(writers_schema, datum)
+          raise AvroTypeError.new(writers_schema, datum)
+        end
+
+        # function dispatch to write datum
+        case writers_schema.type
+        when 'null';    encoder.write_null(datum)
+        when 'boolean'; encoder.write_boolean(datum)
+        when 'string';  encoder.write_string(datum)
+        when 'int';     encoder.write_int(datum)
+        when 'long';    encoder.write_long(datum)
+        when 'float';   encoder.write_float(datum)
+        when 'double';  encoder.write_double(datum)
+        when 'bytes';   encoder.write_bytes(datum)
+        when 'fixed';   write_fixed(writers_schema, datum, encoder)
+        when 'enum';    write_enum(writers_schema, datum, encoder)
+        when 'array';   write_array(writers_schema, datum, encoder)
+        when 'map';     write_map(writers_schema, datum, encoder)
+        when 'union';   write_union(writers_schema, datum, encoder)
+        when 'record', 'error', 'request';  write_record(writers_schema, datum, encoder)
+        else
+          raise AvroError.new("Unknown type: #{writers_schema.type}")
+        end
+      end
+
+      def write_fixed(writers_schema, datum, encoder)
+        encoder.write(datum)
+      end
+
+      def write_enum(writers_schema, datum, encoder)
+        index_of_datum = writers_schema.symbols.index(datum)
+        encoder.write_int(index_of_datum)
+      end
+
+      def write_array(writers_schema, datum, encoder)
+        if datum.size > 0
+          encoder.write_long(datum.size)
+          datum.each do |item|
+            write_data(writers_schema.items, item, encoder)
+          end
+        end
+        encoder.write_long(0)
+      end
+
+      def write_map(writers_schema, datum, encoder)
+        if datum.size > 0
+          encoder.write_long(datum.size)
+          datum.each do |k,v|
+            encoder.write_string(k)
+            write_data(writers_schema.values, v, encoder)
+          end
+        end
+        encoder.write_long(0)
+      end
+
+      def write_union(writers_schema, datum, encoder)
+        index_of_schema = writers_schema.schemas.
+          find_index{|e| Schema.validate(e, datum) }
+        unless index_of_schema
+          raise AvroTypeError.new(writers_schema, datum)
+        end
+        encoder.write_long(index_of_schema)
+        write_data(writers_schema.schemas[index_of_schema], datum, encoder)
+      end
+
+      def write_record(writers_schema, datum, encoder)
+        writers_schema.fields.each do |field|
+          write_data(field.type, datum[field.name], encoder)
+        end
+      end
+    end # DatumWriter
+  end
+end
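
The block layout that write_array and write_map emit (an item count, the items, then a zero count as terminator) can be sketched without the rest of the library. This is only an illustration: encode_long and encode_long_array are hypothetical helpers, not part of this commit, and the zig-zag varint encoding is assumed to be what the encoder's write_long produces.

```ruby
# Hypothetical helper (not in the commit): zig-zag + varint encoding of a
# signed 64-bit integer, the wire format Avro uses for int and long.
def encode_long(n)
  n = (n << 1) ^ (n >> 63)          # zig-zag: small magnitudes get small codes
  bytes = []
  while (n & ~0x7F) != 0
    bytes << ((n & 0x7F) | 0x80)    # low 7 bits with the continuation bit set
    n >>= 7
  end
  bytes << n
  bytes.pack('C*')
end

# Hypothetical helper mirroring the shape of write_array for an array of
# longs: one block holding the item count and the items, then a zero count.
def encode_long_array(items)
  out = ''.b
  unless items.empty?
    out << encode_long(items.size)
    items.each { |item| out << encode_long(item) }
  end
  out << encode_long(0)             # an empty block terminates the array
end

puts encode_long_array([1, 2, 3]).bytes.inspect
```

An empty array is encoded as the terminator alone, which is why write_array and write_map skip the count when datum.size is zero.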

Added: hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb (added)
+++ hadoop/avro/trunk/lang/ruby/lib/avro/ipc.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,443 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+require 'stringio'
+
+module Avro::IPC
+
+  class AvroRemoteError < Avro::AvroError; end
+
+  HANDSHAKE_REQUEST_SCHEMA = Avro::Schema.parse <<-JSON
+  {
+    "type": "record",
+    "name": "HandshakeRequest", "namespace":"org.apache.avro.ipc",
+    "fields": [
+      {"name": "clientHash",
+       "type": {"type": "fixed", "name": "MD5", "size": 16}},
+      {"name": "clientProtocol", "type": ["null", "string"]},
+      {"name": "serverHash", "type": "MD5"},
+      {"name": "meta", "type": ["null", {"type": "map", "values": "bytes"}]}
+    ]
+  }
+  JSON
+
+  HANDSHAKE_RESPONSE_SCHEMA = Avro::Schema.parse <<-JSON
+  {
+    "type": "record",
+    "name": "HandshakeResponse", "namespace": "org.apache.avro.ipc",
+    "fields": [
+      {"name": "match",
+       "type": {"type": "enum", "name": "HandshakeMatch",
+                "symbols": ["BOTH", "CLIENT", "NONE"]}},
+      {"name": "serverProtocol", "type": ["null", "string"]},
+      {"name": "serverHash",
+       "type": ["null", {"type": "fixed", "name": "MD5", "size": 16}]},
+      {"name": "meta",
+       "type": ["null", {"type": "map", "values": "bytes"}]}
+    ]
+  }
+  JSON
+
+  HANDSHAKE_REQUESTOR_WRITER = Avro::IO::DatumWriter.new(HANDSHAKE_REQUEST_SCHEMA)
+  HANDSHAKE_REQUESTOR_READER = Avro::IO::DatumReader.new(HANDSHAKE_RESPONSE_SCHEMA)
+  HANDSHAKE_RESPONDER_WRITER = Avro::IO::DatumWriter.new(HANDSHAKE_RESPONSE_SCHEMA)
+  HANDSHAKE_RESPONDER_READER = Avro::IO::DatumReader.new(HANDSHAKE_REQUEST_SCHEMA)
+
+  META_SCHEMA = Avro::Schema.parse('{"type": "map", "values": "bytes"}')
+  META_WRITER = Avro::IO::DatumWriter.new(META_SCHEMA)
+  META_READER = Avro::IO::DatumReader.new(META_SCHEMA)
+
+  SYSTEM_ERROR_SCHEMA = Avro::Schema.parse('["string"]')
+
+  # protocol cache
+  REMOTE_HASHES = {}
+  REMOTE_PROTOCOLS = {}
+
+  BUFFER_HEADER_LENGTH = 4
+  BUFFER_SIZE = 8192
+
+  # Raised when an error message is sent by an Avro requestor or responder.
+  class AvroRemoteException < Avro::AvroError; end
+
+  class ConnectionClosedException < Avro::AvroError; end
+
+  class Requestor
+    # Base class for the client side of a protocol interaction.
+    attr_reader :local_protocol, :transport
+    attr_accessor :remote_protocol, :remote_hash, :send_protocol
+
+    def initialize(local_protocol, transport)
+      @local_protocol = local_protocol
+      @transport = transport
+      @remote_protocol = nil
+      @remote_hash = nil
+      @send_protocol = nil
+    end
+
+    def remote_protocol=(new_remote_protocol)
+      @remote_protocol = new_remote_protocol
+      REMOTE_PROTOCOLS[transport.remote_name] = remote_protocol
+    end
+
+    def remote_hash=(new_remote_hash)
+      @remote_hash = new_remote_hash
+      REMOTE_HASHES[transport.remote_name] = remote_hash
+    end
+
+    def request(message_name, request_datum)
+      # Writes a request message and reads a response or error message.
+      # build handshake and call request
+      buffer_writer = StringIO.new('', 'w+')
+      buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer)
+      write_handshake_request(buffer_encoder)
+      write_call_request(message_name, request_datum, buffer_encoder)
+
+      # send the handshake and call request;  block until call response
+      call_request = buffer_writer.string
+      call_response = transport.transceive(call_request)
+
+      # process the handshake and call response
+      buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_response))
+      if read_handshake_response(buffer_decoder)
+        read_call_response(message_name, buffer_decoder)
+      else
+        request(message_name, request_datum)
+      end
+    end
+
+    def write_handshake_request(encoder)
+      local_hash = local_protocol.md5
+      remote_name = transport.remote_name
+      remote_hash = REMOTE_HASHES[remote_name]
+      unless remote_hash
+        remote_hash = local_hash
+        self.remote_protocol = local_protocol
+      end
+      request_datum = {
+        'clientHash' => local_hash,
+        'serverHash' => remote_hash
+      }
+      if send_protocol
+        request_datum['clientProtocol'] = local_protocol.to_s
+      end
+      HANDSHAKE_REQUESTOR_WRITER.write(request_datum, encoder)
+    end
+
+    def write_call_request(message_name, request_datum, encoder)
+      # The format of a call request is:
+      #   * request metadata, a map with values of type bytes
+      #   * the message name, an Avro string, followed by
+      #   * the message parameters. Parameters are serialized according to
+      #     the message's request declaration.
+
+      # TODO request metadata (not yet implemented)
+      request_metadata = {}
+      META_WRITER.write(request_metadata, encoder)
+
+      message = local_protocol.messages[message_name]
+      unless message
+        raise Avro::AvroError, "Unknown message: #{message_name}"
+      end
+      encoder.write_string(message.name)
+
+      write_request(message.request, request_datum, encoder)
+    end
+
+    def write_request(request_schema, request_datum, encoder)
+      datum_writer = Avro::IO::DatumWriter.new(request_schema)
+      datum_writer.write(request_datum, encoder)
+    end
+
+    def read_handshake_response(decoder)
+      handshake_response = HANDSHAKE_REQUESTOR_READER.read(decoder)
+      case match = handshake_response['match']
+      when 'BOTH'
+        self.send_protocol = false
+        true
+      when 'CLIENT'
+        raise Avro::AvroError.new('Handshake failure. match == CLIENT') if send_protocol
+        self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol'])
+        self.remote_hash = handshake_response['serverHash']
+        self.send_protocol = false
+        false
+      when 'NONE'
+        raise Avro::AvroError.new('Handshake failure. match == NONE') if send_protocol
+        self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol'])
+        self.remote_hash = handshake_response['serverHash']
+        self.send_protocol = true
+        false
+      else
+        raise Avro::AvroError.new("Unexpected match: #{match}")
+      end
+    end
+
+    def read_call_response(message_name, decoder)
+      # The format of a call response is:
+      #   * response metadata, a map with values of type bytes
+      #   * a one-byte error flag boolean, followed by either:
+      #     * if the error flag is false,
+      #       the message response, serialized per the message's response schema.
+      #     * if the error flag is true, 
+      #       the error, serialized per the message's error union schema.
+      response_metadata = META_READER.read(decoder)
+
+      # remote response schema
+      remote_message_schema = remote_protocol.messages[message_name]
+      raise Avro::AvroError.new("Unknown remote message: #{message_name}") unless remote_message_schema
+
+      # local response schema
+      local_message_schema = local_protocol.messages[message_name]
+      unless local_message_schema
+        raise Avro::AvroError.new("Unknown local message: #{message_name}")
+      end
+
+      # error flag
+      if !decoder.read_boolean
+        writers_schema = remote_message_schema.response
+        readers_schema = local_message_schema.response
+        read_response(writers_schema, readers_schema, decoder)
+      else
+        writers_schema = remote_message_schema.errors || SYSTEM_ERROR_SCHEMA
+        readers_schema = local_message_schema.errors || SYSTEM_ERROR_SCHEMA
+        raise read_error(writers_schema, readers_schema, decoder)
+      end
+    end
+
+    def read_response(writers_schema, readers_schema, decoder)
+      datum_reader = Avro::IO::DatumReader.new(writers_schema, readers_schema)
+      datum_reader.read(decoder)
+    end
+
+    def read_error(writers_schema, readers_schema, decoder)
+      datum_reader = Avro::IO::DatumReader.new(writers_schema, readers_schema)
+      AvroRemoteError.new(datum_reader.read(decoder))
+    end
+  end
+
+  # Base class for the server side of a protocol interaction.
+  class Responder
+    attr_reader :local_protocol, :local_hash, :protocol_cache
+    def initialize(local_protocol)
+      @local_protocol = local_protocol
+      @local_hash = self.local_protocol.md5
+      @protocol_cache = {}
+      protocol_cache[local_hash] = local_protocol
+    end
+
+    def respond(transport)
+      # Called by a server to deserialize a request, compute and serialize
+      # a response or error. Compare to 'handle()' in Thrift.
+
+      call_request = transport.read_framed_message
+      buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_request))
+      buffer_writer = StringIO.new('', 'w+')
+      buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer)
+      error = nil
+      response_metadata = {}
+
+      begin
+        remote_protocol = process_handshake(transport, buffer_decoder, buffer_encoder)
+        # handshake failure
+        unless remote_protocol
+          return buffer_writer.string
+        end
+
+        # read request using remote protocol
+        request_metadata = META_READER.read(buffer_decoder)
+        remote_message_name = buffer_decoder.read_string
+
+        # get remote and local request schemas so we can do
+        # schema resolution (one fine day)
+        remote_message = remote_protocol.messages[remote_message_name]
+        unless remote_message
+          raise Avro::AvroError.new("Unknown remote message: #{remote_message_name}")
+        end
+        local_message = local_protocol.messages[remote_message_name]
+        unless local_message
+          raise Avro::AvroError.new("Unknown local message: #{remote_message_name}")
+        end
+        writers_schema = remote_message.request
+        readers_schema = local_message.request
+        request = read_request(writers_schema, readers_schema, buffer_decoder)
+        # perform server logic
+        begin
+          response = call(local_message, request)
+        rescue AvroRemoteError => e
+          error = e
+        rescue Exception => e
+          error = AvroRemoteError.new(e.to_s)
+        end
+
+        # write response using local protocol
+        META_WRITER.write(response_metadata, buffer_encoder)
+        buffer_encoder.write_boolean(!!error)
+        if error.nil?
+          writers_schema = local_message.response
+          write_response(writers_schema, response, buffer_encoder)
+        else
+          writers_schema = local_message.errors || SYSTEM_ERROR_SCHEMA
+          write_error(writers_schema, error, buffer_encoder)
+        end
+      rescue Avro::AvroError => e
+        error = AvroRemoteException.new(e.to_s)
+        buffer_encoder = Avro::IO::BinaryEncoder.new(StringIO.new)
+        META_WRITER.write(response_metadata, buffer_encoder)
+        buffer_encoder.write_boolean(true)
+        self.write_error(SYSTEM_ERROR_SCHEMA, error, buffer_encoder)
+      end
+      buffer_writer.string
+    end
+
+    def process_handshake(transport, decoder, encoder)
+      handshake_request = HANDSHAKE_RESPONDER_READER.read(decoder)
+      handshake_response = {}
+
+      # determine the remote protocol
+      client_hash = handshake_request['clientHash']
+      client_protocol = handshake_request['clientProtocol']
+      remote_protocol = protocol_cache[client_hash]
+      if !remote_protocol && client_protocol
+        remote_protocol = Avro::Protocol.parse(client_protocol)
+        protocol_cache[client_hash] = remote_protocol
+      end
+
+      # evaluate remote's guess of the local protocol
+      server_hash = handshake_request['serverHash']
+      if local_hash == server_hash
+        if !remote_protocol
+          handshake_response['match'] = 'NONE'
+        else
+          handshake_response['match'] = 'BOTH'
+        end
+      else
+        if !remote_protocol
+          handshake_response['match'] = 'NONE'
+        else
+          handshake_response['match'] = 'CLIENT'
+        end
+      end
+
+      if handshake_response['match'] != 'BOTH'
+        handshake_response['serverProtocol'] = local_protocol.to_s
+        handshake_response['serverHash'] = local_hash
+      end
+
+      HANDSHAKE_RESPONDER_WRITER.write(handshake_response, encoder)
+      remote_protocol
+    end
+
+    def call(local_message, request)
+      # Actual work done by server: cf. handler in thrift.
+      raise NotImplementedError
+    end
+
+    def read_request(writers_schema, readers_schema, decoder)
+      datum_reader = Avro::IO::DatumReader.new(writers_schema, readers_schema)
+      datum_reader.read(decoder)
+    end
+
+    def write_response(writers_schema, response_datum, encoder)
+      datum_writer = Avro::IO::DatumWriter.new(writers_schema)
+      datum_writer.write(response_datum, encoder)
+    end
+
+    def write_error(writers_schema, error_exception, encoder)
+      datum_writer = Avro::IO::DatumWriter.new(writers_schema)
+      datum_writer.write(error_exception.to_s, encoder)
+    end
+  end
+
+  class SocketTransport
+    # A simple socket-based Transport implementation.
+
+    attr_reader :sock, :remote_name
+
+    def initialize(sock)
+      @sock = sock
+    end
+
+    def transceive(request)
+      write_framed_message(request)
+      read_framed_message
+    end
+
+    def read_framed_message
+      message = []
+      loop do
+        buffer = StringIO.new
+        buffer_length = read_buffer_length
+        if buffer_length == 0
+          return message.join
+        end
+        while buffer.tell < buffer_length
+          chunk = sock.read(buffer_length - buffer.tell)
+          if chunk.nil? || chunk.empty?
+            raise ConnectionClosedException.new("Socket read 0 bytes.")
+          end
+          buffer.write(chunk)
+        end
+        message << buffer.string
+      end
+    end
+
+    def write_framed_message(message)
+      message_length = message.size
+      total_bytes_sent = 0
+      while message_length - total_bytes_sent > 0
+        if message_length - total_bytes_sent > BUFFER_SIZE
+          buffer_length = BUFFER_SIZE
+        else
+          buffer_length = message_length - total_bytes_sent
+        end
+        write_buffer(message[total_bytes_sent,buffer_length])
+        total_bytes_sent += buffer_length
+      end
+      # A message is always terminated by a zero-length buffer.
+      write_buffer_length(0)
+    end
+
+    def write_buffer(chunk)
+      buffer_length = chunk.size
+      write_buffer_length(buffer_length)
+      total_bytes_sent = 0
+      while total_bytes_sent < buffer_length
+        bytes_sent = self.sock.write(chunk[total_bytes_sent..-1])
+        if bytes_sent == 0
+          raise ConnectionClosedException.new("Socket sent 0 bytes.")
+        end
+        total_bytes_sent += bytes_sent
+      end
+    end
+
+    def write_buffer_length(n)
+      bytes_sent = sock.write([n].pack('N'))
+      if bytes_sent == 0
+        raise ConnectionClosedException.new("socket sent 0 bytes")
+      end
+    end
+
+    def read_buffer_length
+      read = sock.read(BUFFER_HEADER_LENGTH)
+      if read == '' || read == nil
+        raise ConnectionClosedException.new("Socket read 0 bytes.")
+      end
+      read.unpack('N')[0]
+    end
+
+    def close
+      sock.close
+    end
+  end
+end
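
The framing that SocketTransport performs (length-prefixed buffers, closed by a zero-length buffer) can be tried against an in-memory stream. The following is a sketch under stated assumptions: write_framed, read_framed and FRAME_SIZE are hypothetical names rather than part of this commit, a StringIO stands in for the socket, and the four-byte length prefix is taken to be big-endian as the Avro specification describes.

```ruby
require 'stringio'

# Deliberately tiny so the example message spans several buffers;
# the commit's BUFFER_SIZE is 8192.
FRAME_SIZE = 8

# Hypothetical helper: split the message into length-prefixed buffers and
# finish with a zero-length buffer.
def write_framed(io, message)
  offset = 0
  while offset < message.bytesize
    chunk = message.byteslice(offset, FRAME_SIZE)
    io.write([chunk.bytesize].pack('N'))   # four-byte big-endian length
    io.write(chunk)
    offset += chunk.bytesize
  end
  io.write([0].pack('N'))                  # terminator
end

# Hypothetical helper: read buffers until the zero-length terminator.
def read_framed(io)
  parts = []
  loop do
    header = io.read(4)
    raise EOFError, 'stream closed mid-message' if header.nil? || header.bytesize < 4
    length = header.unpack('N').first
    return parts.join if length.zero?
    chunk = io.read(length)
    raise EOFError, 'stream closed mid-buffer' if chunk.nil? || chunk.bytesize < length
    parts << chunk
  end
end

io = StringIO.new(''.b)
write_framed(io, 'hello, avro framing')
io.rewind
puts read_framed(io)
```

Checking for nil as well as a short read matters because IO#read with a length returns nil at end of stream rather than an empty string.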

Added: hadoop/avro/trunk/lang/ruby/lib/avro/protocol.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/lib/avro/protocol.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/lib/avro/protocol.rb (added)
+++ hadoop/avro/trunk/lang/ruby/lib/avro/protocol.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,160 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+module Avro
+  class Protocol
+    VALID_TYPE_SCHEMA_TYPES = Set.new(%w[enum record error fixed])
+    class ProtocolParseError < Avro::AvroError; end
+
+    attr_reader :name, :namespace, :types, :messages, :md5
+    def self.parse(protocol_string)
+      json_data = Yajl.load(protocol_string)
+
+      if json_data.is_a? Hash
+        name = json_data['protocol']
+        namespace = json_data['namespace']
+        types = json_data['types']
+        messages = json_data['messages']
+        Protocol.new(name, namespace, types, messages)
+      else
+        raise ProtocolParseError, "Not a JSON object: #{json_data}"
+      end
+    end
+
+    def initialize(name, namespace=nil, types=nil, messages=nil)
+      # Ensure valid ctor args
+      if !name
+        raise ProtocolParseError, 'Protocols must have a non-empty name.'
+      elsif !name.is_a?(String)
+        raise ProtocolParseError, 'The name property must be a string.'
+      elsif namespace && !namespace.is_a?(String)
+        raise ProtocolParseError, 'The namespace property must be a string.'
+      elsif !types.is_a?(Array)
+        raise ProtocolParseError, 'The types property must be a list.'
+      elsif !messages.is_a?(Hash)
+        raise ProtocolParseError, 'The messages property must be a JSON object.'
+      end
+
+      @name = name
+      @namespace = namespace
+      type_names = {}
+      @types = parse_types(types, type_names)
+      @messages = parse_messages(messages, type_names)
+      @md5 = Digest::MD5.digest(to_s)
+    end
+
+    def to_s
+      Yajl.dump to_hash
+    end
+
+    def ==(other)
+      to_hash == Yajl.load(other.to_s)
+    end
+
+    private
+    def parse_types(types, type_names)
+      type_objects = []
+      types.collect do |type|
+        # FIXME adding type.name to type_names is not defined in the
+        # spec. Possible bug in the python impl and the spec.
+        type_object = Schema.real_parse(type, type_names)
+        unless VALID_TYPE_SCHEMA_TYPES.include?(type_object.type)
+          msg = "Type #{type} not an enum, record, fixed or error."
+          raise ProtocolParseError, msg
+        end
+        type_object
+      end
+    end
+
+    def parse_messages(messages, names)
+      message_objects = {}
+      messages.each do |name, body|
+        if message_objects.has_key?(name)
+          raise ProtocolParseError, "Message name \"#{name}\" repeated."
+        elsif !body.is_a?(Hash)
+          raise ProtocolParseError, "Message name \"#{name}\" has non-object body #{body.inspect}"
+        end
+
+        request  = body['request']
+        response = body['response']
+        errors   = body['errors']
+        message_objects[name] = Message.new(name, request, response, errors, names)
+      end
+      message_objects
+    end
+
+    def to_hash
+      hsh = {'protocol' => name}
+      hsh['namespace'] = namespace if namespace
+      hsh['types'] = types.map{|t| Yajl.load(t.to_s) } if types
+
+      if messages
+        hsh['messages'] = messages.collect_hash{|k,t| [k, Yajl.load(t.to_s)] }
+      end
+
+      hsh
+    end
+
+    class Message
+      attr_reader :name, :response_from_names, :request, :response, :errors
+      def initialize(name, request, response, errors=nil, names=nil)
+        @name = name
+        @response_from_names = false
+
+        @request = parse_request(request, names)
+        @response = parse_response(response, names)
+        @errors = parse_errors(errors, names) if errors
+      end
+
+      def to_s
+        hsh = {'request' => Yajl.load(request.to_s)}
+        if response_from_names
+          hsh['response'] = response.fullname
+        else
+          hsh['response'] = Yajl.load(response.to_s)
+        end
+
+        if errors
+          hsh['errors'] = Yajl.load(errors.to_s)
+        end
+        Yajl.dump hsh
+      end
+
+      def parse_request(request, names)
+        unless request.is_a?(Array)
+          raise ProtocolParseError, "Request property not an Array: #{request.inspect}"
+        end
+        Schema::RecordSchema.new(nil, nil, request, names, 'request')
+      end
+
+      def parse_response(response, names)
+        if response.is_a?(String) && names[response]
+          @response_from_names = true
+          names[response]
+        else
+          Schema.real_parse(response, names)
+        end
+      end
+
+      def parse_errors(errors, names)
+        unless errors.is_a?(Array)
+          raise ProtocolParseError, "Errors property not an Array: #{errors}"
+        end
+        Schema.real_parse(errors, names)
+      end
+    end
+  end
+end
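
The 16-byte digest that Protocol#md5 stores is what the IPC handshake compares. A minimal sketch of that relationship follows, with assumptions: protocol_digest and handshake_match are hypothetical helpers, and the standard json library stands in for Yajl. The decision table is the one Responder#process_handshake applies.

```ruby
require 'digest/md5'
require 'json'

# Hypothetical stand-in for Protocol#md5: the raw MD5 digest of the
# protocol's serialized JSON text.
def protocol_digest(protocol_as_hash)
  Digest::MD5.digest(JSON.generate(protocol_as_hash))
end

# Hypothetical helper reproducing the match decision in process_handshake:
# NONE when the client's protocol is unknown to the server, BOTH when the
# client also guessed the server's hash correctly, CLIENT otherwise.
def handshake_match(local_hash, server_hash_guess, client_protocol_known)
  return 'NONE' unless client_protocol_known
  local_hash == server_hash_guess ? 'BOTH' : 'CLIENT'
end

server = protocol_digest('protocol' => 'Mail', 'types' => [], 'messages' => {})
stale  = protocol_digest('protocol' => 'OldMail', 'types' => [], 'messages' => {})

puts server.bytesize
puts handshake_match(server, server, true)
puts handshake_match(server, stale, true)
puts handshake_match(server, server, false)
```

Because the digest is taken over the serialized text, two sides agree on a hash only if they serialize the protocol identically.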

Added: hadoop/avro/trunk/lang/ruby/lib/avro/schema.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/lib/avro/schema.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/lib/avro/schema.rb (added)
+++ hadoop/avro/trunk/lang/ruby/lib/avro/schema.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,431 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+module Avro
+  class Schema
+    # FIXME turn these into symbols to prevent some gc pressure
+    PRIMITIVE_TYPES = Set.new(%w[null boolean string bytes int long float double])
+    NAMED_TYPES =     Set.new(%w[fixed enum record error])
+
+    VALID_TYPES = PRIMITIVE_TYPES + NAMED_TYPES + Set.new(%w[array map union request])
+
+    INT_MIN_VALUE = -(1 << 31)
+    INT_MAX_VALUE = (1 << 31) - 1
+    LONG_MIN_VALUE = -(1 << 63)
+    LONG_MAX_VALUE = (1 << 63) - 1
+
+    def self.parse(json_string)
+      real_parse(Yajl.load(json_string), {})
+    end
+
+    # Build Avro Schema from data parsed out of JSON string.
+    def self.real_parse(json_obj, names=nil)
+      if json_obj.is_a? Hash
+        type = json_obj['type']
+        if PRIMITIVE_TYPES.include?(type)
+          return PrimitiveSchema.new(type)
+        elsif NAMED_TYPES.include? type
+          name = json_obj['name']
+          namespace = json_obj['namespace']
+          case type
+          when 'fixed'
+            size = json_obj['size']
+            return FixedSchema.new(name, namespace, size, names)
+          when 'enum'
+            symbols = json_obj['symbols']
+            return EnumSchema.new(name, namespace, symbols, names)
+          when 'record', 'error'
+            fields = json_obj['fields']
+            return RecordSchema.new(name, namespace, fields, names, type)
+          else
+            raise SchemaParseError.new("Unknown Named Type: #{type}")
+          end
+        elsif VALID_TYPES.include?(type)
+          case type
+          when 'array'
+            return ArraySchema.new(json_obj['items'], names)
+          when 'map'
+            return MapSchema.new(json_obj['values'], names)
+          else
+            raise SchemaParseError.new("Unknown Valid Type: #{type}")
+          end
+        elsif type.nil?
+          raise SchemaParseError.new("No \"type\" property: #{json_obj}")
+        else
+          raise SchemaParseError.new("Undefined type: #{type}")
+        end
+      elsif json_obj.is_a? Array
+        # JSON array (union)
+        return UnionSchema.new(json_obj, names)
+      elsif PRIMITIVE_TYPES.include? json_obj
+        return PrimitiveSchema.new(json_obj)
+      else
+        msg = "Could not make an Avro Schema object from #{json_obj}"
+        raise SchemaParseError.new(msg)
+      end
+    end
+
+    # Determine if a ruby datum is an instance of a schema
+    def self.validate(expected_schema, datum)
+      case expected_schema.type
+      when 'null'
+        datum.nil?
+      when 'boolean'
+        datum == true || datum == false
+      when 'string', 'bytes'
+        datum.is_a? String
+      when 'int'
+        datum.is_a?(Integer) &&
+            (INT_MIN_VALUE <= datum) && (datum <= INT_MAX_VALUE)
+      when 'long'
+        datum.is_a?(Integer) &&
+            (LONG_MIN_VALUE <= datum) && (datum <= LONG_MAX_VALUE)
+      when 'float', 'double'
+        datum.is_a?(Float) || datum.is_a?(Integer)
+      when 'fixed'
+        datum.is_a?(String) && datum.size == expected_schema.size
+      when 'enum'
+        expected_schema.symbols.include? datum
+      when 'array'
+        datum.is_a?(Array) &&
+          datum.all?{|d| validate(expected_schema.items, d) }
+      when 'map'
+        datum.is_a?(Hash) && datum.keys.all?{|k| k.is_a? String } &&
+          datum.values.all?{|v| validate(expected_schema.values, v) }
+      when 'union'
+        expected_schema.schemas.any?{|s| validate(s, datum) }
+      when 'record', 'error', 'request'
+        datum.is_a?(Hash) &&
+          expected_schema.fields.all?{|f| validate(f.type, datum[f.name]) }
+      else
+        raise "#{expected_schema.inspect} is not allowed."
+      end
+    end
+
+    def initialize(type)
+      @type = type
+    end
+
+    def type; @type; end
+
+    def ==(other, seen=nil)
+      other.is_a?(Schema) && @type == other.type
+    end
+
+    def hash(seen=nil)
+      @type.hash
+    end
+
+    def to_hash
+      {'type' => @type}
+    end
+
+    def to_s
+      Yajl.dump to_hash
+    end
+
+    class NamedSchema < Schema
+      attr_reader :name, :namespace
+      def initialize(type, name, namespace=nil, names=nil)
+        super(type)
+        @name, @namespace = Name.extract_namespace(name, namespace)
+        names = Name.add_name(names, self)
+      end
+
+      def to_hash
+        props = {'name' => @name}
+        props.merge!('namespace' => @namespace) if @namespace
+        super.merge props
+      end
+
+      def fullname
+        Name.make_fullname(@name, @namespace)
+      end
+    end
+
+    class RecordSchema < NamedSchema
+      attr_reader :fields
+
+      def self.make_field_objects(field_data, names)
+        field_objects, field_names = [], Set.new
+        field_data.each_with_index do |field, i|
+          if field.respond_to?(:[]) # TODO(jmhodges) this check is too loose; Strings and Arrays also respond to :[]
+            type = field['type']
+            name = field['name']
+            default = field['default']
+            order = field['order']
+            new_field = Field.new(type, name, default, order, names)
+            # make sure field name has not been used yet
+            if field_names.include?(new_field.name)
+              raise SchemaParseError, "Field name #{new_field.name.inspect} is already in use"
+            end
+            field_names << new_field.name
+          else
+            raise SchemaParseError, "Not a valid field: #{field}"
+          end
+          field_objects << new_field
+        end
+        field_objects
+      end
+
+      def initialize(name, namespace, fields, names=nil, schema_type='record')
+        if schema_type == 'request'
+          @type = schema_type
+        else
+          super(schema_type, name, namespace, names)
+        end
+        @fields = RecordSchema.make_field_objects(fields, names)
+      end
+
+      def fields_hash
+        fields.inject({}){|hsh, field| hsh[field.name] = field; hsh }
+      end
+
+      def to_hash
+        hsh = super.merge('fields' => @fields.map {|f|Yajl.load(f.to_s)} )
+        if type == 'request'
+          hsh['fields']
+        else
+          hsh
+        end
+      end
+    end
+
+    class ArraySchema < Schema
+      attr_reader :items, :items_schema_from_names
+      def initialize(items, names=nil)
+        @items_schema_from_names = false
+
+        super('array')
+
+        if items.is_a?(String) && names && names.has_key?(items)
+          @items = names[items]
+          @items_schema_from_names = true
+        else
+          begin
+            @items = Schema.real_parse(items, names)
+          rescue => e
+            msg = "Items schema not a valid Avro schema: " + e.to_s
+            raise SchemaParseError, msg
+          end
+        end
+      end
+
+      def to_hash
+        name_or_json = if items_schema_from_names
+                         items.fullname
+                       else
+                         Yajl.load(items.to_s)
+                       end
+        super.merge('items' => name_or_json)
+      end
+    end
+
+    class MapSchema < Schema
+      attr_reader :values, :values_schema_from_names
+
+      def initialize(values, names=nil)
+        @values_schema_from_names = false
+        super('map')
+        if values.is_a?(String) && names && names.has_key?(values)
+          values_schema = names[values]
+          @values_schema_from_names = true
+        else
+          begin
+            values_schema = Schema.real_parse(values, names)
+          rescue => e
+            raise SchemaParseError.new('Values schema not a valid Avro schema: ' + e.to_s)
+          end
+        end
+        @values = values_schema
+      end
+
+      def to_hash
+        to_dump = super
+        if values_schema_from_names
+          to_dump['values'] = values.fullname
+        else
+          to_dump['values'] = Yajl.load(values.to_s)
+        end
+        to_dump
+      end
+    end
+
+    class UnionSchema < Schema
+      attr_reader :schemas, :schema_from_names_indices
+      def initialize(schemas, names=nil)
+        super('union')
+
+        schema_objects = []
+        @schema_from_names_indices = []
+        schemas.each_with_index do |schema, i|
+          from_names = false
+          if schema.is_a?(String) && names && names.has_key?(schema)
+            new_schema = names[schema]
+            from_names = true
+          else
+            begin
+              new_schema = Schema.real_parse(schema, names)
+            rescue
+              raise SchemaParseError, 'Union item must be a valid Avro schema'
+            end
+          end
+
+          ns_type = new_schema.type
+          if VALID_TYPES.include?(ns_type) &&
+              !NAMED_TYPES.include?(ns_type) &&
+              schema_objects.map(&:type).include?(ns_type)
+            raise SchemaParseError, "#{ns_type} is already in Union"
+          elsif ns_type == 'union'
+            raise SchemaParseError, "Unions cannot contain other unions"
+          else
+            schema_objects << new_schema
+            @schema_from_names_indices << i if from_names
+          end
+        end
+        @schemas = schema_objects
+      end
+
+      def to_s
+        # FIXME(jmhodges) this from_name pattern is really weird and
+        # seems code-smelly.
+        to_dump = []
+        schemas.each_with_index do |schema, i|
+          if schema_from_names_indices.include?(i)
+            to_dump << schema.fullname
+          else
+            to_dump << Yajl.load(schema.to_s)
+          end
+        end
+        Yajl.dump(to_dump)
+      end
+    end
+
+    class EnumSchema < NamedSchema
+      attr_reader :symbols
+      def initialize(name, space, symbols, names=nil)
+        if symbols.uniq.length < symbols.length
+          fail_msg = 'Duplicate symbol: %s' % symbols.inspect
+          raise Avro::SchemaParseError, fail_msg
+        end
+        super('enum', name, space, names)
+        @symbols = symbols
+      end
+
+      def to_hash
+        super.merge('symbols' => symbols)
+      end
+    end
+
+    # Valid primitive types are in PRIMITIVE_TYPES.
+    class PrimitiveSchema < Schema
+      def initialize(type)
+        unless PRIMITIVE_TYPES.include? type
+          raise AvroError.new("#{type} is not a valid primitive type.")
+        end
+
+        super(type)
+      end
+
+      def to_s
+        to_hash.size == 1 ? type.inspect : Yajl.dump(to_hash)
+      end
+    end
+
+    class FixedSchema < NamedSchema
+      attr_reader :size
+      def initialize(name, space, size, names=nil)
+        # Ensure valid constructor args
+        unless size.is_a?(Integer)
+          raise AvroError, 'Fixed Schema requires a valid integer for size property.'
+        end
+        super('fixed', name, space, names)
+        @size = size
+      end
+
+      def to_hash
+        super.merge('size' => @size)
+      end
+    end
+
+    class Field
+      attr_reader :type, :name, :default, :order, :type_from_names
+      def initialize(type, name, default=nil, order=nil, names=nil)
+        @type_from_names = false
+        if type.is_a?(String) && names && names.has_key?(type)
+          type_schema = names[type]
+          @type_from_names = true
+        else
+          type_schema = Schema.real_parse(type, names)
+        end
+        @type = type_schema
+        @name = name
+        @default = default
+        @order = order
+      end
+
+      def to_hash
+        sigh_type = type_from_names ? type.fullname : Yajl.load(type.to_s)
+        hsh = {
+          'name' => name,
+          'type' => sigh_type
+        }
+        hsh['default'] = default unless default.nil?
+        hsh['order'] = order if order
+        hsh
+      end
+
+      def to_s
+        Yajl.dump(to_hash)
+      end
+    end
+  end
+
+  class SchemaParseError < AvroError; end
+
+  module Name
+    def self.extract_namespace(name, namespace)
+      parts = name.split('.')
+      if parts.size > 1
+        namespace, name = parts[0..-2].join('.'), parts.last
+      end
+      return name, namespace
+    end
+
+    # Add a new schema object to the names dictionary (in place).
+    def self.add_name(names, new_schema)
+      new_fullname = new_schema.fullname
+      if Avro::Schema::VALID_TYPES.include?(new_fullname)
+        raise SchemaParseError, "#{new_fullname} is a reserved type name."
+      elsif names.nil?
+        names = {}
+      elsif names.has_key?(new_fullname)
+        raise SchemaParseError, "The name \"#{new_fullname}\" is already in use."
+      end
+
+      names[new_fullname] = new_schema
+      names
+    end
+
+    def self.make_fullname(name, namespace)
+      if !name.include?('.') && !namespace.nil?
+        namespace + '.' + name
+      else
+        name
+      end
+    end
+  end
+end
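
Not part of the commit: a minimal standalone sketch of the recursive check that Schema.validate performs, for readers who want to try the idea without the avro gem. Plain Hashes stand in for the Avro::Schema objects, and the names valid_datum?, INT_MIN and INT_MAX are hypothetical. The 32-bit bounds are an assumption, since INT_MIN_VALUE and INT_MAX_VALUE are defined outside this part of the diff.

```ruby
# Hypothetical plain-Hash schemas stand in for Avro::Schema objects.
# Assumed 32-bit signed bounds for 'int'.
INT_MIN = -(1 << 31)
INT_MAX = (1 << 31) - 1

# Return true when datum structurally matches schema, recursing into
# arrays, maps, unions and records the same way Schema.validate does.
def valid_datum?(schema, datum)
  case schema['type']
  when 'null'    then datum.nil?
  when 'boolean' then datum == true || datum == false
  when 'string'  then datum.is_a?(String)
  when 'int'
    datum.is_a?(Integer) && INT_MIN <= datum && datum <= INT_MAX
  when 'array'
    datum.is_a?(Array) && datum.all? { |d| valid_datum?(schema['items'], d) }
  when 'map'
    datum.is_a?(Hash) &&
      datum.keys.all? { |k| k.is_a?(String) } &&
      datum.values.all? { |v| valid_datum?(schema['values'], v) }
  when 'union'
    schema['schemas'].any? { |s| valid_datum?(s, datum) }
  when 'record'
    datum.is_a?(Hash) &&
      schema['fields'].all? { |f| valid_datum?(f['type'], datum[f['name']]) }
  else
    raise ArgumentError, "unsupported schema type: #{schema['type'].inspect}"
  end
end

message = {
  'type' => 'record',
  'fields' => [
    {'name' => 'to',   'type' => {'type' => 'string'}},
    {'name' => 'tags', 'type' => {'type' => 'array', 'items' => {'type' => 'string'}}}
  ]
}

puts valid_datum?(message, {'to' => 'bob', 'tags' => ['a', 'b']})  # true
puts valid_datum?(message, {'to' => 'bob', 'tags' => [1]})         # false
```

A record is checked field by field, so a missing key shows up as a nil datum and fails unless the field's type accepts null.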

Added: hadoop/avro/trunk/lang/ruby/test/sample_ipc_client.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/test/sample_ipc_client.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/test/sample_ipc_client.rb (added)
+++ hadoop/avro/trunk/lang/ruby/test/sample_ipc_client.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,86 @@
+#!/usr/bin/env ruby
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'socket'
+require 'avro'
+
+MAIL_PROTOCOL_JSON = <<-JSON
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+     {"name": "Message", "type": "record",
+      "fields": [
+          {"name": "to",   "type": "string"},
+          {"name": "from", "type": "string"},
+          {"name": "body", "type": "string"}
+      ]
+     }
+ ],
+
+ "messages": {
+     "send": {
+         "request": [{"name": "message", "type": "Message"}],
+         "response": "string"
+     },
+     "replay": {
+         "request": [],
+         "response": "string"
+     }
+ }
+}
+JSON
+
+MAIL_PROTOCOL = Avro::Protocol.parse(MAIL_PROTOCOL_JSON)
+
+def make_requestor(server_address, port, protocol)
+  sock = TCPSocket.new(server_address, port)
+  client = Avro::IPC::SocketTransport.new(sock)
+  Avro::IPC::Requestor.new(protocol, client)
+end
+
+if $0 == __FILE__
+  if ![3, 4].include?(ARGV.length)
+    raise "Usage: <to> <from> <body> [<count>]"
+  end
+
+  # client code - attach to the server and send a message
+  # fill in the Message record
+  message = {
+    'to'   => ARGV[0],
+    'from' => ARGV[1],
+    'body' => ARGV[2]
+  }
+
+  num_messages = ARGV[3].to_i
+  num_messages = 1 if num_messages == 0
+
+  # build the parameters for the request
+  params = {'message' => message}
+
+  # send the requests and print the result
+  num_messages.times do
+    requestor = make_requestor('localhost', 9090, MAIL_PROTOCOL)
+    result = requestor.request('send', params)
+    puts("Result: " + result)
+  end
+
+  # try out a replay message
+  requestor = make_requestor('localhost', 9090, MAIL_PROTOCOL)
+  result = requestor.request('replay', {})
+  puts("Replay Result: " + result)
+end
\ No newline at end of file
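
Not part of the commit: a small sketch of the argument handling in the client script, assuming the intent is to send one message when the optional count is omitted or not numeric. The helper name parse_client_args is hypothetical.

```ruby
# Hypothetical helper mirroring the script's ARGV handling.
def parse_client_args(argv)
  unless [3, 4].include?(argv.length)
    raise ArgumentError, 'Usage: <to> <from> <body> [<count>]'
  end

  message = {'to' => argv[0], 'from' => argv[1], 'body' => argv[2]}

  # nil.to_i and "abc".to_i are both 0, so a missing or non-numeric
  # count falls back to a single message.
  count = argv[3].to_i
  count = 1 if count == 0

  [message, count]
end

message, count = parse_client_args(%w[bob alice hello])
puts count  # 1
```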

Added: hadoop/avro/trunk/lang/ruby/test/sample_ipc_server.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/test/sample_ipc_server.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/test/sample_ipc_server.rb (added)
+++ hadoop/avro/trunk/lang/ruby/test/sample_ipc_server.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,91 @@
+#!/usr/bin/env ruby
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'socket'
+require 'avro'
+
+MAIL_PROTOCOL_JSON = <<-EOS
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+     {"name": "Message", "type": "record",
+      "fields": [
+          {"name": "to",   "type": "string"},
+          {"name": "from", "type": "string"},
+          {"name": "body", "type": "string"}
+      ]
+     }
+ ],
+
+ "messages": {
+     "send": {
+         "request": [{"name": "message", "type": "Message"}],
+         "response": "string"
+     },
+     "replay": {
+         "request": [],
+         "response": "string"
+     }
+ }
+}
+EOS
+
+MAIL_PROTOCOL = Avro::Protocol.parse(MAIL_PROTOCOL_JSON)
+
+class MailResponder < Avro::IPC::Responder
+  def initialize
+    super(MAIL_PROTOCOL)
+  end
+
+  def call(message, request)
+    if message.name == 'send'
+      request_content = request['message']
+      "Sent message to #{request_content['to']} from #{request_content['from']} with body #{request_content['body']}"
+    elsif message.name == 'replay'
+      'replay'
+    end
+  end
+end
+
+class RequestHandler
+  def initialize(address, port)
+    @ip_address = address
+    @port = port
+  end
+
+  def run
+    server = TCPServer.new(@ip_address, @port)
+    while (session = server.accept)
+      handle(session)
+      session.close
+    end
+  end
+end
+
+class MailHandler < RequestHandler
+  def handle(request)
+    responder = MailResponder.new()
+    transport = Avro::IPC::SocketTransport.new(request)
+    transport.write_framed_message(responder.respond(transport))
+  end
+end
+
+if $0 == __FILE__
+  handler = MailHandler.new('localhost', 9090)
+  handler.run
+end
\ No newline at end of file
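
Not part of the commit: a sketch of the accept/handle/close shape used by RequestHandler and MailHandler, with the Avro IPC layer replaced by a trivial line protocol so it runs with only the standard library. UpcaseHandler and upcase_via_server are hypothetical names, and binding to port 0 (any free port) is a choice made for the demo rather than what the sample server does.

```ruby
require 'socket'

# Hypothetical stand-in for MailHandler: same accept/handle/close shape,
# but it upcases one line instead of speaking the Avro IPC protocol.
class UpcaseHandler
  attr_reader :port

  def initialize(address)
    # Port 0 asks the OS for any free port.
    @server = TCPServer.new(address, 0)
    @port = @server.addr[1]
  end

  # Serve exactly one connection, then shut the listener down.
  def run_once
    session = @server.accept
    handle(session)
  ensure
    session.close if session
    @server.close
  end

  def handle(session)
    session.puts(session.gets.to_s.chomp.upcase)
  end
end

# Start the handler in a thread, send one line, and return the reply.
def upcase_via_server(text)
  handler = UpcaseHandler.new('127.0.0.1')
  worker = Thread.new { handler.run_once }
  sock = TCPSocket.new('127.0.0.1', handler.port)
  sock.puts(text)
  reply = sock.gets.chomp
  sock.close
  worker.join
  reply
end

puts upcase_via_server('hello')  # HELLO
```

The sample server instead loops on accept forever and handles one session at a time, so a slow client blocks the ones queued behind it.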

Added: hadoop/avro/trunk/lang/ruby/test/test_help.rb
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/ruby/test/test_help.rb?rev=901024&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/ruby/test/test_help.rb (added)
+++ hadoop/avro/trunk/lang/ruby/test/test_help.rb Wed Jan 20 01:05:06 2010
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'rubygems'
+require 'test/unit'
+require 'avro'
+require 'stringio'
+
+require 'fileutils'
+FileUtils.mkdir_p('tmp')
+
+class RandomData
+  def initialize(schm, seed=nil)
+    srand(seed) if seed
+    @seed = seed
+    @schm = schm
+  end
+
+  def next
+    nextdata(@schm)
+  end
+
+  def nextdata(schm, d=0)
+    case schm.type
+    when 'boolean'
+      rand > 0.5
+    when 'string'
+      randstr()
+    when 'int'
+      rand(Avro::Schema::INT_MAX_VALUE - Avro::Schema::INT_MIN_VALUE) + Avro::Schema::INT_MIN_VALUE
+    when 'long'
+      rand(Avro::Schema::LONG_MAX_VALUE - Avro::Schema::LONG_MIN_VALUE) + Avro::Schema::LONG_MIN_VALUE
+    when 'float'
+      (-1024 + 2048 * rand).round.to_f
+    when 'double'
+      Avro::Schema::LONG_MIN_VALUE + (Avro::Schema::LONG_MAX_VALUE - Avro::Schema::LONG_MIN_VALUE) * rand
+    when 'bytes'
+      randstr(BYTEPOOL)
+    when 'null'
+      nil
+    when 'array'
+      arr = []
+      len = rand(5) + 2 - d
+      len = 0 if len < 0
+      len.times{ arr << nextdata(schm.items, d+1) }
+      arr
+    when 'map'
+      map = {}
+      len = rand(5) + 2 - d
+      len = 0 if len < 0
+      len.times do
+        map[nextdata(Avro::Schema::PrimitiveSchema.new('string'))] = nextdata(schm.values, d+1)
+      end
+      map
+    when 'record'
+      m = {}
+      schm.fields.each do |field|
+        m[field.name] = nextdata(field.type, d+1)
+      end
+      m
+    when 'union'
+      types = schm.schemas
+      nextdata(types[rand(types.size)], d)
+    when 'enum'
+      symbols = schm.symbols
+      len = symbols.size
+      return nil if len == 0
+      symbols[rand(len)]
+    when 'fixed'
+      (1..schm.size).map { BYTEPOOL[rand(BYTEPOOL.size), 1] }.join
+    end
+  end
+
+  CHARPOOL = 'abcdefghjkmnpqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ23456789'
+  BYTEPOOL = '12345abcd'
+
+  def randstr(chars=CHARPOOL, length=20)
+    str = ''
+    rand(length+1).times { str << chars[rand(chars.size)] }
+    str
+  end
+end
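
Not part of the commit: a sketch of why RandomData accepts a seed. Seeding Kernel#srand before generating makes the pseudo-random strings reproducible, which is what lets a failing test run be replayed. The method name randstr_sketch is hypothetical; the pool is the CHARPOOL value from the helper above.

```ruby
CHARPOOL = 'abcdefghjkmnpqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ23456789'

# Build a string of 0..length characters drawn from chars.
def randstr_sketch(chars = CHARPOOL, length = 20)
  str = String.new
  rand(length + 1).times { str << chars[rand(chars.size)] }
  str
end

srand(42)
first = randstr_sketch
srand(42)
second = randstr_sketch

puts first == second  # true
```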


