avro-commits mailing list archives

From cutt...@apache.org
Subject svn commit: r1557225 [1/3] - in /avro/trunk: ./ lang/py3/ lang/py3/avro/ lang/py3/avro/tests/ lang/py3/scripts/ share/test/schemas/
Date Fri, 10 Jan 2014 19:11:43 GMT
Author: cutting
Date: Fri Jan 10 19:11:42 2014
New Revision: 1557225

URL: http://svn.apache.org/r1557225
Log:
AVRO-1382. Add support for Python3.  Contributed by Christophe Taton.

Added:
    avro/trunk/lang/py3/   (with props)
    avro/trunk/lang/py3/avro/
    avro/trunk/lang/py3/avro/__init__.py   (with props)
    avro/trunk/lang/py3/avro/datafile.py   (with props)
    avro/trunk/lang/py3/avro/io.py   (with props)
    avro/trunk/lang/py3/avro/ipc.py   (with props)
    avro/trunk/lang/py3/avro/protocol.py   (with props)
    avro/trunk/lang/py3/avro/schema.py   (with props)
    avro/trunk/lang/py3/avro/tests/
    avro/trunk/lang/py3/avro/tests/av_bench.py   (with props)
    avro/trunk/lang/py3/avro/tests/gen_interop_data.py   (with props)
    avro/trunk/lang/py3/avro/tests/run_tests.py   (with props)
    avro/trunk/lang/py3/avro/tests/sample_http_client.py   (with props)
    avro/trunk/lang/py3/avro/tests/sample_http_server.py   (with props)
    avro/trunk/lang/py3/avro/tests/test_datafile.py   (with props)
    avro/trunk/lang/py3/avro/tests/test_datafile_interop.py   (with props)
    avro/trunk/lang/py3/avro/tests/test_io.py   (with props)
    avro/trunk/lang/py3/avro/tests/test_ipc.py   (with props)
    avro/trunk/lang/py3/avro/tests/test_protocol.py   (with props)
    avro/trunk/lang/py3/avro/tests/test_schema.py   (with props)
    avro/trunk/lang/py3/avro/tests/test_script.py   (with props)
    avro/trunk/lang/py3/avro/tests/txsample_http_client.py   (with props)
    avro/trunk/lang/py3/avro/tests/txsample_http_server.py   (with props)
    avro/trunk/lang/py3/avro/tool.py   (with props)
    avro/trunk/lang/py3/avro/txipc.py   (with props)
    avro/trunk/lang/py3/scripts/
    avro/trunk/lang/py3/scripts/avro
    avro/trunk/lang/py3/setup.py   (with props)
    avro/trunk/share/test/schemas/echo.avdl
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/build.sh

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1557225&r1=1557224&r2=1557225&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Jan 10 19:11:42 2014
@@ -35,6 +35,8 @@ Trunk (not yet released)
 
     AVRO-1421. Java: Add an @AvroSchema annotation to reflect. (cutting)
 
+    AVRO-1382. Add support for Python3. (Christophe Taton via cutting)
+
   OPTIMIZATIONS
 
     AVRO-1348. Java: Improve UTF-8 to String conversion performance in

Modified: avro/trunk/build.sh
URL: http://svn.apache.org/viewvc/avro/trunk/build.sh?rev=1557225&r1=1557224&r2=1557225&view=diff
==============================================================================
--- avro/trunk/build.sh (original)
+++ avro/trunk/build.sh Fri Jan 10 19:11:42 2014
@@ -42,6 +42,7 @@ case "$target" in
 	# run lang-specific tests
         (cd lang/java; mvn test)
 	(cd lang/py; ant test)
+	(cd lang/py3; python3 setup.py test)
 	(cd lang/c; ./build.sh test)
 	(cd lang/c++; ./build.sh test)
 	(cd lang/csharp; ./build.sh test)
@@ -100,6 +101,7 @@ case "$target" in
         (mvn -N -P copy-artifacts antrun:run) 
 
 	(cd lang/py; ant dist)
+	(cd lang/py3; python3 setup.py bdist; cp -r dist ../../dist/py3)
 
 	(cd lang/c; ./build.sh dist)
 
@@ -148,6 +150,7 @@ case "$target" in
         (mvn clean)         
 
 	(cd lang/py; ant clean)
+	(cd lang/py3; python3 setup.py clean)
 
 	(cd lang/c; ./build.sh clean)
 

Propchange: avro/trunk/lang/py3/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Fri Jan 10 19:11:42 2014
@@ -0,0 +1,9 @@
+dist
+build
+avro.egg-info
+avro/__pycache__
+avro/HandshakeRequest.avsc
+avro/HandshakeResponse.avsc
+avro/VERSION.txt
+avro/tests/__pycache__
+avro/tests/interop.avsc

Added: avro/trunk/lang/py3/avro/__init__.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/__init__.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/__init__.py (added)
+++ avro/trunk/lang/py3/avro/__init__.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,34 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+__all__ = ('schema', 'io', 'datafile', 'protocol', 'ipc')
+
+
+import os
+
+
+def LoadResource(name):
+  dir_path = os.path.dirname(__file__)
+  rsrc_path = os.path.join(dir_path, name)
+  with open(rsrc_path, 'r') as f:
+    return f.read()
+
+
+VERSION = LoadResource('VERSION.txt').strip()

Propchange: avro/trunk/lang/py3/avro/__init__.py
------------------------------------------------------------------------------
    svn:eol-style = native
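
For reference, a minimal usage sketch of the module added above (illustrative
only, not part of the commit; it assumes the package is built so that the
VERSION.txt resource is in place):

    import avro

    # Package version, read from the bundled VERSION.txt resource:
    print(avro.VERSION)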

Added: avro/trunk/lang/py3/avro/datafile.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/datafile.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/datafile.py (added)
+++ avro/trunk/lang/py3/avro/datafile.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,532 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Read/Write Avro File Object Containers."""
+
+import io
+import logging
+import os
+import zlib
+
+from avro import schema
+from avro import io as avro_io
+
+try:
+  import snappy
+  has_snappy = True
+except ImportError:
+  has_snappy = False
+
+
+# ------------------------------------------------------------------------------
+# Constants
+
+# Version of the container file:
+VERSION = 1
+
+# Magic code that starts a data container file:
+MAGIC = b'Obj' + bytes([VERSION])
+
+# Size of the magic code, in number of bytes:
+MAGIC_SIZE = len(MAGIC)
+
+# Size of the synchronization marker, in number of bytes:
+SYNC_SIZE = 16
+
+# Interval between synchronization markers, in number of bytes:
+# TODO: make configurable
+SYNC_INTERVAL = 1000 * SYNC_SIZE
+
+# Schema of the container header:
+META_SCHEMA = schema.Parse("""
+{
+  "type": "record", "name": "org.apache.avro.file.Header",
+  "fields": [{
+    "name": "magic",
+    "type": {"type": "fixed", "name": "magic", "size": %(magic_size)d}
+  }, {
+    "name": "meta",
+    "type": {"type": "map", "values": "bytes"}
+  }, {
+    "name": "sync",
+    "type": {"type": "fixed", "name": "sync", "size": %(sync_size)d}
+  }]
+}
+""" % {
+    'magic_size': MAGIC_SIZE,
+    'sync_size': SYNC_SIZE,
+})
+
+# Codecs supported by container files:
+VALID_CODECS = frozenset(['null', 'deflate'])
+if has_snappy:
+  VALID_CODECS = frozenset.union(VALID_CODECS, ['snappy'])
+
+# Not used yet
+VALID_ENCODINGS = frozenset(['binary'])
+
+# Metadata key associated with the codec:
+CODEC_KEY = "avro.codec"
+
+# Metadata key associated with the schema:
+SCHEMA_KEY = "avro.schema"
+
+
+# ------------------------------------------------------------------------------
+# Exceptions
+
+
+class DataFileException(schema.AvroException):
+  """Problem reading or writing file object containers."""
+
+  def __init__(self, msg):
+    super(DataFileException, self).__init__(msg)
+
+
+# ------------------------------------------------------------------------------
+
+
+class DataFileWriter(object):
+  """Writes Avro data files."""
+
+  @staticmethod
+  def GenerateSyncMarker():
+    """Generates a random synchronization marker."""
+    return os.urandom(SYNC_SIZE)
+
+  # TODO: make 'encoder' a metadata property
+  def __init__(
+      self,
+      writer,
+      datum_writer,
+      writer_schema=None,
+      codec='null',
+  ):
+    """Constructs a new DataFileWriter instance.
+
+    If the schema is not present, presume we're appending.
+
+    Args:
+      writer: File-like object to write into.
+      datum_writer: DatumWriter instance used to serialize datums.
+      writer_schema: Schema of the data to write.
+      codec: Name of the compression codec (e.g. 'null', 'deflate').
+    """
+    self._writer = writer
+    self._encoder = avro_io.BinaryEncoder(writer)
+    self._datum_writer = datum_writer
+    self._buffer_writer = io.BytesIO()
+    self._buffer_encoder = avro_io.BinaryEncoder(self._buffer_writer)
+    self._block_count = 0
+    self._meta = {}
+
+    # Ensure we have a writer that accepts bytes:
+    self._writer.write(b'')
+
+    # Whether the header has already been written:
+    self._header_written = False
+
+    if writer_schema is not None:
+      if codec not in VALID_CODECS:
+        raise DataFileException('Unknown codec: %r' % codec)
+      self._sync_marker = DataFileWriter.GenerateSyncMarker()
+      self.SetMeta('avro.codec', codec)
+      self.SetMeta('avro.schema', str(writer_schema).encode('utf-8'))
+      self.datum_writer.writer_schema = writer_schema
+    else:
+      # open writer for reading to collect metadata
+      dfr = DataFileReader(writer, avro_io.DatumReader())
+
+      # TODO: collect arbitrary metadata
+      # collect metadata
+      self._sync_marker = dfr.sync_marker
+      self.SetMeta('avro.codec', dfr.GetMeta('avro.codec'))
+
+      # get schema used to write existing file
+      schema_from_file = dfr.GetMeta('avro.schema').decode('utf-8')
+      self.SetMeta('avro.schema', schema_from_file)
+      self.datum_writer.writer_schema = schema.Parse(schema_from_file)
+
+      # seek to the end of the file and prepare for writing
+      writer.seek(0, 2)
+      self._header_written = True
+
+  # read-only properties
+
+  @property
+  def writer(self):
+    return self._writer
+
+  @property
+  def encoder(self):
+    return self._encoder
+
+  @property
+  def datum_writer(self):
+    return self._datum_writer
+
+  @property
+  def buffer_encoder(self):
+    return self._buffer_encoder
+
+  @property
+  def sync_marker(self):
+    return self._sync_marker
+
+  @property
+  def meta(self):
+    return self._meta
+
+  def __enter__(self):
+    return self
+
+  def __exit__(self, type, value, traceback):
+    # Perform a close if there's no exception
+    if type is None:
+      self.close()
+
+  @property
+  def block_count(self):
+    return self._block_count
+
+  def GetMeta(self, key):
+    """Reports the metadata associated to the given key.
+
+    Args:
+      key: Key of the metadata to report the value of.
+    Returns:
+      The metadata value, as bytes, or None if the key does not exist.
+    """
+    return self._meta.get(key)
+
+  def SetMeta(self, key, value):
+    """Sets the metadata value for the given key.
+
+    Note: metadata is persisted and retrieved as bytes.
+
+    Args:
+      key: Key of the metadata to set.
+      value: Value of the metadata, as bytes or str.
+          Strings are automatically converted to bytes.
+    """
+    if isinstance(value, str):
+      value = value.encode('utf-8')
+    assert isinstance(value, bytes), (
+        'Invalid metadata value for key %r: %r' % (key, value))
+    self._meta[key] = value
+
+  def _WriteHeader(self):
+    header = {
+        'magic': MAGIC,
+        'meta': self.meta,
+        'sync': self.sync_marker,
+    }
+    logging.debug(
+        'Writing Avro data file header:\n%s\nAvro header schema:\n%s',
+        header, META_SCHEMA)
+    self.datum_writer.write_data(META_SCHEMA, header, self.encoder)
+    self._header_written = True
+
+  # TODO: make a schema for blocks and use datum_writer
+  def _WriteBlock(self):
+    if not self._header_written:
+      self._WriteHeader()
+
+    if self.block_count <= 0:
+      logging.info('Current block is empty, nothing to write.')
+      return
+
+    # write number of items in block
+    self.encoder.write_long(self.block_count)
+
+    # write block contents
+    uncompressed_data = self._buffer_writer.getvalue()
+    codec = self.GetMeta(CODEC_KEY).decode('utf-8')
+    if codec == 'null':
+      compressed_data = uncompressed_data
+      compressed_data_length = len(compressed_data)
+    elif codec == 'deflate':
+      # The first two bytes and the last byte are zlib
+      # wrappers around the deflate data.
+      compressed_data = zlib.compress(uncompressed_data)[2:-1]
+      compressed_data_length = len(compressed_data)
+    elif codec == 'snappy':
+      compressed_data = snappy.compress(uncompressed_data)
+      compressed_data_length = len(compressed_data) + 4 # crc32
+    else:
+      fail_msg = '"%s" codec is not supported.' % codec
+      raise DataFileException(fail_msg)
+
+    # Write length of block
+    self.encoder.write_long(compressed_data_length)
+
+    # Write block
+    self.writer.write(compressed_data)
+
+    # Write CRC32 checksum for Snappy
+    if codec == 'snappy':
+      self.encoder.write_crc32(uncompressed_data)
+
+    # write sync marker
+    self.writer.write(self.sync_marker)
+
+    logging.debug(
+        'Writing block with count=%d nbytes=%d sync=%r',
+        self.block_count, compressed_data_length, self.sync_marker)
+
+    # reset buffer
+    self._buffer_writer.seek(0)
+    self._buffer_writer.truncate()
+    self._block_count = 0
+
+  def append(self, datum):
+    """Append a datum to the file."""
+    self.datum_writer.write(datum, self.buffer_encoder)
+    self._block_count += 1
+
+    # if the data to write is larger than the sync interval, write the block
+    if self._buffer_writer.tell() >= SYNC_INTERVAL:
+      self._WriteBlock()
+
+  def sync(self):
+    """
+    Return the current position as a value that may be passed to
+    DataFileReader.seek(long). Forces the end of the current block,
+    emitting a synchronization marker.
+    """
+    self._WriteBlock()
+    return self.writer.tell()
+
+  def flush(self):
+    """Flush the current state of the file, including metadata."""
+    self._WriteBlock()
+    self.writer.flush()
+
+  def close(self):
+    """Close the file."""
+    self.flush()
+    self.writer.close()
+
+
+# ------------------------------------------------------------------------------
+
+
+class DataFileReader(object):
+  """Read files written by DataFileWriter."""
+
+  # TODO: allow user to specify expected schema?
+  # TODO: allow user to specify the encoder
+  def __init__(self, reader, datum_reader):
+    """Initializes a new data file reader.
+
+    Args:
+      reader: Open file to read from.
+      datum_reader: Avro datum reader.
+    """
+    self._reader = reader
+    self._raw_decoder = avro_io.BinaryDecoder(reader)
+    self._datum_decoder = None # Maybe reset at every block.
+    self._datum_reader = datum_reader
+
+    # read the header: magic, meta, sync
+    self._read_header()
+
+    # ensure codec is valid
+    codec = self.GetMeta('avro.codec')
+    self.codec = 'null' if codec is None else codec.decode('utf-8')
+    if self.codec not in VALID_CODECS:
+      raise DataFileException('Unknown codec: %s.' % self.codec)
+
+    self._file_length = self._GetInputFileLength()
+
+    # get ready to read
+    self._block_count = 0
+    self.datum_reader.writer_schema = (
+        schema.Parse(self.GetMeta(SCHEMA_KEY).decode('utf-8')))
+
+  def __enter__(self):
+    return self
+
+  def __exit__(self, type, value, traceback):
+    # Perform a close if there's no exception
+    if type is None:
+      self.close()
+
+  def __iter__(self):
+    return self
+
+  # read-only properties
+  @property
+  def reader(self):
+    return self._reader
+
+  @property
+  def raw_decoder(self):
+    return self._raw_decoder
+
+  @property
+  def datum_decoder(self):
+    return self._datum_decoder
+
+  @property
+  def datum_reader(self):
+    return self._datum_reader
+
+  @property
+  def sync_marker(self):
+    return self._sync_marker
+
+  @property
+  def meta(self):
+    return self._meta
+
+  @property
+  def file_length(self):
+    """Length of the input file, in bytes."""
+    return self._file_length
+
+  # read/write properties
+  @property
+  def block_count(self):
+    return self._block_count
+
+  def GetMeta(self, key):
+    """Reports the value of a given metadata key.
+
+    Args:
+      key: Metadata key (string) to report the value of.
+    Returns:
+      Value associated with the metadata key, as bytes.
+    """
+    return self._meta.get(key)
+
+  def SetMeta(self, key, value):
+    """Sets a metadata.
+
+    Args:
+      key: Metadata key (string) to set.
+      value: Metadata value to set, as bytes.
+    """
+    if isinstance(value, str):
+      value = value.encode('utf-8')
+    self._meta[key] = value
+
+  def _GetInputFileLength(self):
+    """Reports the length of the input file, in bytes.
+
+    Leaves the current position unmodified.
+
+    Returns:
+      The length of the input file, in bytes.
+    """
+    current_pos = self.reader.tell()
+    self.reader.seek(0, 2)
+    file_length = self.reader.tell()
+    self.reader.seek(current_pos)
+    return file_length
+
+  def is_EOF(self):
+    return self.reader.tell() == self.file_length
+
+  def _read_header(self):
+    # seek to the beginning of the file to get magic block
+    self.reader.seek(0, 0)
+
+    # read header into a dict
+    header = self.datum_reader.read_data(
+      META_SCHEMA, META_SCHEMA, self.raw_decoder)
+
+    # check magic number
+    if header.get('magic') != MAGIC:
+      fail_msg = "Not an Avro data file: %s doesn't match %s."\
+                 % (header.get('magic'), MAGIC)
+      raise schema.AvroException(fail_msg)
+
+    # set metadata
+    self._meta = header['meta']
+
+    # set sync marker
+    self._sync_marker = header['sync']
+
+  def _read_block_header(self):
+    self._block_count = self.raw_decoder.read_long()
+    if self.codec == "null":
+      # Skip a long; we don't need to use the length.
+      self.raw_decoder.skip_long()
+      self._datum_decoder = self._raw_decoder
+    elif self.codec == 'deflate':
+      # Compressed data is stored as (length, data), which
+      # corresponds to how the "bytes" type is encoded.
+      data = self.raw_decoder.read_bytes()
+      # -15 is the log of the window size; negative indicates
+      # "raw" (no zlib headers) decompression.  See zlib.h.
+      uncompressed = zlib.decompress(data, -15)
+      self._datum_decoder = avro_io.BinaryDecoder(io.BytesIO(uncompressed))
+    elif self.codec == 'snappy':
+      # Compressed data includes a 4-byte CRC32 checksum
+      length = self.raw_decoder.read_long()
+      data = self.raw_decoder.read(length - 4)
+      uncompressed = snappy.decompress(data)
+      self._datum_decoder = avro_io.BinaryDecoder(io.BytesIO(uncompressed))
+      self.raw_decoder.check_crc32(uncompressed)
+    else:
+      raise DataFileException("Unknown codec: %r" % self.codec)
+
+  def _skip_sync(self):
+    """
+    Read the length of the sync marker; if it matches the sync marker,
+    return True. Otherwise, seek back to where we started and return False.
+    """
+    proposed_sync_marker = self.reader.read(SYNC_SIZE)
+    if proposed_sync_marker != self.sync_marker:
+      self.reader.seek(-SYNC_SIZE, 1)
+      return False
+    else:
+      return True
+
+  # TODO: handle block of length zero
+  # TODO: clean this up with recursion
+  def __next__(self):
+    """Return the next datum in the file."""
+    if self.block_count == 0:
+      if self.is_EOF():
+        raise StopIteration
+      elif self._skip_sync():
+        if self.is_EOF(): raise StopIteration
+        self._read_block_header()
+      else:
+        self._read_block_header()
+
+    datum = self.datum_reader.read(self.datum_decoder)
+    self._block_count -= 1
+    return datum
+
+  def close(self):
+    """Close this reader."""
+    self.reader.close()
+
+
+if __name__ == '__main__':
+  raise Exception('Not a standalone module')

Propchange: avro/trunk/lang/py3/avro/datafile.py
------------------------------------------------------------------------------
    svn:eol-style = native
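
To show how the classes above fit together, here is a minimal round-trip
sketch (illustrative only, not part of the commit; the file name and record
schema are made up):

    import avro.datafile
    import avro.io
    import avro.schema

    SCHEMA = avro.schema.Parse("""
    {"type": "record", "name": "User",
     "fields": [{"name": "name", "type": "string"},
                {"name": "age", "type": "int"}]}
    """)

    # Write a container file holding two records, compressed with deflate.
    # DataFileWriter is a context manager: __exit__ closes the file on success.
    with avro.datafile.DataFileWriter(
        open('users.avro', 'wb'), avro.io.DatumWriter(),
        writer_schema=SCHEMA, codec='deflate') as writer:
      writer.append({'name': 'Alice', 'age': 30})
      writer.append({'name': 'Bob', 'age': 25})

    # Read the records back; DataFileReader iterates over the datums.
    with avro.datafile.DataFileReader(
        open('users.avro', 'rb'), avro.io.DatumReader()) as reader:
      for user in reader:
        print(user)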

Added: avro/trunk/lang/py3/avro/io.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/io.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/io.py (added)
+++ avro/trunk/lang/py3/avro/io.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,933 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Input/output utilities.
+
+Includes:
+ - i/o-specific constants
+ - i/o-specific exceptions
+ - schema validation
+ - leaf value encoding and decoding
+ - datum readers and writers
+
+Also includes a generic representation for data, which uses the
+following mapping:
+ - Schema records are implemented as dict.
+ - Schema arrays are implemented as list.
+ - Schema maps are implemented as dict.
+ - Schema strings are implemented as str.
+ - Schema bytes are implemented as bytes.
+ - Schema ints are implemented as int.
+ - Schema longs are implemented as int.
+ - Schema floats are implemented as float.
+ - Schema doubles are implemented as float.
+ - Schema booleans are implemented as bool.
+"""
+
+import binascii
+import json
+import logging
+import struct
+import sys
+
+from avro import schema
+
+
+# ------------------------------------------------------------------------------
+# Constants
+
+
+INT_MIN_VALUE = -(1 << 31)
+INT_MAX_VALUE = (1 << 31) - 1
+LONG_MIN_VALUE = -(1 << 63)
+LONG_MAX_VALUE = (1 << 63) - 1
+
+STRUCT_INT = struct.Struct('!I')     # big-endian unsigned int
+STRUCT_LONG = struct.Struct('!Q')    # big-endian unsigned long long
+STRUCT_FLOAT = struct.Struct('!f')   # big-endian float
+STRUCT_DOUBLE = struct.Struct('!d')  # big-endian double
+STRUCT_CRC32 = struct.Struct('>I')   # big-endian unsigned int
+
+
+# ------------------------------------------------------------------------------
+# Exceptions
+
+
+class AvroTypeException(schema.AvroException):
+  """Raised when datum is not an example of schema."""
+  def __init__(self, expected_schema, datum):
+    pretty_expected = json.dumps(json.loads(str(expected_schema)), indent=2)
+    fail_msg = "The datum %s is not an example of the schema %s"\
+               % (datum, pretty_expected)
+    schema.AvroException.__init__(self, fail_msg)
+
+
+class SchemaResolutionException(schema.AvroException):
+  def __init__(self, fail_msg, writer_schema=None, reader_schema=None):
+    if writer_schema:
+      pretty_writers = json.dumps(json.loads(str(writer_schema)), indent=2)
+      fail_msg += "\nWriter's Schema: %s" % pretty_writers
+    if reader_schema:
+      pretty_readers = json.dumps(json.loads(str(reader_schema)), indent=2)
+      fail_msg += "\nReader's Schema: %s" % pretty_readers
+    schema.AvroException.__init__(self, fail_msg)
+
+
+# ------------------------------------------------------------------------------
+# Validate
+
+
+def Validate(expected_schema, datum):
+  """Determines if a python datum is an instance of a schema.
+
+  Args:
+    expected_schema: Schema to validate against.
+    datum: Datum to validate.
+  Returns:
+    True if the datum is an instance of the schema.
+  """
+  schema_type = expected_schema.type
+  if schema_type == 'null':
+    return datum is None
+  elif schema_type == 'boolean':
+    return isinstance(datum, bool)
+  elif schema_type == 'string':
+    return isinstance(datum, str)
+  elif schema_type == 'bytes':
+    return isinstance(datum, bytes)
+  elif schema_type == 'int':
+    return (isinstance(datum, int)
+        and (INT_MIN_VALUE <= datum <= INT_MAX_VALUE))
+  elif schema_type == 'long':
+    return (isinstance(datum, int)
+        and (LONG_MIN_VALUE <= datum <= LONG_MAX_VALUE))
+  elif schema_type in ['float', 'double']:
+    return (isinstance(datum, int) or isinstance(datum, float))
+  elif schema_type == 'fixed':
+    return isinstance(datum, bytes) and (len(datum) == expected_schema.size)
+  elif schema_type == 'enum':
+    return datum in expected_schema.symbols
+  elif schema_type == 'array':
+    return (isinstance(datum, list)
+        and all(Validate(expected_schema.items, item) for item in datum))
+  elif schema_type == 'map':
+    return (isinstance(datum, dict)
+        and all(isinstance(key, str) for key in datum.keys())
+        and all(Validate(expected_schema.values, value)
+                for value in datum.values()))
+  elif schema_type in ['union', 'error_union']:
+    return any(Validate(union_branch, datum)
+               for union_branch in expected_schema.schemas)
+  elif schema_type in ['record', 'error', 'request']:
+    return (isinstance(datum, dict)
+        and all(Validate(field.type, datum.get(field.name))
+                for field in expected_schema.fields))
+  else:
+    raise AvroTypeException('Unknown Avro schema type: %r' % schema_type)
+
+
+# ------------------------------------------------------------------------------
+# Decoder/Encoder
+
+
+class BinaryDecoder(object):
+  """Read leaf values."""
+  def __init__(self, reader):
+    """
+    reader is a Python object on which we can call read, seek, and tell.
+    """
+    self._reader = reader
+
+  @property
+  def reader(self):
+    """Reports the reader used by this decoder."""
+    return self._reader
+
+  def read(self, n):
+    """Read n bytes.
+
+    Args:
+      n: Number of bytes to read.
+    Returns:
+      The next n bytes from the input.
+    """
+    assert (n >= 0), n
+    input_bytes = self.reader.read(n)
+    assert (len(input_bytes) == n), input_bytes
+    return input_bytes
+
+  def read_null(self):
+    """
+    null is written as zero bytes
+    """
+    return None
+
+  def read_boolean(self):
+    """
+    a boolean is written as a single byte
+    whose value is either 0 (false) or 1 (true).
+    """
+    return ord(self.read(1)) == 1
+
+  def read_int(self):
+    """
+    int and long values are written using variable-length, zig-zag coding.
+    """
+    return self.read_long()
+
+  def read_long(self):
+    """
+    int and long values are written using variable-length, zig-zag coding.
+    """
+    b = ord(self.read(1))
+    n = b & 0x7F
+    shift = 7
+    while (b & 0x80) != 0:
+      b = ord(self.read(1))
+      n |= (b & 0x7F) << shift
+      shift += 7
+    datum = (n >> 1) ^ -(n & 1)
+    return datum
+
+  def read_float(self):
+    """
+    A float is written as 4 bytes.
+    The float is converted into a 32-bit integer using a method equivalent to
+    Java's floatToIntBits and then encoded in little-endian format.
+    """
+    bits = (((ord(self.read(1)) & 0xff)) |
+      ((ord(self.read(1)) & 0xff) <<  8) |
+      ((ord(self.read(1)) & 0xff) << 16) |
+      ((ord(self.read(1)) & 0xff) << 24))
+    return STRUCT_FLOAT.unpack(STRUCT_INT.pack(bits))[0]
+
+  def read_double(self):
+    """
+    A double is written as 8 bytes.
+    The double is converted into a 64-bit integer using a method equivalent to
+    Java's doubleToLongBits and then encoded in little-endian format.
+    """
+    bits = (((ord(self.read(1)) & 0xff)) |
+      ((ord(self.read(1)) & 0xff) <<  8) |
+      ((ord(self.read(1)) & 0xff) << 16) |
+      ((ord(self.read(1)) & 0xff) << 24) |
+      ((ord(self.read(1)) & 0xff) << 32) |
+      ((ord(self.read(1)) & 0xff) << 40) |
+      ((ord(self.read(1)) & 0xff) << 48) |
+      ((ord(self.read(1)) & 0xff) << 56))
+    return STRUCT_DOUBLE.unpack(STRUCT_LONG.pack(bits))[0]
+
+  def read_bytes(self):
+    """
+    Bytes are encoded as a long followed by that many bytes of data.
+    """
+    nbytes = self.read_long()
+    assert (nbytes >= 0), nbytes
+    return self.read(nbytes)
+
+  def read_utf8(self):
+    """
+    A string is encoded as a long followed by
+    that many bytes of UTF-8 encoded character data.
+    """
+    input_bytes = self.read_bytes()
+    try:
+      return input_bytes.decode('utf-8')
+    except UnicodeDecodeError as exn:
+      logging.error('Invalid UTF-8 input bytes: %r', input_bytes)
+      raise exn
+
+  def check_crc32(self, bytes):
+    checksum = STRUCT_CRC32.unpack(self.read(4))[0]
+    if binascii.crc32(bytes) & 0xffffffff != checksum:
+      raise schema.AvroException("Checksum failure")
+
+  def skip_null(self):
+    pass
+
+  def skip_boolean(self):
+    self.skip(1)
+
+  def skip_int(self):
+    self.skip_long()
+
+  def skip_long(self):
+    b = ord(self.read(1))
+    while (b & 0x80) != 0:
+      b = ord(self.read(1))
+
+  def skip_float(self):
+    self.skip(4)
+
+  def skip_double(self):
+    self.skip(8)
+
+  def skip_bytes(self):
+    self.skip(self.read_long())
+
+  def skip_utf8(self):
+    self.skip_bytes()
+
+  def skip(self, n):
+    self.reader.seek(self.reader.tell() + n)
+
+
+# ------------------------------------------------------------------------------
+
+
+class BinaryEncoder(object):
+  """Write leaf values."""
+
+  def __init__(self, writer):
+    """
+    writer is a Python object on which we can call write.
+    """
+    self._writer = writer
+
+  @property
+  def writer(self):
+    """Reports the writer used by this encoder."""
+    return self._writer
+
+
+  def write(self, datum):
+    """Write a sequence of bytes.
+
+    Args:
+      datum: Byte array, as a Python bytes.
+    """
+    assert isinstance(datum, bytes), ('Expecting bytes, got %r' % datum)
+    self.writer.write(datum)
+
+  def WriteByte(self, byte):
+    self.writer.write(bytes((byte,)))
+
+  def write_null(self, datum):
+    """
+    null is written as zero bytes
+    """
+    pass
+
+  def write_boolean(self, datum):
+    """
+    a boolean is written as a single byte
+    whose value is either 0 (false) or 1 (true).
+    """
+    # Python maps True to 1 and False to 0.
+    self.WriteByte(int(bool(datum)))
+
+  def write_int(self, datum):
+    """
+    int and long values are written using variable-length, zig-zag coding.
+    """
+    self.write_long(datum)
+
+  def write_long(self, datum):
+    """
+    int and long values are written using variable-length, zig-zag coding.
+    """
+    datum = (datum << 1) ^ (datum >> 63)
+    while (datum & ~0x7F) != 0:
+      self.WriteByte((datum & 0x7f) | 0x80)
+      datum >>= 7
+    self.WriteByte(datum)
+
+  def write_float(self, datum):
+    """
+    A float is written as 4 bytes.
+    The float is converted into a 32-bit integer using a method equivalent to
+    Java's floatToIntBits and then encoded in little-endian format.
+    """
+    bits = STRUCT_INT.unpack(STRUCT_FLOAT.pack(datum))[0]
+    self.WriteByte((bits) & 0xFF)
+    self.WriteByte((bits >> 8) & 0xFF)
+    self.WriteByte((bits >> 16) & 0xFF)
+    self.WriteByte((bits >> 24) & 0xFF)
+
+  def write_double(self, datum):
+    """
+    A double is written as 8 bytes.
+    The double is converted into a 64-bit integer using a method equivalent to
+    Java's doubleToLongBits and then encoded in little-endian format.
+    """
+    bits = STRUCT_LONG.unpack(STRUCT_DOUBLE.pack(datum))[0]
+    self.WriteByte((bits) & 0xFF)
+    self.WriteByte((bits >> 8) & 0xFF)
+    self.WriteByte((bits >> 16) & 0xFF)
+    self.WriteByte((bits >> 24) & 0xFF)
+    self.WriteByte((bits >> 32) & 0xFF)
+    self.WriteByte((bits >> 40) & 0xFF)
+    self.WriteByte((bits >> 48) & 0xFF)
+    self.WriteByte((bits >> 56) & 0xFF)
+
+  def write_bytes(self, datum):
+    """
+    Bytes are encoded as a long followed by that many bytes of data.
+    """
+    self.write_long(len(datum))
+    self.write(datum)
+
+  def write_utf8(self, datum):
+    """
+    A string is encoded as a long followed by
+    that many bytes of UTF-8 encoded character data.
+    """
+    datum = datum.encode("utf-8")
+    self.write_bytes(datum)
+
+  def write_crc32(self, bytes):
+    """
+    A 4-byte, big-endian CRC32 checksum
+    """
+    self.write(STRUCT_CRC32.pack(binascii.crc32(bytes) & 0xffffffff))
+
+
+# ------------------------------------------------------------------------------
+# DatumReader/Writer
+
+
+class DatumReader(object):
+  """Deserialize Avro-encoded data into a Python data structure."""
+  @staticmethod
+  def check_props(schema_one, schema_two, prop_list):
+    for prop in prop_list:
+      if getattr(schema_one, prop) != getattr(schema_two, prop):
+        return False
+    return True
+
+  @staticmethod
+  def match_schemas(writer_schema, reader_schema):
+    w_type = writer_schema.type
+    r_type = reader_schema.type
+    if 'union' in [w_type, r_type] or 'error_union' in [w_type, r_type]:
+      return True
+    elif (w_type in schema.PRIMITIVE_TYPES and r_type in schema.PRIMITIVE_TYPES
+          and w_type == r_type):
+      return True
+    elif (w_type == r_type == 'record' and
+          DatumReader.check_props(writer_schema, reader_schema,
+                                  ['fullname'])):
+      return True
+    elif (w_type == r_type == 'error' and
+          DatumReader.check_props(writer_schema, reader_schema,
+                                  ['fullname'])):
+      return True
+    elif (w_type == r_type == 'request'):
+      return True
+    elif (w_type == r_type == 'fixed' and
+          DatumReader.check_props(writer_schema, reader_schema,
+                                  ['fullname', 'size'])):
+      return True
+    elif (w_type == r_type == 'enum' and
+          DatumReader.check_props(writer_schema, reader_schema,
+                                  ['fullname'])):
+      return True
+    elif (w_type == r_type == 'map' and
+          DatumReader.check_props(writer_schema.values,
+                                  reader_schema.values, ['type'])):
+      return True
+    elif (w_type == r_type == 'array' and
+          DatumReader.check_props(writer_schema.items,
+                                  reader_schema.items, ['type'])):
+      return True
+
+    # Handle schema promotion
+    if w_type == 'int' and r_type in ['long', 'float', 'double']:
+      return True
+    elif w_type == 'long' and r_type in ['float', 'double']:
+      return True
+    elif w_type == 'float' and r_type == 'double':
+      return True
+    return False
+
+  def __init__(self, writer_schema=None, reader_schema=None):
+    """
+    As defined in the Avro specification, we call the schema encoded
+    in the data the "writer's schema", and the schema expected by the
+    reader the "reader's schema".
+    """
+    self._writer_schema = writer_schema
+    self._reader_schema = reader_schema
+
+  # read/write properties
+  def set_writer_schema(self, writer_schema):
+    self._writer_schema = writer_schema
+  writer_schema = property(lambda self: self._writer_schema,
+                            set_writer_schema)
+  def set_reader_schema(self, reader_schema):
+    self._reader_schema = reader_schema
+  reader_schema = property(lambda self: self._reader_schema,
+                            set_reader_schema)
+
+  def read(self, decoder):
+    if self.reader_schema is None:
+      self.reader_schema = self.writer_schema
+    return self.read_data(self.writer_schema, self.reader_schema, decoder)
+
+  def read_data(self, writer_schema, reader_schema, decoder):
+    # schema matching
+    if not DatumReader.match_schemas(writer_schema, reader_schema):
+      fail_msg = 'Schemas do not match.'
+      raise SchemaResolutionException(fail_msg, writer_schema, reader_schema)
+
+    # schema resolution: reader's schema is a union, writer's schema is not
+    if (writer_schema.type not in ['union', 'error_union']
+        and reader_schema.type in ['union', 'error_union']):
+      for s in reader_schema.schemas:
+        if DatumReader.match_schemas(writer_schema, s):
+          return self.read_data(writer_schema, s, decoder)
+      fail_msg = 'Schemas do not match.'
+      raise SchemaResolutionException(fail_msg, writer_schema, reader_schema)
+
+    # function dispatch for reading data based on type of writer's schema
+    if writer_schema.type == 'null':
+      return decoder.read_null()
+    elif writer_schema.type == 'boolean':
+      return decoder.read_boolean()
+    elif writer_schema.type == 'string':
+      return decoder.read_utf8()
+    elif writer_schema.type == 'int':
+      return decoder.read_int()
+    elif writer_schema.type == 'long':
+      return decoder.read_long()
+    elif writer_schema.type == 'float':
+      return decoder.read_float()
+    elif writer_schema.type == 'double':
+      return decoder.read_double()
+    elif writer_schema.type == 'bytes':
+      return decoder.read_bytes()
+    elif writer_schema.type == 'fixed':
+      return self.read_fixed(writer_schema, reader_schema, decoder)
+    elif writer_schema.type == 'enum':
+      return self.read_enum(writer_schema, reader_schema, decoder)
+    elif writer_schema.type == 'array':
+      return self.read_array(writer_schema, reader_schema, decoder)
+    elif writer_schema.type == 'map':
+      return self.read_map(writer_schema, reader_schema, decoder)
+    elif writer_schema.type in ['union', 'error_union']:
+      return self.read_union(writer_schema, reader_schema, decoder)
+    elif writer_schema.type in ['record', 'error', 'request']:
+      return self.read_record(writer_schema, reader_schema, decoder)
+    else:
+      fail_msg = "Cannot read unknown schema type: %s" % writer_schema.type
+      raise schema.AvroException(fail_msg)
+
+  def skip_data(self, writer_schema, decoder):
+    if writer_schema.type == 'null':
+      return decoder.skip_null()
+    elif writer_schema.type == 'boolean':
+      return decoder.skip_boolean()
+    elif writer_schema.type == 'string':
+      return decoder.skip_utf8()
+    elif writer_schema.type == 'int':
+      return decoder.skip_int()
+    elif writer_schema.type == 'long':
+      return decoder.skip_long()
+    elif writer_schema.type == 'float':
+      return decoder.skip_float()
+    elif writer_schema.type == 'double':
+      return decoder.skip_double()
+    elif writer_schema.type == 'bytes':
+      return decoder.skip_bytes()
+    elif writer_schema.type == 'fixed':
+      return self.skip_fixed(writer_schema, decoder)
+    elif writer_schema.type == 'enum':
+      return self.skip_enum(writer_schema, decoder)
+    elif writer_schema.type == 'array':
+      return self.skip_array(writer_schema, decoder)
+    elif writer_schema.type == 'map':
+      return self.skip_map(writer_schema, decoder)
+    elif writer_schema.type in ['union', 'error_union']:
+      return self.skip_union(writer_schema, decoder)
+    elif writer_schema.type in ['record', 'error', 'request']:
+      return self.skip_record(writer_schema, decoder)
+    else:
+      fail_msg = "Unknown schema type: %s" % writer_schema.type
+      raise schema.AvroException(fail_msg)
+
+  def read_fixed(self, writer_schema, reader_schema, decoder):
+    """
+    Fixed instances are encoded using the number of bytes declared
+    in the schema.
+    """
+    return decoder.read(writer_schema.size)
+
+  def skip_fixed(self, writer_schema, decoder):
+    return decoder.skip(writer_schema.size)
+
+  def read_enum(self, writer_schema, reader_schema, decoder):
+    """
+    An enum is encoded by an int, representing the zero-based position
+    of the symbol in the schema.
+    """
+    # read data
+    index_of_symbol = decoder.read_int()
+    if index_of_symbol >= len(writer_schema.symbols):
+      fail_msg = "Can't access enum index %d for enum with %d symbols"\
+                 % (index_of_symbol, len(writer_schema.symbols))
+      raise SchemaResolutionException(fail_msg, writer_schema, reader_schema)
+    read_symbol = writer_schema.symbols[index_of_symbol]
+
+    # schema resolution
+    if read_symbol not in reader_schema.symbols:
+      fail_msg = "Symbol %s not present in Reader's Schema" % read_symbol
+      raise SchemaResolutionException(fail_msg, writer_schema, reader_schema)
+
+    return read_symbol
+
+  def skip_enum(self, writer_schema, decoder):
+    return decoder.skip_int()
+
+  def read_array(self, writer_schema, reader_schema, decoder):
+    """
+    Arrays are encoded as a series of blocks.
+
+    Each block consists of a long count value,
+    followed by that many array items.
+    A block with count zero indicates the end of the array.
+    Each item is encoded per the array's item schema.
+
+    If a block's count is negative,
+    then the count is followed immediately by a long block size,
+    indicating the number of bytes in the block.
+    The actual count in this case
+    is the absolute value of the count written.
+    """
+    read_items = []
+    block_count = decoder.read_long()
+    while block_count != 0:
+      if block_count < 0:
+        block_count = -block_count
+        block_size = decoder.read_long()
+      for i in range(block_count):
+        read_items.append(self.read_data(writer_schema.items,
+                                         reader_schema.items, decoder))
+      block_count = decoder.read_long()
+    return read_items
+
+  def skip_array(self, writer_schema, decoder):
+    block_count = decoder.read_long()
+    while block_count != 0:
+      if block_count < 0:
+        block_size = decoder.read_long()
+        decoder.skip(block_size)
+      else:
+        for i in range(block_count):
+          self.skip_data(writer_schema.items, decoder)
+      block_count = decoder.read_long()
+
+  def read_map(self, writer_schema, reader_schema, decoder):
+    """
+    Maps are encoded as a series of blocks.
+
+    Each block consists of a long count value,
+    followed by that many key/value pairs.
+    A block with count zero indicates the end of the map.
+    Each item is encoded per the map's value schema.
+
+    If a block's count is negative,
+    then the count is followed immediately by a long block size,
+    indicating the number of bytes in the block.
+    The actual count in this case
+    is the absolute value of the count written.
+    """
+    read_items = {}
+    block_count = decoder.read_long()
+    while block_count != 0:
+      if block_count < 0:
+        block_count = -block_count
+        block_size = decoder.read_long()
+      for i in range(block_count):
+        key = decoder.read_utf8()
+        read_items[key] = self.read_data(writer_schema.values,
+                                         reader_schema.values, decoder)
+      block_count = decoder.read_long()
+    return read_items
+
+  def skip_map(self, writer_schema, decoder):
+    block_count = decoder.read_long()
+    while block_count != 0:
+      if block_count < 0:
+        block_size = decoder.read_long()
+        decoder.skip(block_size)
+      else:
+        for i in range(block_count):
+          decoder.skip_utf8()
+          self.skip_data(writer_schema.values, decoder)
+      block_count = decoder.read_long()
+
+  def read_union(self, writer_schema, reader_schema, decoder):
+    """
+    A union is encoded by first writing a long value indicating
+    the zero-based position within the union of the schema of its value.
+    The value is then encoded per the indicated schema within the union.
+    """
+    # schema resolution
+    index_of_schema = int(decoder.read_long())
+    if index_of_schema >= len(writer_schema.schemas):
+      fail_msg = "Can't access branch index %d for union with %d branches"\
+                 % (index_of_schema, len(writer_schema.schemas))
+      raise SchemaResolutionException(fail_msg, writer_schema, reader_schema)
+    selected_writer_schema = writer_schema.schemas[index_of_schema]
+
+    # read data
+    return self.read_data(selected_writer_schema, reader_schema, decoder)
+
+  def skip_union(self, writer_schema, decoder):
+    index_of_schema = int(decoder.read_long())
+    if index_of_schema >= len(writer_schema.schemas):
+      fail_msg = "Can't access branch index %d for union with %d branches"\
+                 % (index_of_schema, len(writer_schema.schemas))
+      raise SchemaResolutionException(fail_msg, writer_schema)
+    return self.skip_data(writer_schema.schemas[index_of_schema], decoder)
+
+  def read_record(self, writer_schema, reader_schema, decoder):
+    """
+    A record is encoded by encoding the values of its fields
+    in the order that they are declared. In other words, a record
+    is encoded as just the concatenation of the encodings of its fields.
+    Field values are encoded per their schema.
+
+    Schema Resolution:
+     * the ordering of fields may be different: fields are matched by name.
+     * schemas for fields with the same name in both records are resolved
+       recursively.
+     * if the writer's record contains a field with a name not present in the
+       reader's record, the writer's value for that field is ignored.
+     * if the reader's record schema has a field that contains a default value,
+       and writer's schema does not have a field with the same name, then the
+       reader should use the default value from its field.
+     * if the reader's record schema has a field with no default value, and
+       writer's schema does not have a field with the same name, then the
+       field's value is unset.
+    """
+    # schema resolution
+    readers_fields_dict = reader_schema.field_map
+    read_record = {}
+    for field in writer_schema.fields:
+      readers_field = readers_fields_dict.get(field.name)
+      if readers_field is not None:
+        field_val = self.read_data(field.type, readers_field.type, decoder)
+        read_record[field.name] = field_val
+      else:
+        self.skip_data(field.type, decoder)
+
+    # fill in default values
+    if len(readers_fields_dict) > len(read_record):
+      writers_fields_dict = writer_schema.field_map
+      for field_name, field in readers_fields_dict.items():
+        if field_name not in writers_fields_dict:
+          if field.has_default:
+            field_val = self._read_default_value(field.type, field.default)
+            read_record[field.name] = field_val
+          else:
+            fail_msg = 'No default value for field %s' % field_name
+            raise SchemaResolutionException(fail_msg, writer_schema,
+                                            reader_schema)
+    return read_record
+
+  def skip_record(self, writer_schema, decoder):
+    for field in writer_schema.fields:
+      self.skip_data(field.type, decoder)
+
+  def _read_default_value(self, field_schema, default_value):
+    """
+    Decodes a JSON default value into the corresponding Python datum.
+    """
+    if field_schema.type == 'null':
+      return None
+    elif field_schema.type == 'boolean':
+      return bool(default_value)
+    elif field_schema.type == 'int':
+      return int(default_value)
+    elif field_schema.type == 'long':
+      return int(default_value)
+    elif field_schema.type in ['float', 'double']:
+      return float(default_value)
+    elif field_schema.type in ['enum', 'fixed', 'string', 'bytes']:
+      return default_value
+    elif field_schema.type == 'array':
+      read_array = []
+      for json_val in default_value:
+        item_val = self._read_default_value(field_schema.items, json_val)
+        read_array.append(item_val)
+      return read_array
+    elif field_schema.type == 'map':
+      read_map = {}
+      for key, json_val in default_value.items():
+        map_val = self._read_default_value(field_schema.values, json_val)
+        read_map[key] = map_val
+      return read_map
+    elif field_schema.type in ['union', 'error_union']:
+      return self._read_default_value(field_schema.schemas[0], default_value)
+    elif field_schema.type == 'record':
+      read_record = {}
+      for field in field_schema.fields:
+        json_val = default_value.get(field.name)
+        if json_val is None: json_val = field.default
+        field_val = self._read_default_value(field.type, json_val)
+        read_record[field.name] = field_val
+      return read_record
+    else:
+      fail_msg = 'Unknown type: %s' % field_schema.type
+      raise schema.AvroException(fail_msg)
+
+
+# ------------------------------------------------------------------------------
+
+
+class DatumWriter(object):
+  """DatumWriter for generic python objects."""
+  def __init__(self, writer_schema=None):
+    self._writer_schema = writer_schema
+
+  # read/write properties
+  def set_writer_schema(self, writer_schema):
+    self._writer_schema = writer_schema
+  writer_schema = property(lambda self: self._writer_schema,
+                            set_writer_schema)
+
+  def write(self, datum, encoder):
+    # validate datum
+    if not Validate(self.writer_schema, datum):
+      raise AvroTypeException(self.writer_schema, datum)
+
+    self.write_data(self.writer_schema, datum, encoder)
+
+  def write_data(self, writer_schema, datum, encoder):
+    # function dispatch to write datum
+    if writer_schema.type == 'null':
+      encoder.write_null(datum)
+    elif writer_schema.type == 'boolean':
+      encoder.write_boolean(datum)
+    elif writer_schema.type == 'string':
+      encoder.write_utf8(datum)
+    elif writer_schema.type == 'int':
+      encoder.write_int(datum)
+    elif writer_schema.type == 'long':
+      encoder.write_long(datum)
+    elif writer_schema.type == 'float':
+      encoder.write_float(datum)
+    elif writer_schema.type == 'double':
+      encoder.write_double(datum)
+    elif writer_schema.type == 'bytes':
+      encoder.write_bytes(datum)
+    elif writer_schema.type == 'fixed':
+      self.write_fixed(writer_schema, datum, encoder)
+    elif writer_schema.type == 'enum':
+      self.write_enum(writer_schema, datum, encoder)
+    elif writer_schema.type == 'array':
+      self.write_array(writer_schema, datum, encoder)
+    elif writer_schema.type == 'map':
+      self.write_map(writer_schema, datum, encoder)
+    elif writer_schema.type in ['union', 'error_union']:
+      self.write_union(writer_schema, datum, encoder)
+    elif writer_schema.type in ['record', 'error', 'request']:
+      self.write_record(writer_schema, datum, encoder)
+    else:
+      fail_msg = 'Unknown type: %s' % writer_schema.type
+      raise schema.AvroException(fail_msg)
+
+  def write_fixed(self, writer_schema, datum, encoder):
+    """
+    Fixed instances are encoded using the number of bytes declared
+    in the schema.
+    """
+    encoder.write(datum)
+
+  def write_enum(self, writer_schema, datum, encoder):
+    """
+    An enum is encoded by an int, representing the zero-based position
+    of the symbol in the schema.
+    """
+    index_of_datum = writer_schema.symbols.index(datum)
+    encoder.write_int(index_of_datum)
+
+  def write_array(self, writer_schema, datum, encoder):
+    """
+    Arrays are encoded as a series of blocks.
+
+    Each block consists of a long count value,
+    followed by that many array items.
+    A block with count zero indicates the end of the array.
+    Each item is encoded per the array's item schema.
+
+    If a block's count is negative,
+    then the count is followed immediately by a long block size,
+    indicating the number of bytes in the block.
+    The actual count in this case
+    is the absolute value of the count written.
+    """
+    if len(datum) > 0:
+      encoder.write_long(len(datum))
+      for item in datum:
+        self.write_data(writer_schema.items, item, encoder)
+    encoder.write_long(0)
+
+  def write_map(self, writer_schema, datum, encoder):
+    """
+    Maps are encoded as a series of blocks.
+
+    Each block consists of a long count value,
+    followed by that many key/value pairs.
+    A block with count zero indicates the end of the map.
+    Each item is encoded per the map's value schema.
+
+    If a block's count is negative,
+    then the count is followed immediately by a long block size,
+    indicating the number of bytes in the block.
+    The actual count in this case
+    is the absolute value of the count written.
+    """
+    if len(datum) > 0:
+      encoder.write_long(len(datum))
+      for key, val in datum.items():
+        encoder.write_utf8(key)
+        self.write_data(writer_schema.values, val, encoder)
+    encoder.write_long(0)
+
+  def write_union(self, writer_schema, datum, encoder):
+    """
+    A union is encoded by first writing a long value indicating
+    the zero-based position within the union of the schema of its value.
+    The value is then encoded per the indicated schema within the union.
+    """
+    # resolve union
+    index_of_schema = -1
+    for i, candidate_schema in enumerate(writer_schema.schemas):
+      if Validate(candidate_schema, datum):
+        index_of_schema = i
+    if index_of_schema < 0: raise AvroTypeException(writer_schema, datum)
+
+    # write data
+    encoder.write_long(index_of_schema)
+    self.write_data(writer_schema.schemas[index_of_schema], datum, encoder)
+
+  def write_record(self, writer_schema, datum, encoder):
+    """
+    A record is encoded by encoding the values of its fields
+    in the order that they are declared. In other words, a record
+    is encoded as just the concatenation of the encodings of its fields.
+    Field values are encoded per their schema.
+    """
+    for field in writer_schema.fields:
+      self.write_data(field.type, datum.get(field.name), encoder)
+
+
+if __name__ == '__main__':
+  raise Exception('Not a standalone module')

Propchange: avro/trunk/lang/py3/avro/io.py
------------------------------------------------------------------------------
    svn:eol-style = native
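
Two short sketches of the API above (illustrative only, not part of the
commit). First, the variable-length zig-zag coding used for ints and longs:

    import io

    from avro import io as avro_io

    buf = io.BytesIO()
    avro_io.BinaryEncoder(buf).write_long(-2)
    # Zig-zag maps -2 to 3, which fits in a single byte:
    assert buf.getvalue() == b'\x03'

    buf.seek(0)
    assert avro_io.BinaryDecoder(buf).read_long() == -2

Second, an in-memory datum round trip through DatumWriter and DatumReader
(the record schema is made up):

    from avro import schema

    POINT = schema.Parse("""
    {"type": "record", "name": "Point",
     "fields": [{"name": "x", "type": "int"}, {"name": "y", "type": "int"}]}
    """)

    buf = io.BytesIO()
    avro_io.DatumWriter(POINT).write({'x': 1, 'y': -1},
                                     avro_io.BinaryEncoder(buf))

    buf.seek(0)
    datum = avro_io.DatumReader(POINT).read(avro_io.BinaryDecoder(buf))
    assert datum == {'x': 1, 'y': -1}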

Added: avro/trunk/lang/py3/avro/ipc.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/ipc.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/ipc.py (added)
+++ avro/trunk/lang/py3/avro/ipc.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,694 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""RPC/IPC support."""
+
+import abc
+import http.client
+import http.server
+import io
+import logging
+import os
+import socketserver
+
+from avro import io as avro_io
+from avro import protocol
+from avro import schema
+
+# ------------------------------------------------------------------------------
+# Constants
+
+def LoadResource(name):
+  dir_path = os.path.dirname(__file__)
+  rsrc_path = os.path.join(dir_path, name)
+  with open(rsrc_path, 'r') as f:
+    return f.read()
+
+
+# Handshake schema is pulled in during build
+HANDSHAKE_REQUEST_SCHEMA_JSON = LoadResource('HandshakeRequest.avsc')
+HANDSHAKE_RESPONSE_SCHEMA_JSON = LoadResource('HandshakeResponse.avsc')
+
+HANDSHAKE_REQUEST_SCHEMA = schema.Parse(HANDSHAKE_REQUEST_SCHEMA_JSON)
+HANDSHAKE_RESPONSE_SCHEMA = schema.Parse(HANDSHAKE_RESPONSE_SCHEMA_JSON)
+
+HANDSHAKE_REQUESTOR_WRITER = avro_io.DatumWriter(HANDSHAKE_REQUEST_SCHEMA)
+HANDSHAKE_REQUESTOR_READER = avro_io.DatumReader(HANDSHAKE_RESPONSE_SCHEMA)
+HANDSHAKE_RESPONDER_WRITER = avro_io.DatumWriter(HANDSHAKE_RESPONSE_SCHEMA)
+HANDSHAKE_RESPONDER_READER = avro_io.DatumReader(HANDSHAKE_REQUEST_SCHEMA)
+
+META_SCHEMA = schema.Parse('{"type": "map", "values": "bytes"}')
+META_WRITER = avro_io.DatumWriter(META_SCHEMA)
+META_READER = avro_io.DatumReader(META_SCHEMA)
+
+SYSTEM_ERROR_SCHEMA = schema.Parse('["string"]')
+
+AVRO_RPC_MIME = 'avro/binary'
+
+# protocol cache
+
+# Map: remote name -> remote MD5 hash
+_REMOTE_HASHES = {}
+
+# Encoder/decoder for an unsigned 32-bit big-endian integer.
+UINT32_BE = avro_io.STRUCT_INT
+
+# Default size of the buffers used to frame messages:
+BUFFER_SIZE = 8192
+
+
+# ------------------------------------------------------------------------------
+# Exceptions
+
+
+class AvroRemoteException(schema.AvroException):
+  """
+  Raised when an error message is sent by an Avro requestor or responder.
+  """
+  def __init__(self, fail_msg=None):
+    schema.AvroException.__init__(self, fail_msg)
+
+class ConnectionClosedException(schema.AvroException):
+  """Raised when the connection closes while a message is still expected."""
+
+
+# ------------------------------------------------------------------------------
+# Base IPC Classes (Requestor/Responder)
+
+
+class BaseRequestor(object, metaclass=abc.ABCMeta):
+  """Base class for the client side of a protocol interaction."""
+
+  def __init__(self, local_protocol, transceiver):
+    """Initializes a new requestor object.
+
+    Args:
+      local_protocol: Avro Protocol describing the messages sent and received.
+      transceiver: Transceiver instance to channel messages through.
+    """
+    self._local_protocol = local_protocol
+    self._transceiver = transceiver
+    self._remote_protocol = None
+    self._remote_hash = None
+    self._send_protocol = None
+
+  @property
+  def local_protocol(self):
+    """Returns: the Avro Protocol describing the messages sent and received."""
+    return self._local_protocol
+
+  @property
+  def transceiver(self):
+    """Returns: the underlying channel used by this requestor."""
+    return self._transceiver
+
+  @abc.abstractmethod
+  def _IssueRequest(self, call_request, message_name, request_datum):
+    """TODO: Document this method.
+
+    Args:
+      call_request: ???
+      message_name: Name of the message.
+      request_datum: ???
+    Returns:
+      ???
+    """
+    raise Error('Abstract method')
+
+  def Request(self, message_name, request_datum):
+    """Writes a request message and reads a response or error message.
+
+    Args:
+      message_name: Name of the IPC method.
+      request_datum: IPC request.
+    Returns:
+      The IPC response.
+    """
+    # build handshake and call request
+    buffer_writer = io.BytesIO()
+    buffer_encoder = avro_io.BinaryEncoder(buffer_writer)
+    self._WriteHandshakeRequest(buffer_encoder)
+    self._WriteCallRequest(message_name, request_datum, buffer_encoder)
+
+    # send the handshake and call request; block until call response
+    call_request = buffer_writer.getvalue()
+    return self._IssueRequest(call_request, message_name, request_datum)
+
+  def _WriteHandshakeRequest(self, encoder):
+    """Emits the handshake request.
+
+    Args:
+      encoder: Encoder to write the handshake request into.
+    """
+    local_hash = self._local_protocol.md5
+
+    # if self._remote_hash is None:
+    #   remote_name = self.transceiver.remote_name
+    #   self._remote_hash = _REMOTE_HASHES.get(remote_name)
+
+    if self._remote_hash is None:
+      self._remote_hash = local_hash
+      self._remote_protocol = self._local_protocol
+
+    request_datum = {
+      'clientHash': local_hash,
+      'serverHash': self._remote_hash,
+    }
+    if self._send_protocol:
+      request_datum['clientProtocol'] = str(self._local_protocol)
+
+    logging.info('Sending handshake request: %s', request_datum)
+    HANDSHAKE_REQUESTOR_WRITER.write(request_datum, encoder)
+
+  def _WriteCallRequest(self, message_name, request_datum, encoder):
+    """
+    The format of a call request is:
+      * request metadata, a map with values of type bytes
+      * the message name, an Avro string, followed by
+      * the message parameters. Parameters are serialized according to
+        the message's request declaration.
+    """
+    # request metadata (not yet implemented)
+    request_metadata = {}
+    META_WRITER.write(request_metadata, encoder)
+
+    # Identify message to send:
+    message = self.local_protocol.message_map.get(message_name)
+    if message is None:
+      raise schema.AvroException('Unknown message: %s' % message_name)
+    encoder.write_utf8(message.name)
+
+    # message parameters
+    self._WriteRequest(message.request, request_datum, encoder)
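+    # For illustration, assuming a hypothetical message named 'echo' that
+    # takes a single string parameter 'message' with value 'foo', the call
+    # request body (after the handshake) would be:
+    #   00               empty request metadata map
+    #   08 65 63 68 6f   message name 'echo'
+    #   06 66 6f 6f      parameter 'message' = 'foo'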
+
+  def _WriteRequest(self, request_schema, request_datum, encoder):
+    logging.info('writing request: %s', request_datum)
+    datum_writer = avro_io.DatumWriter(request_schema)
+    datum_writer.write(request_datum, encoder)
+
+  def _ReadHandshakeResponse(self, decoder):
+    """Reads and processes the handshake response message.
+
+    Args:
+      decoder: Decoder to read messages from.
+    Returns:
+      True if the call response follows in the stream; False if the
+      handshake did not match and the request must be re-issued.
+    Raises:
+      schema.AvroException: If the handshake response carries an invalid
+          match value.
+    """
+    handshake_response = HANDSHAKE_REQUESTOR_READER.read(decoder)
+    logging.info('Processing handshake response: %s', handshake_response)
+    match = handshake_response['match']
+    if match == 'BOTH':
+      # Both client and server protocol hashes match:
+      self._send_protocol = False
+      return True
+
+    elif match == 'CLIENT':
+      # Client's side hash mismatch:
+      self._remote_protocol = \
+          protocol.Parse(handshake_response['serverProtocol'])
+      self._remote_hash = handshake_response['serverHash']
+      self._send_protocol = False
+      return True
+
+    elif match == 'NONE':
+      # Neither client nor server match:
+      self._remote_protocol = \
+          protocol.Parse(handshake_response['serverProtocol'])
+      self._remote_hash = handshake_response['serverHash']
+      self._send_protocol = True
+      return False
+    else:
+      raise schema.AvroException('handshake_response.match=%r' % match)
+
+  def _ReadCallResponse(self, message_name, decoder):
+    """Reads and processes a method call response.
+
+    The format of a call response is:
+      - response metadata, a map with values of type bytes
+      - a one-byte error flag boolean, followed by either:
+        - if the error flag is false,
+          the message response, serialized per the message's response schema.
+        - if the error flag is true,
+          the error, serialized per the message's error union schema.
+
+    Args:
+      message_name: Name of the message the response corresponds to.
+      decoder: Decoder to read the call response from.
+    Returns:
+      The deserialized response value.
+    Raises:
+      AvroRemoteException: If the server reported an error.
+      schema.AvroException: If the message is unknown locally or remotely.
+    """
+    # response metadata
+    response_metadata = META_READER.read(decoder)
+
+    # remote response schema
+    remote_message_schema = self._remote_protocol.message_map.get(message_name)
+    if remote_message_schema is None:
+      raise schema.AvroException('Unknown remote message: %s' % message_name)
+
+    # local response schema
+    local_message_schema = self._local_protocol.message_map.get(message_name)
+    if local_message_schema is None:
+      raise schema.AvroException('Unknown local message: %s' % message_name)
+
+    # error flag
+    if not decoder.read_boolean():
+      writer_schema = remote_message_schema.response
+      reader_schema = local_message_schema.response
+      return self._ReadResponse(writer_schema, reader_schema, decoder)
+    else:
+      writer_schema = remote_message_schema.errors
+      reader_schema = local_message_schema.errors
+      raise self._ReadError(writer_schema, reader_schema, decoder)
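+    # For illustration, a successful response to a message declared to
+    # return "string" with value 'foo' is laid out as:
+    #   00            empty response metadata map
+    #   00            error flag: false
+    #   06 66 6f 6f   response datum 'foo'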
+
+  def _ReadResponse(self, writer_schema, reader_schema, decoder):
+    datum_reader = avro_io.DatumReader(writer_schema, reader_schema)
+    result = datum_reader.read(decoder)
+    return result
+
+  def _ReadError(self, writer_schema, reader_schema, decoder):
+    datum_reader = avro_io.DatumReader(writer_schema, reader_schema)
+    return AvroRemoteException(datum_reader.read(decoder))
+
+
+class Requestor(BaseRequestor):
+  """Concrete requestor implementation."""
+
+  def _IssueRequest(self, call_request, message_name, request_datum):
+    call_response = self.transceiver.Transceive(call_request)
+
+    # process the handshake and call response
+    buffer_decoder = avro_io.BinaryDecoder(io.BytesIO(call_response))
+    call_response_exists = self._ReadHandshakeResponse(buffer_decoder)
+    if call_response_exists:
+      return self._ReadCallResponse(message_name, buffer_decoder)
+    else:
+      return self.Request(message_name, request_datum)
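+
+# A minimal client sketch, assuming a protocol file 'echo.avpr' declaring a
+# message 'echo' (both hypothetical):
+#
+#   with open('echo.avpr') as f:
+#     proto = protocol.Parse(f.read())
+#   transceiver = HTTPTransceiver('localhost', 8000)
+#   requestor = Requestor(proto, transceiver)
+#   result = requestor.Request('echo', {'message': 'hello'})
+#   transceiver.Close()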
+
+
+# ------------------------------------------------------------------------------
+
+
+class Responder(object, metaclass=abc.ABCMeta):
+  """Base class for the server side of a protocol interaction."""
+
+  def __init__(self, local_protocol):
+    self._local_protocol = local_protocol
+    self._local_hash = self._local_protocol.md5
+    self._protocol_cache = {}
+
+    self.set_protocol_cache(self._local_hash, self._local_protocol)
+
+  @property
+  def local_protocol(self):
+    return self._local_protocol
+
+  # Utility functions to manipulate the protocol cache:
+
+  def get_protocol_cache(self, proto_hash):
+    return self._protocol_cache.get(proto_hash)
+
+  def set_protocol_cache(self, proto_hash, proto):
+    self._protocol_cache[proto_hash] = proto
+
+  def Respond(self, call_request):
+    """Entry point to process one procedure call.
+
+    Args:
+      call_request: Serialized procedure call request.
+    Returns:
+      Serialized procedure call response; RPC errors are serialized into
+      the response rather than raised.
+    """
+    buffer_reader = io.BytesIO(call_request)
+    buffer_decoder = avro_io.BinaryDecoder(buffer_reader)
+    buffer_writer = io.BytesIO()
+    buffer_encoder = avro_io.BinaryEncoder(buffer_writer)
+    error = None
+    response_metadata = {}
+
+    try:
+      remote_protocol = self._ProcessHandshake(buffer_decoder, buffer_encoder)
+      # handshake failure
+      if remote_protocol is None:
+        return buffer_writer.getvalue()
+
+      # read request using remote protocol
+      request_metadata = META_READER.read(buffer_decoder)
+      remote_message_name = buffer_decoder.read_utf8()
+
+      # get remote and local request schemas so we can do
+      # schema resolution (one fine day)
+      remote_message = remote_protocol.message_map.get(remote_message_name)
+      if remote_message is None:
+        fail_msg = 'Unknown remote message: %s' % remote_message_name
+        raise schema.AvroException(fail_msg)
+      local_message = self.local_protocol.message_map.get(remote_message_name)
+      if local_message is None:
+        fail_msg = 'Unknown local message: %s' % remote_message_name
+        raise schema.AvroException(fail_msg)
+      writer_schema = remote_message.request
+      reader_schema = local_message.request
+      request = self._ReadRequest(writer_schema, reader_schema, buffer_decoder)
+      logging.info('Processing request: %r', request)
+
+      # perform server logic
+      try:
+        response = self.Invoke(local_message, request)
+      except AvroRemoteException as exn:
+        error = exn
+      except Exception as exn:
+        error = AvroRemoteException(str(exn))
+
+      # write response using local protocol
+      META_WRITER.write(response_metadata, buffer_encoder)
+      buffer_encoder.write_boolean(error is not None)
+      if error is None:
+        writer_schema = local_message.response
+        self._WriteResponse(writer_schema, response, buffer_encoder)
+      else:
+        writer_schema = local_message.errors
+        self._WriteError(writer_schema, error, buffer_encoder)
+    except schema.AvroException as exn:
+      error = AvroRemoteException(str(exn))
+      # Discard any partial response and emit a fresh error response into
+      # the buffer that is returned below (a BytesIO, not a StringIO, since
+      # the encoder writes bytes):
+      buffer_writer = io.BytesIO()
+      buffer_encoder = avro_io.BinaryEncoder(buffer_writer)
+      META_WRITER.write(response_metadata, buffer_encoder)
+      buffer_encoder.write_boolean(True)
+      self._WriteError(SYSTEM_ERROR_SCHEMA, error, buffer_encoder)
+    return buffer_writer.getvalue()
+
+  def _ProcessHandshake(self, decoder, encoder):
+    """Processes an RPC handshake.
+
+    Args:
+      decoder: Where to read from.
+      encoder: Where to write to.
+    Returns:
+      The requested Protocol.
+    """
+    handshake_request = HANDSHAKE_RESPONDER_READER.read(decoder)
+    logging.info('Processing handshake request: %s', handshake_request)
+
+    # determine the remote protocol
+    client_hash = handshake_request.get('clientHash')
+    client_protocol = handshake_request.get('clientProtocol')
+    remote_protocol = self.get_protocol_cache(client_hash)
+    if remote_protocol is None and client_protocol is not None:
+      remote_protocol = protocol.Parse(client_protocol)
+      self.set_protocol_cache(client_hash, remote_protocol)
+
+    # evaluate remote's guess of the local protocol
+    server_hash = handshake_request.get('serverHash')
+
+    handshake_response = {}
+    if remote_protocol is None:
+      # Client protocol is unknown on this side:
+      handshake_response['match'] = 'NONE'
+    elif self._local_hash == server_hash:
+      handshake_response['match'] = 'BOTH'
+    else:
+      handshake_response['match'] = 'CLIENT'
+
+    if handshake_response['match'] != 'BOTH':
+      handshake_response['serverProtocol'] = str(self.local_protocol)
+      handshake_response['serverHash'] = self._local_hash
+
+    logging.info('Handshake response: %s', handshake_response)
+    HANDSHAKE_RESPONDER_WRITER.write(handshake_response, encoder)
+    return remote_protocol
+
+  @abc.abstractmethod
+  def Invoke(self, local_message, request):
+    """Processes one procedure call.
+
+    Args:
+      local_message: Avro message specification.
+      request: Call request.
+    Returns:
+      Call response.
+    Raises:
+      AvroRemoteException: To report an application-level error to the
+          remote caller.
+    """
+    raise NotImplementedError('Abstract method')
+
+  def _ReadRequest(self, writer_schema, reader_schema, decoder):
+    datum_reader = avro_io.DatumReader(writer_schema, reader_schema)
+    return datum_reader.read(decoder)
+
+  def _WriteResponse(self, writer_schema, response_datum, encoder):
+    datum_writer = avro_io.DatumWriter(writer_schema)
+    datum_writer.write(response_datum, encoder)
+
+  def _WriteError(self, writer_schema, error_exception, encoder):
+    datum_writer = avro_io.DatumWriter(writer_schema)
+    datum_writer.write(str(error_exception), encoder)
+
+
+# ------------------------------------------------------------------------------
+# Framed message
+
+
+class FramedReader(object):
+  """Wrapper around a file-like object to read framed data."""
+
+  def __init__(self, reader):
+    self._reader = reader
+
+  def Read(self):
+    """Reads one message from the configured reader.
+
+    Returns:
+      The message, as bytes.
+    """
+    message = io.BytesIO()
+    # Read and append frames until we encounter a 0-size frame:
+    while self._ReadFrame(message) > 0: pass
+    return message.getvalue()
+
+  def _ReadFrame(self, message):
+    """Reads and appends one frame into the given message bytes.
+
+    Args:
+      message: Message to append the frame to.
+    Returns:
+      Size of the frame that was read.
+      The empty frame (size 0) indicates the end of a message.
+    """
+    frame_size = self._ReadInt32()
+    remaining = frame_size
+    while remaining > 0:
+      data_bytes = self._reader.read(remaining)
+      if len(data_bytes) == 0:
+        raise ConnectionClosedException(
+            'FramedReader: expecting %d more bytes in frame of size %d, got 0.'
+            % (remaining, frame_size))
+      message.write(data_bytes)
+      remaining -= len(data_bytes)
+    return frame_size
+
+  def _ReadInt32(self):
+    encoded = self._reader.read(UINT32_BE.size)
+    if len(encoded) != UINT32_BE.size:
+      raise ConnectionClosedException('Invalid header: %r' % encoded)
+    return UINT32_BE.unpack(encoded)[0]
+
+
+class FramedWriter(object):
+  """Wrapper around a file-like object to write framed data."""
+
+  def __init__(self, writer):
+    self._writer = writer
+
+  def Write(self, message):
+    """Writes a message.
+
+    Message is chunked into sequences of frames terminated by an empty frame.
+
+    Args:
+      message: Message to write, as bytes.
+    """
+    while len(message) > 0:
+      chunk_size = min(BUFFER_SIZE, len(message))
+      chunk = message[:chunk_size]
+      self._WriteBuffer(chunk)
+      message = message[chunk_size:]
+
+    # A message is always terminated by a zero-length buffer.
+    self._WriteUnsignedInt32(0)
+
+  def _WriteBuffer(self, chunk):
+    self._WriteUnsignedInt32(len(chunk))
+    self._writer.write(chunk)
+
+  def _WriteUnsignedInt32(self, uint32):
+    self._writer.write(UINT32_BE.pack(uint32))
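+
+# For illustration, the 3-byte message b'abc' is framed on the wire as:
+#   00 00 00 03   frame size 3, big-endian uint32
+#   61 62 63      frame payload
+#   00 00 00 00   empty frame terminating the message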
+
+
+# ------------------------------------------------------------------------------
+# Transceiver (send/receive channel)
+
+
+class Transceiver(object, metaclass=abc.ABCMeta):
+  @abc.abstractproperty
+  def remote_name(self):
+    """Returns: the name of the remote endpoint of this channel."""
+
+  @abc.abstractmethod
+  def ReadMessage(self):
+    """Reads a single message from the channel.
+
+    Blocks until a message can be read.
+
+    Returns:
+      The message read from the channel.
+    """
+    pass
+
+  @abc.abstractmethod
+  def WriteMessage(self, message):
+    """Writes a message into the channel.
+
+    Blocks until the message has been written.
+
+    Args:
+      message: Message to write.
+    """
+    pass
+
+  def Transceive(self, request):
+    """Processes a single request-reply interaction.
+
+    Synchronous request-reply interaction.
+
+    Args:
+      request: Request message.
+    Returns:
+      The reply message.
+    """
+    self.WriteMessage(request)
+    result = self.ReadMessage()
+    return result
+
+  def Close(self):
+    """Closes this transceiver."""
+    pass
+
+
+class HTTPTransceiver(Transceiver):
+  """HTTP-based transceiver implementation."""
+
+  def __init__(self, host, port, req_resource='/'):
+    """Initializes a new HTTP transceiver.
+
+    Args:
+      host: Name or IP address of the remote host to interact with.
+      port: Port the remote server is listening on.
+      req_resource: Optional HTTP resource path to use, '/' by default.
+    """
+    self._req_resource = req_resource
+    self._conn = http.client.HTTPConnection(host, port)
+    self._conn.connect()
+    self._remote_name = self._conn.sock.getsockname()
+
+  @property
+  def remote_name(self):
+    return self._remote_name
+
+  def ReadMessage(self):
+    response = self._conn.getresponse()
+    response_reader = FramedReader(response)
+    framed_message = response_reader.Read()
+    response.read()    # ensure we're ready for subsequent requests
+    return framed_message
+
+  def WriteMessage(self, message):
+    req_method = 'POST'
+    req_headers = {'Content-Type': AVRO_RPC_MIME}
+
+    bio = io.BytesIO()
+    req_body_buffer = FramedWriter(bio)
+    req_body_buffer.Write(message)
+    req_body = bio.getvalue()
+
+    self._conn.request(req_method, self._req_resource, req_body, req_headers)
+
+  def Close(self):
+    self._conn.close()
+    self._conn = None
+
+
+# ------------------------------------------------------------------------------
+# Server Implementations
+
+
+def _MakeHandlerClass(responder):
+  class AvroHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
+    def do_POST(self):
+      reader = FramedReader(self.rfile)
+      call_request = reader.Read()
+      logging.info('Serialized request: %r', call_request)
+      call_response = responder.Respond(call_request)
+      logging.info('Serialized response: %r', call_response)
+
+      self.send_response(200)
+      self.send_header('Content-type', AVRO_RPC_MIME)
+      self.end_headers()
+
+      framed_writer = FramedWriter(self.wfile)
+      framed_writer.Write(call_response)
+      self.wfile.flush()
+      logging.info('Response sent')
+
+  return AvroHTTPRequestHandler
+
+
+class MultiThreadedHTTPServer(
+    socketserver.ThreadingMixIn,
+    http.server.HTTPServer,
+):
+  """Multi-threaded HTTP server."""
+  pass
+
+
+class AvroIpcHttpServer(MultiThreadedHTTPServer):
+  """Avro IPC server implemented on top of an HTTP server."""
+
+  def __init__(self, interface, port, responder):
+    """Initializes a new Avro IPC server.
+
+    Args:
+      interface: Interface the server listens on, e.g. 'localhost' or '0.0.0.0'.
+      port: TCP port the server listens on, e.g. 8000.
+      responder: Responder implementation to handle RPCs.
+    """
+    super(AvroIpcHttpServer, self).__init__(
+        server_address=(interface, port),
+        RequestHandlerClass=_MakeHandlerClass(responder),
+    )
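+
+# A minimal server sketch, assuming a protocol object 'proto' declaring a
+# message 'echo' (both hypothetical):
+#
+#   class EchoResponder(Responder):
+#     def Invoke(self, local_message, request):
+#       return request['message']
+#
+#   server = AvroIpcHttpServer('localhost', 8000, EchoResponder(proto))
+#   server.serve_forever()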
+
+
+if __name__ == '__main__':
+  raise Exception('Not a standalone module')

Propchange: avro/trunk/lang/py3/avro/ipc.py
------------------------------------------------------------------------------
    svn:eol-style = native


