avro-commits mailing list archives

From cutt...@apache.org
Subject svn commit: r895732 [1/2] - in /hadoop/avro/trunk: ./ lib/py/ src/ src/py/avro/ src/test/py/
Date Mon, 04 Jan 2010 18:09:43 GMT
Author: cutting
Date: Mon Jan  4 18:09:42 2010
New Revision: 895732

URL: http://svn.apache.org/viewvc?rev=895732&view=rev
Log:
Rework Python API.  Contributed by Jeff Hammerbacher.

Added:
    hadoop/avro/trunk/src/test/py/test_datafile.py
    hadoop/avro/trunk/src/test/py/test_io.py
    hadoop/avro/trunk/src/test/py/test_schema.py
Removed:
    hadoop/avro/trunk/lib/py/odict-LICENSE.txt
    hadoop/avro/trunk/lib/py/odict.py
    hadoop/avro/trunk/src/py/avro/genericio.py
    hadoop/avro/trunk/src/py/avro/genericipc.py
    hadoop/avro/trunk/src/py/avro/ipc.py
    hadoop/avro/trunk/src/py/avro/protocol.py
    hadoop/avro/trunk/src/py/avro/reflectio.py
    hadoop/avro/trunk/src/py/avro/reflectipc.py
    hadoop/avro/trunk/src/test/py/interoptests.py
    hadoop/avro/trunk/src/test/py/testimport.py
    hadoop/avro/trunk/src/test/py/testio.py
    hadoop/avro/trunk/src/test/py/testioreflect.py
    hadoop/avro/trunk/src/test/py/testipc.py
    hadoop/avro/trunk/src/test/py/testipcreflect.py
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/build.xml
    hadoop/avro/trunk/src/py/avro/__init__.py
    hadoop/avro/trunk/src/py/avro/datafile.py
    hadoop/avro/trunk/src/py/avro/io.py
    hadoop/avro/trunk/src/py/avro/schema.py
    hadoop/avro/trunk/src/rat-excludes.txt

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=895732&r1=895731&r2=895732&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Mon Jan  4 18:09:42 2010
@@ -164,6 +164,8 @@
     AVRO-269. Use java compiler to validate specific compiler's output.
     (Philip Zeyliger via cutting)
 
+    AVRO-219. Rework Python API.  (Jeff Hammerbacher via cutting)
+
   OPTIMIZATIONS
 
     AVRO-172. More efficient schema processing (massie)

Modified: hadoop/avro/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/build.xml?rev=895732&r1=895731&r2=895732&view=diff
==============================================================================
--- hadoop/avro/trunk/build.xml (original)
+++ hadoop/avro/trunk/build.xml Mon Jan  4 18:09:42 2010
@@ -381,7 +381,7 @@
     </copy>
   </target>
 
-  <target name="generate-test-data" depends="compile-test-java, init-py">
+  <target name="generate-test-data" depends="compile-test-java">
     <mkdir dir="${test.java.build.dir}/data-files"/>
   	 <mkdir dir="${test.java.build.dir}/blocking-data-files"/>
     <java classname="org.apache.avro.RandomData"
@@ -396,16 +396,6 @@
   	      <arg value="${test.java.build.dir}/blocking-data-files/test.java.blocking.avro"/>
   	      <arg value="${test.count}"/>
   	 </java>
-
-    <taskdef name="py-run" classname="org.pyant.tasks.PythonRunTask">
-      <classpath refid="java.classpath" />
-    </taskdef>
-    <py-run script="${basedir}/src/test/py/testio.py" python="python"
-      pythonpathref="test.py.path">
-      <arg value="${basedir}/src/test/schemata/interop.avsc"/>
-      <arg value="${test.java.build.dir}/data-files/test.py.avro"/>
-      <arg value="100"/>
-    </py-run>
   </target>
 
   <target name="test-py" depends="init-py" description="Run python unit tests">
@@ -414,12 +404,12 @@
     </taskdef>
     <py-test python="python" pythonpathref="test.py.path" >
       <fileset dir="${basedir}/src/test/py">
-        <include name="test*.py"/>
+        <include name="test_*.py"/>
       </fileset>
     </py-test>
   </target>
 
-  <target name="test-interop" depends="test-interop-java,test-interop-py,test-interop-c"
+  <target name="test-interop" depends="test-interop-java,test-interop-c"
    description="Run multiple languages interoperability tests">
   </target>
 
@@ -428,11 +418,6 @@
    description="Run java interoperability tests">
   </target>
 
-  <target name="test-interop-py" 
-    depends="test-interop-data-py,test-interop-rpc-py"
-   description="Run python interoperability tests">
-  </target>
-
   <target name="test-interop-c"
     depends="test-interop-data-c"
    description="Run C interoperability tests">
@@ -443,25 +428,10 @@
     <test-runner files.location="${test.java.classes}" tests.pattern="**/TestDataFile$InteropTest.class" />
   </target>
 
-  <target name="test-interop-data-py" depends="generate-test-data" 
-    description="Run python data file interoperability tests">
-    <taskdef name="py-test" classname="org.pyant.tasks.PythonTestTask">
-      <classpath refid="java.classpath" />
-    </taskdef>
-    <py-test python="python" pythonpathref="test.py.path" >
-      <fileset dir="${basedir}/src/test/py">
-        <include name="interop*.py"/>
-      </fileset>
-    </py-test>
-  </target>
-
-  <target name="start-rpc-daemons" depends="compile-test-java, init-py"
+  <target name="start-rpc-daemons" depends="compile-test-java"
     description="Start the daemons for rpc interoperability tests">
     <delete dir="${test.java.build.dir}/server-ports"/>
     <mkdir dir="${test.java.build.dir}/server-ports"/>
-    <taskdef name="py-run" classname="org.pyant.tasks.PythonRunTask">
-      <classpath refid="java.classpath" />
-    </taskdef>
     <!-- Start the servers. As servers block the ant main thread, these need 
     to be created in parallel threads--> 
     <parallel>
@@ -470,10 +440,6 @@
           <classpath refid="test.java.classpath"/>
           <sysproperty key="test.dir" value="${test.java.build.dir}"/>
         </java>
-        <py-run script="${basedir}/src/test/py/interoptests.py" 
-          python="python" pythonpathref="test.py.path">
-          <arg value="server"/>
-        </py-run>
       </daemons>
 
       <!-- Give some time to start -->
@@ -488,14 +454,6 @@
     <test-runner files.location="${test.java.classes}" tests.pattern="**/TestProtocolSpecific$InteropTest.class" />
   </target>
 
-  <target name="test-interop-rpc-py" depends="start-rpc-daemons"
-    description="Run java rpc interoperability tests">
-        <py-run script="${basedir}/src/test/py/interoptests.py" 
-          python="python" pythonpathref="test.py.path">
-          <arg value="client"/>
-        </py-run>
-  </target>
-
   <target name="avroj" depends="compile-java" description="Build standalone avroj jar file">
     <jar jarfile="${build.dir}/avroj-${version}.jar">
       <manifest>

Modified: hadoop/avro/trunk/src/py/avro/__init__.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/__init__.py?rev=895732&r1=895731&r2=895732&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/__init__.py (original)
+++ hadoop/avro/trunk/src/py/avro/__init__.py Mon Jan  4 18:09:42 2010
@@ -14,5 +14,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-__all__ = ['schema', 'protocol', 'io', 'ipc', 'genericio', 'genericipc',
-           'reflectio', 'reflectipc', 'datafile']
+__all__ = ['schema', 'io', 'datafile']
+

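With the ipc, protocol, and reflection modules removed, the public package
surface shrinks to the three modules listed in __all__ above:

    # the only public modules after this change
    from avro import schema, io, datafile
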
Modified: hadoop/avro/trunk/src/py/avro/datafile.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/datafile.py?rev=895732&r1=895731&r2=895732&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/datafile.py (original)
+++ hadoop/avro/trunk/src/py/avro/datafile.py Mon Jan  4 18:09:42 2010
@@ -13,178 +13,274 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""
+Read/Write Avro File Object Containers.
+"""
+import uuid
+import cStringIO
+from avro import schema
+from avro import io
+
+#
+# Constants
+#
+
+VERSION = 1
+MAGIC = 'Obj' + chr(VERSION)
+MAGIC_SIZE = len(MAGIC)
+SYNC_SIZE = 16
+SYNC_INTERVAL = 1000 * SYNC_SIZE # TODO(hammer): make configurable
+META_SCHEMA = schema.parse("""\
+{"type": "record", "name": "org.apache.avro.file.Header",
+ "fields" : [
+   {"name": "magic", "type": {"type": "fixed", "name": "magic", "size": %d}},
+   {"name": "meta", "type": {"type": "map", "values": "bytes"}},
+   {"name": "sync", "type": {"type": "fixed", "name": "sync", "size": %d}}]}
+""" % (MAGIC_SIZE, SYNC_SIZE))
+VALID_CODECS = ['null']
+VALID_ENCODINGS = ['binary'] # not used yet
+
+#
+# Exceptions
+#
+
+class DataFileException(schema.AvroException):
+  """
+  Raised when there's a problem reading or writing file object containers.
+  """
+  def __init__(self, fail_msg):
+    schema.AvroException.__init__(self, fail_msg)
+
+#
+# Write Path
+#
 
-"""Read/Write Avro Data Files"""
-
-import struct, uuid, cStringIO
-import avro.schema as schema
-import avro.io as io
-
-#Data file constants.
-_VERSION = 0
-_MAGIC = "Obj"+chr(_VERSION)
-_SYNC_SIZE = 16
-_SYNC_INTERVAL = 1000*_SYNC_SIZE
-_FOOTER_BLOCK = -1
 class DataFileWriter(object):
-  """Stores in a file a sequence of data conforming to a schema. The schema is 
-  stored in the file with the data. Each datum in a file is of the same
-  schema. Data is grouped into blocks.
-  A synchronization marker is written between blocks, so that
-  files may be split. Blocks may be compressed. Extensible metadata is
-  stored at the end of the file. Files may be appended to."""
-
-  def __init__(self, schm, writer, dwriter):
-    self.__writer = writer
-    self.__encoder = io.Encoder(writer)
-    self.__dwriter = dwriter
-    self.__dwriter.setschema(schm)
-    self.__count = 0  #entries in file
-    self.__blockcount = 0  #entries in current block
-    self.__buffer = cStringIO.StringIO()
-    self.__bufwriter = io.Encoder(self.__buffer)
-    self.__meta = dict()
-    self.__sync = uuid.uuid4().bytes
-    self.__meta["sync"] = self.__sync
-    self.__meta["codec"] = "null"
-    self.__meta["schema"] = schema.stringval(schm)
-    self.__writer.write(struct.pack(len(_MAGIC).__str__()+'s',
-                                    _MAGIC))
-
-  def setmeta(self, key, val):
-    """Set a meta data property."""
-    self.__meta[key] = val
+  @staticmethod
+  def generate_sync_marker():
+    return uuid.uuid4().bytes
+
+  # TODO(hammer): make 'encoder' a metadata property
+  def __init__(self, writer, datum_writer, writers_schema=None):
+    """
+    If the schema is not present, presume we're appending.
+    """
+    self._writer = writer
+    self._encoder = io.BinaryEncoder(writer)
+    self._datum_writer = datum_writer
+    self._buffer_writer = cStringIO.StringIO()
+    self._buffer_encoder = io.BinaryEncoder(self._buffer_writer)
+    self._block_count = 0
+    self._meta = {}
+
+    if writers_schema is not None:
+      self._sync_marker = DataFileWriter.generate_sync_marker()
+      self.set_meta('codec', 'null')
+      self.set_meta('schema', str(writers_schema))
+      self.datum_writer.writers_schema = writers_schema
+      self._write_header()
+    else:
+      # open writer for reading to collect metadata
+      dfr = DataFileReader(writer, io.DatumReader())
+      
+      # TODO(hammer): collect arbitrary metadata
+      # collect metadata
+      self._sync_marker = dfr.sync_marker
+      self.set_meta('codec', dfr.get_meta('codec'))
+
+      # get schema used to write existing file
+      schema_from_file = dfr.get_meta('schema')
+      self.set_meta('schema', schema_from_file)
+      self.datum_writer.writers_schema = schema.parse(schema_from_file)
+
+      # seek to the end of the file and prepare for writing
+      writer.seek(0, 2)
+
+  # read-only properties
+  writer = property(lambda self: self._writer)
+  encoder = property(lambda self: self._encoder)
+  datum_writer = property(lambda self: self._datum_writer)
+  buffer_writer = property(lambda self: self._buffer_writer)
+  buffer_encoder = property(lambda self: self._buffer_encoder)
+  sync_marker = property(lambda self: self._sync_marker)
+  meta = property(lambda self: self._meta)
+
+  # read/write properties
+  def set_block_count(self, new_val):
+    self._block_count = new_val
+  block_count = property(lambda self: self._block_count, set_block_count)
+
+  # utility functions to read/write metadata entries
+  def get_meta(self, key):
+    return self._meta.get(key)
+  def set_meta(self, key, val):
+    self._meta[key] = val
+
+  def _write_header(self):
+    header = {'magic': MAGIC,
+              'meta': self.meta,
+              'sync': self.sync_marker}
+    self.datum_writer.write_data(META_SCHEMA, header, self.encoder)
+
+  # TODO(hammer): make a schema for blocks and use datum_writer
+  # TODO(hammer): use codec when writing the block contents
+  def _write_block(self):
+    if self.block_count > 0:
+      # write number of items in block
+      self.encoder.write_long(self.block_count)
+
+      # write block contents
+      if self.get_meta('codec') == 'null':
+        self.writer.write(self.buffer_writer.getvalue())
+      else:
+        fail_msg = '"%s" codec is not supported.' % self.get_meta('codec')
+        raise DataFileException(fail_msg)
+
+      # write sync marker
+      self.writer.write(self.sync_marker)
+
+      # reset buffer
+      self.buffer_writer.truncate(0) 
+      self.block_count = 0
 
   def append(self, datum):
     """Append a datum to the file."""
-    self.__dwriter.write(datum, self.__bufwriter)
-    self.__count+=1
-    self.__blockcount+=1
-    if self.__buffer.tell() >= _SYNC_INTERVAL:
-      self.__writeblock()
-
-  def __writeblock(self):
-    if self.__blockcount > 0:
-      self.__writer.write(self.__sync)
-      self.__encoder.writelong(self.__blockcount)
-      self.__writer.write(self.__buffer.getvalue())
-      self.__buffer.truncate(0) #reset
-      self.__blockcount = 0
+    self.datum_writer.write(datum, self.buffer_encoder)
+    self.block_count += 1
+
+    # if the data to write is larger than the sync interval, write the block
+    if self.buffer_writer.tell() >= SYNC_INTERVAL:
+      self._write_block()
 
   def sync(self):
-    """Return the current position as a value that may be passed to
+    """
+    Return the current position as a value that may be passed to
     DataFileReader.seek(long). Forces the end of the current block,
-    emitting a synchronization marker."""
-    self.__writeblock()
-    return self.__writer.tell()
+    emitting a synchronization marker.
+    """
+    self._write_block()
+    return self.writer.tell()
 
   def flush(self):
     """Flush the current state of the file, including metadata."""
-    self.__writefooter()
-    self.__writer.flush()
+    self._write_block()
+    self.writer.flush()
 
   def close(self):
     """Close the file."""
     self.flush()
-    self.__writer.close()
-
-  def __writefooter(self):
-    self.__writeblock()
-    self.__meta["count"] = self.__count.__str__()
-    
-    self.__bufwriter.writelong(len(self.__meta))
-    for k,v in self.__meta.items():
-      self.__bufwriter.writeutf8(unicode(k,'utf-8'))
-      self.__bufwriter.writebytes(str(v))
-    size = self.__buffer.tell() + 4
-    self.__writer.write(self.__sync)
-    self.__encoder.writelong(_FOOTER_BLOCK)
-    self.__encoder.writelong(size)
-    self.__buffer.flush()
-    self.__writer.write(self.__buffer.getvalue())
-    self.__buffer.truncate(0) #reset
-    self.__writer.write(chr((size >> 24) & 0xFF))
-    self.__writer.write(chr((size >> 16) & 0xFF))
-    self.__writer.write(chr((size >> 8) & 0xFF))
-    self.__writer.write(chr((size >> 0) & 0xFF))
+    self.writer.close()
 
 class DataFileReader(object):
   """Read files written by DataFileWriter."""
+  # TODO(hammer): allow user to specify expected schema?
+  # TODO(hammer): allow user to specify the encoder
+  def __init__(self, reader, datum_reader):
+    self._reader = reader
+    self._decoder = io.BinaryDecoder(reader)
+    self._datum_reader = datum_reader
+    
+    # read the header: magic, meta, sync
+    self._read_header()
 
-  def __init__(self, reader, dreader):
-    self.__reader = reader
-    self.__decoder = io.Decoder(reader)
-    mag = struct.unpack(len(_MAGIC).__str__()+'s', 
-                 self.__reader.read(len(_MAGIC)))[0]
-    if mag != _MAGIC:
-      raise schema.AvroException("Not an avro data file")
-    #find the length
-    self.__reader.seek(0,2)
-    self.__length = self.__reader.tell()
-    self.__reader.seek(-4, 2)
-    footersize = (int(ord(self.__reader.read(1)) << 24) +
-            int(ord(self.__reader.read(1)) << 16) +
-            int(ord(self.__reader.read(1)) << 8) +
-            int(ord(self.__reader.read(1))))
-    seekpos = self.__reader.seek(self.__length-footersize)
-    metalength = self.__decoder.readlong()
-    if metalength < 0:
-      metalength = -metalength
-      self.__decoder.readlong() #ignore byteCount if this is a blocking map
-    self.__meta = dict()
-    for i in range(0, metalength):
-      key = self.__decoder.readutf8()
-      self.__meta[key] = self.__decoder.readbytes()
-    self.__sync = self.__meta.get("sync")
-    self.__count = int(self.__meta.get("count"))
-    self.__codec = self.__meta.get("codec")
-    if (self.__codec != None) and (self.__codec != "null"):
-      raise schema.AvroException("Unknown codec: " + self.__codec)
-    self.__schema = schema.parse(self.__meta.get("schema").encode("utf-8"))
-    self.__blockcount = 0
-    self.__dreader = dreader
-    self.__dreader.setschema(self.__schema)
-    self.__reader.seek(len(_MAGIC))
-
+    # ensure codec is valid
+    codec_from_file = self.get_meta('codec')
+    if codec_from_file is not None and codec_from_file not in VALID_CODECS:
+      raise DataFileException('Unknown codec: %s.' % codec_from_file)
+
+    # get file length
+    self._file_length = self.determine_file_length()
+
+    # get ready to read
+    self._block_count = 0
+    self.datum_reader.writers_schema = schema.parse(self.get_meta('schema'))
+  
   def __iter__(self):
     return self
 
-  def getmeta(self, key):
-    """Return the value of a metadata property."""
-    return self.__meta.get(key)
+  # read-only properties
+  reader = property(lambda self: self._reader)
+  decoder = property(lambda self: self._decoder)
+  datum_reader = property(lambda self: self._datum_reader)
+  sync_marker = property(lambda self: self._sync_marker)
+  meta = property(lambda self: self._meta)
+  file_length = property(lambda self: self._file_length)
+
+  # read/write properties
+  def set_block_count(self, new_val):
+    self._block_count = new_val
+  block_count = property(lambda self: self._block_count, set_block_count)
+
+  # utility functions to read/write metadata entries
+  def get_meta(self, key):
+    return self._meta.get(key)
+  def set_meta(self, key, val):
+    self._meta[key] = val
+
+  def determine_file_length(self):
+    """
+    Get file length and leave file cursor where we found it.
+    """
+    remember_pos = self.reader.tell()
+    self.reader.seek(0, 2)
+    file_length = self.reader.tell()
+    self.reader.seek(remember_pos)
+    return file_length
+
+  def is_EOF(self):
+    return self.reader.tell() == self.file_length
+
+  def _read_header(self):
+    # seek to the beginning of the file to get magic block
+    self.reader.seek(0, 0) 
+
+    # read header into a dict
+    header = self.datum_reader.read_data(META_SCHEMA, META_SCHEMA, self.decoder)
+
+    # check magic number
+    if header.get('magic') != MAGIC:
+      fail_msg = "Not an Avro data file: %s doesn't match %s."\
+                 % (header.get('magic'), MAGIC)
+      raise schema.AvroException(fail_msg)
+
+    # set metadata
+    self._meta = header['meta']
+
+    # set sync marker
+    self._sync_marker = header['sync']
+
+  def _read_block_header(self):
+    self.block_count = self.decoder.read_long()
+
+  def _skip_sync(self):
+    """
+    Read the length of the sync marker; if it matches the sync marker,
+    return True. Otherwise, seek back to where we started and return False.
+    """
+    proposed_sync_marker = self.reader.read(SYNC_SIZE)
+    if proposed_sync_marker != self.sync_marker:
+      self.reader.seek(-SYNC_SIZE, 1)
+      return False
+    else:
+      return True
 
+  # TODO(hammer): handle block of length zero
+  # TODO(hammer): clean this up with recursion
   def next(self):
     """Return the next datum in the file."""
-    while self.__blockcount == 0:
-      if self.__reader.tell() == self.__length:
+    if self.block_count == 0:
+      if self.is_EOF():
         raise StopIteration
-      self.__skipsync()
-      self.__blockcount = self.__decoder.readlong()
-      if self.__blockcount == _FOOTER_BLOCK:
-        self.__reader.seek(self.__decoder.readlong()+self.__reader.tell())
-        self.__blockcount = 0
-    self.__blockcount-=1
-    datum = self.__dreader.read(self.__decoder)
-    return datum
+      elif self._skip_sync():
+        if self.is_EOF(): raise StopIteration
+        self._read_block_header()
+      else:
+        self._read_block_header()
 
-  def __skipsync(self):
-    if self.__reader.read(_SYNC_SIZE)!=self.__sync:
-      raise schema.AvroException("Invalid sync!")
-
-  def seek(self, pos):
-    """Move to the specified synchronization point, as returned by 
-    DataFileWriter.sync()."""
-    self.__reader.seek(pos)
-    self.__blockcount = 0
-
-  def sync(self, position):
-    """Move to the next synchronization point after a position."""
-    if self.__reader.tell()+_SYNC_SIZE >= self.__length:
-      self.__reader.seek(self.__length)
-      return
-    self.__reader.seek(position)
-    self.__reader.read(_SYNC_SIZE)
+    datum = self.datum_reader.read(self.decoder) 
+    self.block_count -= 1
+    return datum
 
   def close(self):
     """Close this reader."""
-    self.__reader.close()
+    self.reader.close()

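For illustration, a minimal sketch of a round trip through the reworked
datafile API above (the record schema and file name below are invented for
the example; DatumWriter and DatumReader come from avro.io, changed below):

    from avro import schema, io, datafile

    SCHEMA = schema.parse("""\
    {"type": "record", "name": "Test",
     "fields": [{"name": "message", "type": "string"}]}
    """)

    # write a container file: header, then one or more sync-delimited blocks
    dfw = datafile.DataFileWriter(open('data.avro', 'wb'),
                                  io.DatumWriter(), writers_schema=SCHEMA)
    dfw.append({'message': u'hello'})
    dfw.append({'message': u'world'})
    dfw.close()

    # read it back; the writer's schema is recovered from the file metadata
    dfr = datafile.DataFileReader(open('data.avro', 'rb'), io.DatumReader())
    for record in dfr:
      print record['message']
    dfr.close()
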
Modified: hadoop/avro/trunk/src/py/avro/io.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/io.py?rev=895732&r1=895731&r2=895732&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/io.py (original)
+++ hadoop/avro/trunk/src/py/avro/io.py Mon Jan  4 18:09:42 2010
@@ -13,187 +13,821 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""
+Input/Output utilities, including:
 
-"""Input/Output utilities."""
-
+ * i/o-specific constants
+ * i/o-specific exceptions
+ * schema validation
+ * leaf value encoding and decoding
+ * datum reader/writer stuff (?)
+
+Also includes a generic representation for data, which
+uses the following mapping:
+
+  * Schema records are implemented as dict.
+  * Schema arrays are implemented as list.
+  * Schema maps are implemented as dict.
+  * Schema strings are implemented as unicode.
+  * Schema bytes are implemented as str.
+  * Schema ints are implemented as int.
+  * Schema longs are implemented as long.
+  * Schema floats are implemented as float.
+  * Schema doubles are implemented as float.
+  * Schema booleans are implemented as bool. 
+"""
 import struct
-import avro.schema as schema
+from avro import schema
 
-#Constants
-_INT_MIN_VALUE = -(1 << 31)
-_INT_MAX_VALUE = (1 << 31) - 1
-_LONG_MIN_VALUE = -(1 << 63)
-_LONG_MAX_VALUE = (1 << 63) - 1
-_STRUCT_INT = struct.Struct('!I')
-_STRUCT_LONG = struct.Struct('!Q')
-_STRUCT_FLOAT = struct.Struct('!f')
-_STRUCT_DOUBLE = struct.Struct('!d')
+#
+# Constants
+#
+
+INT_MIN_VALUE = -(1 << 31)
+INT_MAX_VALUE = (1 << 31) - 1
+LONG_MIN_VALUE = -(1 << 63)
+LONG_MAX_VALUE = (1 << 63) - 1
+
+# TODO(hammer): shouldn't ! be < for little-endian (according to spec?)
+STRUCT_INT = struct.Struct('!I')     # big-endian unsigned int
+STRUCT_LONG = struct.Struct('!Q')    # big-endian unsigned long long
+STRUCT_FLOAT = struct.Struct('!f')   # big-endian float
+STRUCT_DOUBLE = struct.Struct('!d')  # big-endian double
+
+#
+# Exceptions
+#
 
 class AvroTypeException(schema.AvroException):
-  """Raised when illegal type is used."""
-  def __init__(self, schm=None, datum=None, msg=None):
-    if msg is None:
-      msg = "Not a "+schema.stringval(schm)+": "+datum.__str__()
-    schema.AvroException.__init__(self, msg)
-
-class DatumReaderBase(object):
-  """Base class for reading data of a schema."""
-
-  def setschema(self, schema):
-    pass
+  """Raised when datum is not an example of schema."""
+  def __init__(self, expected_schema, datum):
+    fail_msg = "The datum %s is not an example of the schema %s"\
+               % (datum, expected_schema)
+    schema.AvroException.__init__(self, fail_msg)
+
+class SchemaMatchException(schema.AvroException):
+  """Raised when writer's and reader's schema do not match."""
+  def __init__(self, writers_schema, readers_schema):
+    fail_msg = "Writer's schema %s and Reader's schema %s do not match."\
+               % (writers_schema, readers_schema)
+    schema.AvroException.__init__(self, fail_msg)
+
+#
+# Validate
+#
+
+def validate(expected_schema, datum):
+  """Determine if a python datum is an instance of a schema."""
+  schema_type = expected_schema.type
+  if schema_type == 'null':
+    return datum is None
+  elif schema_type == 'boolean':
+    return isinstance(datum, bool)
+  elif schema_type == 'string':
+    return isinstance(datum, basestring)
+  elif schema_type == 'bytes':
+    return isinstance(datum, str)
+  elif schema_type == 'int':
+    return ((isinstance(datum, int) or isinstance(datum, long)) 
+            and INT_MIN_VALUE <= datum <= INT_MAX_VALUE)
+  elif schema_type == 'long':
+    return ((isinstance(datum, int) or isinstance(datum, long)) 
+            and LONG_MIN_VALUE <= datum <= LONG_MAX_VALUE)
+  elif schema_type in ['float', 'double']:
+    return isinstance(datum, float)
+  elif schema_type == 'fixed':
+    return isinstance(datum, str) and len(datum) == expected_schema.size
+  elif schema_type == 'enum':
+    return datum in expected_schema.symbols
+  elif schema_type == 'array':
+    return (isinstance(datum, list) and
+      False not in [validate(expected_schema.items, d) for d in datum])
+  elif schema_type == 'map':
+    return (isinstance(datum, dict) and
+      False not in [isinstance(k, basestring) for k in datum.keys()] and
+      False not in
+        [validate(expected_schema.values, v) for v in datum.values()])
+  elif schema_type == 'union':
+    return True in [validate(s, datum) for s in expected_schema.schemas]
+  elif schema_type == 'record':
+    return (isinstance(datum, dict) and
+      False not in
+        [validate(f.type, datum.get(f.name)) for f in expected_schema.fields])
+
+#
+# Decoder/Encoder
+#
 
-  def read(self, decoder):
-    """Read a datum. Traverse the schema, depth-first, reading all leaf values
-    in the schema into a datum that is returned"""
-    pass
-
-class DatumWriterBase(object):
-  """Base class for writing data of a schema."""
-  def setschema(self, schema):
-    pass
-
-  def write(self, data, encoder):
-    """Write a datum. Traverse the schema, depth first, writing each leaf value
-    in the schema from the datum to the output."""
-    pass
-
-
-class Decoder(object):
+class BinaryDecoder(object):
   """Read leaf values."""
-
   def __init__(self, reader):
-    self.__reader = reader
-
-  def readboolean(self):
-    return ord(self.__reader.read(1)) == 1
-
-  def readint(self):
-    return self.readlong()
-
-  def readlong(self):
-    b = ord(self.__reader.read(1))
+    """
+    reader is a Python object on which we can call read, seek, and tell.
+    """
+    self._reader = reader
+
+  # read-only properties
+  reader = property(lambda self: self._reader)
+
+  def read_null(self):
+    """
+    null is written as zero bytes
+    """
+    return None
+
+  def read_boolean(self):
+    """
+    a boolean is written as a single byte 
+    whose value is either 0 (false) or 1 (true).
+    """
+    return ord(self.reader.read(1)) == 1
+
+  def read_int(self):
+    """
+    int and long values are written using variable-length, zig-zag coding.
+    """
+    return self.read_long()
+
+  def read_long(self):
+    """
+    int and long values are written using variable-length, zig-zag coding.
+    """
+    b = ord(self.reader.read(1))
     n = b & 0x7F
     shift = 7
     while (b & 0x80) != 0:
-      b = ord(self.__reader.read(1))
+      b = ord(self.reader.read(1))
       n |= (b & 0x7F) << shift
       shift += 7
     datum = (n >> 1) ^ -(n & 1)
     return datum
 
-  def readfloat(self):
-    bits = (((ord(self.__reader.read(1)) & 0xffL)    ) |
-        ((ord(self.__reader.read(1)) & 0xffL) <<  8) |
-        ((ord(self.__reader.read(1)) & 0xffL) << 16) |
-        ((ord(self.__reader.read(1)) & 0xffL) << 24))
-    return _STRUCT_FLOAT.unpack(_STRUCT_INT.pack(bits))[0]
-
-  def readdouble(self):
-    bits = (((ord(self.__reader.read(1)) & 0xffL)    ) |
-        ((ord(self.__reader.read(1)) & 0xffL) <<  8) |
-        ((ord(self.__reader.read(1)) & 0xffL) << 16) |
-        ((ord(self.__reader.read(1)) & 0xffL) << 24) |
-        ((ord(self.__reader.read(1)) & 0xffL) << 32) |
-        ((ord(self.__reader.read(1)) & 0xffL) << 40) |
-        ((ord(self.__reader.read(1)) & 0xffL) << 48) |
-        ((ord(self.__reader.read(1)) & 0xffL) << 56))
-    return _STRUCT_DOUBLE.unpack(_STRUCT_LONG.pack(bits))[0]
-
-  def readbytes(self):
-    return self.read(self.readlong())
-
-  def readutf8(self):
-    return unicode(self.readbytes(), "utf-8")
+  def read_float(self):
+    """
+    A float is written as 4 bytes.
+    The float is converted into a 32-bit integer using a method equivalent to
+    Java's floatToIntBits and then encoded in little-endian format.
+    """
+    bits = (((ord(self.reader.read(1)) & 0xffL)) |
+      ((ord(self.reader.read(1)) & 0xffL) <<  8) |
+      ((ord(self.reader.read(1)) & 0xffL) << 16) |
+      ((ord(self.reader.read(1)) & 0xffL) << 24))
+    return STRUCT_FLOAT.unpack(STRUCT_INT.pack(bits))[0]
+
+  def read_double(self):
+    """
+    A double is written as 8 bytes.
+    The double is converted into a 64-bit integer using a method equivalent to
+    Java's doubleToLongBits and then encoded in little-endian format.
+    """
+    bits = (((ord(self.reader.read(1)) & 0xffL)) |
+      ((ord(self.reader.read(1)) & 0xffL) <<  8) |
+      ((ord(self.reader.read(1)) & 0xffL) << 16) |
+      ((ord(self.reader.read(1)) & 0xffL) << 24) |
+      ((ord(self.reader.read(1)) & 0xffL) << 32) |
+      ((ord(self.reader.read(1)) & 0xffL) << 40) |
+      ((ord(self.reader.read(1)) & 0xffL) << 48) |
+      ((ord(self.reader.read(1)) & 0xffL) << 56))
+    return STRUCT_DOUBLE.unpack(STRUCT_LONG.pack(bits))[0]
+
+  def read_bytes(self):
+    """
+    Bytes are encoded as a long followed by that many bytes of data. 
+    """
+    return self.read(self.read_long())
+
+  def read_utf8(self):
+    """
+    A string is encoded as a long followed by
+    that many bytes of UTF-8 encoded character data.
+    """
+    return unicode(self.read_bytes(), "utf-8")
+
+  def read(self, n):
+    """
+    Read n bytes.
+    """
+    return struct.unpack('%ds' % n, self.reader.read(n))[0]
 
-  def read(self, len):
-    return struct.unpack(len.__str__()+'s', self.__reader.read(len))[0]
+  def skip_null(self):
+    pass
 
-  def skipboolean(self):
+  def skip_boolean(self):
     self.skip(1)
 
-  def skipint(self):
+  # TODO(hammer): I thought ints were VLE?
+  def skip_int(self):
     self.skip(4)
 
-  def skiplong(self):
+  # TODO(hammer): I thought longs were VLE?
+  def skip_long(self):
     self.skip(8)
 
-  def skipfloat(self):
+  def skip_float(self):
     self.skip(4)
 
-  def skipdouble(self):
+  def skip_double(self):
     self.skip(8)
 
-  def skipbytes(self):
-    self.skip(self.readlong())
+  def skip_bytes(self):
+    self.skip(self.read_long())
 
-  def skiputf8(self):
-    self.skipbytes()
+  def skip_utf8(self):
+    self.skip_bytes()
 
-  def skip(self, len):
-    self.__reader.seek(self.__reader.tell()+len)
+  def skip(self, n):
+    self.reader.seek(self.reader.tell() + n)
 
-class Encoder(object):
+class BinaryEncoder(object):
   """Write leaf values."""
-
   def __init__(self, writer):
-    self.__writer = writer
+    """
+    writer is a Python object on which we can call write.
+    """
+    self._writer = writer
+
+  # read-only properties
+  writer = property(lambda self: self._writer)
+
+  def write_null(self, datum):
+    """
+    null is written as zero bytes
+    """
+    pass
   
-  def writeboolean(self, datum):
-    if not isinstance(datum, bool):
-      raise AvroTypeException(schema.BOOLEAN, datum)
+  def write_boolean(self, datum):
+    """
+    a boolean is written as a single byte 
+    whose value is either 0 (false) or 1 (true).
+    """
     if datum:
-      self.__writer.write(chr(1))
+      self.writer.write(chr(1))
     else:
-      self.__writer.write(chr(0))
+      self.writer.write(chr(0))
 
-  def writeint(self, n):
-    if n < _INT_MIN_VALUE or n > _INT_MAX_VALUE:
-      raise AvroTypeException(schema.INT, n, 
-                              "datam too big to fit into avro int")
-    self.writelong(n);
-
-  def writelong(self, n):
-    if n < _LONG_MIN_VALUE or n > _LONG_MAX_VALUE:
-      raise AvroTypeException(schema.LONG, n, 
-                              "datam too big to fit into avro long")
-    n = (n << 1) ^ (n >> 63)
-    while (n & ~0x7F) != 0:
-      self.__writer.write(chr((n & 0x7f) | 0x80))
-      n >>=7
-    self.__writer.write(chr(n))
-
-  def writefloat(self, datum):
-    bits = _STRUCT_INT.unpack(_STRUCT_FLOAT.pack(datum))[0]
-    self.__writer.write(chr((bits    ) & 0xFF))
-    self.__writer.write(chr((bits >>  8) & 0xFF))
-    self.__writer.write(chr((bits >>  16) & 0xFF))
-    self.__writer.write(chr((bits >>  24) & 0xFF))
-
-  def writedouble(self, datum):
-    bits = _STRUCT_LONG.unpack(_STRUCT_DOUBLE.pack(datum))[0]
-    self.__writer.write(chr((bits    ) & 0xFF))
-    self.__writer.write(chr((bits >>  8) & 0xFF))
-    self.__writer.write(chr((bits >>  16) & 0xFF))
-    self.__writer.write(chr((bits >>  24) & 0xFF))
-    self.__writer.write(chr((bits >>  32) & 0xFF))
-    self.__writer.write(chr((bits >>  40) & 0xFF))
-    self.__writer.write(chr((bits >>  48) & 0xFF))
-    self.__writer.write(chr((bits >>  56) & 0xFF))
-
-  def writebytes(self, datum):
-    if not isinstance(datum, str):
-      raise AvroTypeException(schema.BYTES, datum, 
-                              "avro BYTES should be python str")
-    self.writelong(len(datum))
-    self.__writer.write(struct.pack(len(datum).__str__()+'s',datum))
-
-  def writeutf8(self, datum):
-    if not isinstance(datum, basestring):
-      raise AvroTypeException(schema.STRING, datum, 
-                              "avro STRING should be python unicode")
+  def write_int(self, datum):
+    """
+    int and long values are written using variable-length, zig-zag coding.
+    """
+    self.write_long(datum)
+
+  def write_long(self, datum):
+    """
+    int and long values are written using variable-length, zig-zag coding.
+    """
+    datum = (datum << 1) ^ (datum >> 63)
+    while (datum & ~0x7F) != 0:
+      self.writer.write(chr((datum & 0x7f) | 0x80))
+      datum >>= 7
+    self.writer.write(chr(datum))
+
+  def write_float(self, datum):
+    """
+    A float is written as 4 bytes.
+    The float is converted into a 32-bit integer using a method equivalent to
+    Java's floatToIntBits and then encoded in little-endian format.
+    """
+    bits = STRUCT_INT.unpack(STRUCT_FLOAT.pack(datum))[0]
+    self.writer.write(chr((bits) & 0xFF))
+    self.writer.write(chr((bits >> 8) & 0xFF))
+    self.writer.write(chr((bits >> 16) & 0xFF))
+    self.writer.write(chr((bits >> 24) & 0xFF))
+
+  def write_double(self, datum):
+    """
+    A double is written as 8 bytes.
+    The double is converted into a 64-bit integer using a method equivalent to
+    Java's doubleToLongBits and then encoded in little-endian format.
+    """
+    bits = STRUCT_LONG.unpack(STRUCT_DOUBLE.pack(datum))[0]
+    self.writer.write(chr((bits) & 0xFF))
+    self.writer.write(chr((bits >> 8) & 0xFF))
+    self.writer.write(chr((bits >> 16) & 0xFF))
+    self.writer.write(chr((bits >> 24) & 0xFF))
+    self.writer.write(chr((bits >> 32) & 0xFF))
+    self.writer.write(chr((bits >> 40) & 0xFF))
+    self.writer.write(chr((bits >> 48) & 0xFF))
+    self.writer.write(chr((bits >> 56) & 0xFF))
+
+  def write_bytes(self, datum):
+    """
+    Bytes are encoded as a long followed by that many bytes of data. 
+    """
+    self.write_long(len(datum))
+    self.writer.write(struct.pack('%ds' % len(datum), datum))
+
+  def write_utf8(self, datum):
+    """
+    A string is encoded as a long followed by
+    that many bytes of UTF-8 encoded character data.
+    """
     datum = datum.encode("utf-8")
-    self.writebytes(datum)
+    self.write_bytes(datum)
 
   def write(self, datum):
-    self.__writer.write(datum)
+    """Write an abritrary datum."""
+    self.writer.write(datum)
+
+#
+# DatumReader/Writer
+#
+
+class DatumReader(object):
+  """Deserialize Avro-encoded data into a Python data structure."""
+  @staticmethod
+  def check_props(schema_one, schema_two, prop_list):
+    for prop in prop_list:
+      if getattr(schema_one, prop) != getattr(schema_two, prop):
+        return False
+    return True
+
+  @staticmethod
+  def match_schemas(writers_schema, readers_schema):
+    w_type = writers_schema.type
+    r_type = readers_schema.type
+    if 'union' in [w_type, r_type]:
+      return True
+    elif (w_type in schema.PRIMITIVE_TYPES and r_type in schema.PRIMITIVE_TYPES
+          and w_type == r_type):
+      return True
+    elif (w_type == r_type == 'record' and
+          DatumReader.check_props(writers_schema, readers_schema, 
+                                  ['fullname'])):
+      return True
+    elif (w_type == r_type == 'fixed' and 
+          DatumReader.check_props(writers_schema, readers_schema, 
+                                  ['fullname', 'size'])):
+      return True
+    elif (w_type == r_type == 'enum' and 
+          DatumReader.check_props(writers_schema, readers_schema, 
+                                  ['fullname'])):
+      return True
+    elif (w_type == r_type == 'map' and 
+          DatumReader.check_props(writers_schema.values,
+                                  readers_schema.values, ['type'])):
+      return True
+    elif (w_type == r_type == 'array' and 
+          DatumReader.check_props(writers_schema.items,
+                                  readers_schema.items, ['type'])):
+      return True
+    
+    # Handle schema promotion
+    if w_type == 'int' and r_type in ['long', 'float', 'double']:
+      return True
+    elif w_type == 'long' and r_type in ['float', 'double']:
+      return True
+    elif w_type == 'float' and r_type == 'double':
+      return True
+    return False
+
+  def __init__(self, writers_schema=None, readers_schema=None):
+    """
+    As defined in the Avro specification, we call the schema encoded
+    in the data the "writer's schema", and the schema expected by the
+    reader the "reader's schema".
+    """
+    self._writers_schema = writers_schema
+    self._readers_schema = readers_schema 
+
+  # read/write properties
+  def set_writers_schema(self, writers_schema):
+    self._writers_schema = writers_schema
+  writers_schema = property(lambda self: self._writers_schema,
+                            set_writers_schema)
+  def set_readers_schema(self, readers_schema):
+    self._readers_schema = readers_schema
+  readers_schema = property(lambda self: self._readers_schema,
+                            set_readers_schema)
+  
+  def read(self, decoder):
+    if self.readers_schema is None:
+      self.readers_schema = self.writers_schema
+    return self.read_data(self.writers_schema, self.readers_schema, decoder)
+
+  def read_data(self, writers_schema, readers_schema, decoder):
+    # schema matching
+    if not DatumReader.match_schemas(writers_schema, readers_schema):
+      raise SchemaMatchException(writers_schema, readers_schema)
+
+    # schema resolution: reader's schema is a union, writer's schema is not
+    if writers_schema.type != 'union' and readers_schema.type == 'union':
+      for s in readers_schema.schemas:
+        if DatumReader.match_schemas(writers_schema, s):
+          return self.read_data(writers_schema, s, decoder)
+      raise SchemaMatchException(writers_schema, readers_schema)
+
+    # function dispatch for reading data based on type of writer's schema
+    if writers_schema.type == 'null':
+      return decoder.read_null()
+    elif writers_schema.type == 'boolean':
+      return decoder.read_boolean()
+    elif writers_schema.type == 'string':
+      return decoder.read_utf8()
+    elif writers_schema.type == 'int':
+      return decoder.read_int()
+    elif writers_schema.type == 'long':
+      return decoder.read_long()
+    elif writers_schema.type == 'float':
+      return decoder.read_float()
+    elif writers_schema.type == 'double':
+      return decoder.read_double()
+    elif writers_schema.type == 'bytes':
+      return decoder.read_bytes()
+    elif writers_schema.type == 'fixed':
+      return self.read_fixed(writers_schema, readers_schema, decoder)
+    elif writers_schema.type == 'enum':
+      return self.read_enum(writers_schema, readers_schema, decoder)
+    elif writers_schema.type == 'array':
+      return self.read_array(writers_schema, readers_schema, decoder)
+    elif writers_schema.type == 'map':
+      return self.read_map(writers_schema, readers_schema, decoder)
+    elif writers_schema.type == 'union':
+      return self.read_union(writers_schema, readers_schema, decoder)
+    elif writers_schema.type == 'record':
+      return self.read_record(writers_schema, readers_schema, decoder)
+    else:
+      fail_msg = "Cannot read unknown schema type: %s" % writers_schema.type
+      raise schema.AvroException(fail_msg)
+
+  def skip_data(self, writers_schema, decoder):
+    if writers_schema.type == 'null':
+      return decoder.skip_null()
+    elif writers_schema.type == 'boolean':
+      return decoder.skip_boolean()
+    elif writers_schema.type == 'string':
+      return decoder.skip_utf8()
+    elif writers_schema.type == 'int':
+      return decoder.skip_int()
+    elif writers_schema.type == 'long':
+      return decoder.skip_long()
+    elif writers_schema.type == 'float':
+      return decoder.skip_float()
+    elif writers_schema.type == 'double':
+      return decoder.skip_double()
+    elif writers_schema.type == 'bytes':
+      return decoder.skip_bytes()
+    elif writers_schema.type == 'fixed':
+      return self.skip_fixed(writers_schema, decoder)
+    elif writers_schema.type == 'enum':
+      return self.skip_enum(writers_schema, decoder)
+    elif writers_schema.type == 'array':
+      return self.skip_array(writers_schema, decoder)
+    elif writers_schema.type == 'map':
+      return self.skip_map(writers_schema, decoder)
+    elif writers_schema.type == 'union':
+      return self.skip_union(writers_schema, decoder)
+    elif writers_schema.type == 'record':
+      return self.skip_record(writers_schema, decoder)
+    else:
+      fail_msg = "Unknown schema type: %s" % schm.type
+      raise schema.AvroException(fail_msg)
+
+  def read_fixed(self, writers_schema, readers_schema, decoder):
+    """
+    Fixed instances are encoded using the number of bytes declared
+    in the schema.
+    """
+    return decoder.read(writers_schema.size)
+
+  def skip_fixed(self, writers_schema, decoder):
+    return decoder.skip(writers_schema.size)
+
+  def read_enum(self, writers_schema, readers_schema, decoder):
+    """
+    An enum is encoded by an int, representing the zero-based position
+    of the symbol in the schema.
+    """
+    # read data
+    index_of_symbol = decoder.read_int()
+    read_symbol = writers_schema.symbols[index_of_symbol]
+
+    # TODO(hammer): figure out what "unset" means for resolution
+    # schema resolution
+    if read_symbol not in readers_schema.symbols:
+      pass # 'unset' here
+
+    return read_symbol
+
+  def skip_enum(self, writers_schema, decoder):
+    return decoder.skip_int()
+
+  def read_array(self, writers_schema, readers_schema, decoder):
+    """
+    Arrays are encoded as a series of blocks.
+
+    Each block consists of a long count value,
+    followed by that many array items.
+    A block with count zero indicates the end of the array.
+    Each item is encoded per the array's item schema.
+
+    If a block's count is negative,
+    then the count is followed immediately by a long block size,
+    indicating the number of bytes in the block.
+    The actual count in this case
+    is the absolute value of the count written.
+    """
+    read_items = []
+    block_count = decoder.read_long()
+    while block_count != 0:
+      if block_count < 0:
+        block_count = -block_count
+        block_size = decoder.read_long()
+      for i in range(block_count):
+        read_items.append(self.read_data(writers_schema.items,
+                                         readers_schema.items, decoder))
+      block_count = decoder.read_long()
+    return read_items
+
+  def skip_array(self, writers_schema, decoder):
+    block_count = decoder.read_long()
+    while block_count != 0:
+      if block_count < 0:
+        block_size = decoder.read_long()
+        decoder.skip(block_size)
+      else:
+        for i in range(block_count):
+          self.skip_data(writers_schema.items, decoder)
+      block_count = decoder.read_long()
+
+  def read_map(self, writers_schema, readers_schema, decoder):
+    """
+    Maps are encoded as a series of blocks.
+
+    Each block consists of a long count value,
+    followed by that many key/value pairs.
+    A block with count zero indicates the end of the map.
+    Each item is encoded per the map's value schema.
+
+    If a block's count is negative,
+    then the count is followed immediately by a long block size,
+    indicating the number of bytes in the block.
+    The actual count in this case
+    is the absolute value of the count written.
+    """
+    read_items = {}
+    block_count = decoder.read_long()
+    while block_count != 0:
+      if block_count < 0:
+        block_count = -block_count
+        block_size = decoder.read_long()
+      for i in range(block_count):
+        key = decoder.read_utf8()
+        read_items[key] = self.read_data(writers_schema.values,
+                                         readers_schema.values, decoder)
+      block_count = decoder.read_long()
+    return read_items
+
+  def skip_map(self, writers_schema, decoder):
+    block_count = decoder.read_long()
+    while block_count != 0:
+      if block_count < 0:
+        block_size = decoder.read_long()
+        decoder.skip(block_size)
+      else:
+        for i in range(block_count):
+          decoder.skip_utf8()
+          self.skip_data(writers_schema.values, decoder)
+      block_count = decoder.read_long()
+
+  def read_union(self, writers_schema, readers_schema, decoder):
+    """
+    A union is encoded by first writing a long value indicating
+    the zero-based position within the union of the schema of its value.
+    The value is then encoded per the indicated schema within the union.
+    """
+    # schema resolution
+    index_of_schema = int(decoder.read_long())
+    selected_writers_schema = writers_schema.schemas[index_of_schema]
+    
+    # read data
+    return self.read_data(selected_writers_schema, readers_schema, decoder)
+
+  def skip_union(self, writers_schema, decoder):
+    index_of_schema = int(decoder.read_long())
+    return self.skip_data(writers_schema.schemas[index_of_schema], decoder)
+
+  def read_record(self, writers_schema, readers_schema, decoder):
+    """
+    A record is encoded by encoding the values of its fields
+    in the order that they are declared. In other words, a record
+    is encoded as just the concatenation of the encodings of its fields.
+    Field values are encoded per their schema.
+
+    Schema Resolution:
+     * the ordering of fields may be different: fields are matched by name.
+     * schemas for fields with the same name in both records are resolved
+       recursively.
+     * if the writer's record contains a field with a name not present in the
+       reader's record, the writer's value for that field is ignored.
+     * if the reader's record schema has a field that contains a default value,
+       and writer's schema does not have a field with the same name, then the
+       reader should use the default value from its field.
+     * if the reader's record schema has a field with no default value, and 
+       writer's schema does not have a field with the same name, then the
+       field's value is unset.
+    """
+    # schema resolution
+    readers_fields_dict = readers_schema.fields_dict
+    read_record = {}
+    for field in writers_schema.fields:
+      readers_field = readers_fields_dict.get(field.name)
+      if readers_field is not None:
+        field_val = self.read_data(field.type, readers_field.type, decoder)
+        read_record[field.name] = field_val
+      else:
+        self.skip_data(field.type, decoder)
+
+    # fill in default values
+    if len(readers_fields_dict) > len(read_record):
+      writers_fields_dict = writers_schema.fields_dict
+      for field_name, field in readers_fields_dict.items():
+        if not writers_fields_dict.has_key(field_name):
+          if field.default is not None:
+            field_val = self._read_default_value(field.type, field.default)
+            read_record[field.name] = field_val
+          else:
+            pass # 'unset' here
+    return read_record
+
+  def skip_record(self, writers_schema, decoder):
+    for field in writers_schema.fields:
+      self.skip_data(field.type, decoder)
+
+  def _read_default_value(self, field_schema, default_value):
+    """
+    Basically a JSON Decoder?
+    """
+    if field_schema.type == 'null':
+      return None
+    elif field_schema.type == 'boolean':
+      return bool(default_value)
+    elif field_schema.type == 'int':
+      return int(default_value)
+    elif field_schema.type == 'long':
+      return long(default_value)
+    elif field_schema.type in ['float', 'double']:
+      return float(default_value)
+    elif field_schema.type in ['enum', 'fixed', 'string', 'bytes']:
+      return default_value
+    elif field_schema.type == 'array':
+      read_array = []
+      for json_val in default_value:
+        item_val = self._read_default_value(field_schema.items, json_val)
+        read_array.append(item_val)
+      return read_array
+    elif field_schema.type == 'map':
+      read_map = {}
+      for key, json_val in default_value.items():
+        map_val = self._read_default_value(field_schema.values, json_val)
+        read_map[key] = map_val
+      return read_map
+    elif field_schema.type == 'union':
+      return self._read_default_value(field_schema.schemas[0], default_value)
+    elif field_schema.type == 'record':
+      read_record = {}
+      for field in field_schema.fields:
+        json_val = default_value.get(field.name)
+        if json_val is None: json_val = field.default
+        field_val = self._read_default_value(field.type, json_val)
+        read_record[field.name] = field_val
+      return read_record
+    else:
+      fail_msg = 'Unknown type: %s' % field_schema.type
+      raise schema.AvroException(fail_msg)
+
+class DatumWriter(object):
+  """DatumWriter for generic python objects."""
+  def __init__(self, writers_schema=None):
+    self._writers_schema = writers_schema
+
+  # read/write properties
+  def set_writers_schema(self, writers_schema):
+    self._writers_schema = writers_schema
+  writers_schema = property(lambda self: self._writers_schema,
+                            set_writers_schema)
+
+  def write(self, datum, encoder):
+    self.write_data(self.writers_schema, datum, encoder)
+
+  def write_data(self, writers_schema, datum, encoder):
+    # validate datum
+    if not validate(writers_schema, datum):
+      raise AvroTypeException(writers_schema, datum)
+    
+    # function dispatch to write datum
+    if writers_schema.type == 'null':
+      encoder.write_null(datum)
+    elif writers_schema.type == 'boolean':
+      encoder.write_boolean(datum)
+    elif writers_schema.type == 'string':
+      encoder.write_utf8(datum)
+    elif writers_schema.type == 'int':
+      encoder.write_int(datum)
+    elif writers_schema.type == 'long':
+      encoder.write_long(datum)
+    elif writers_schema.type == 'float':
+      encoder.write_float(datum)
+    elif writers_schema.type == 'double':
+      encoder.write_double(datum)
+    elif writers_schema.type == 'bytes':
+      encoder.write_bytes(datum)
+    elif writers_schema.type == 'fixed':
+      self.write_fixed(writers_schema, datum, encoder)
+    elif writers_schema.type == 'enum':
+      self.write_enum(writers_schema, datum, encoder)
+    elif writers_schema.type == 'array':
+      self.write_array(writers_schema, datum, encoder)
+    elif writers_schema.type == 'map':
+      self.write_map(writers_schema, datum, encoder)
+    elif writers_schema.type == 'union':
+      self.write_union(writers_schema, datum, encoder)
+    elif writers_schema.type == 'record':
+      self.write_record(writers_schema, datum, encoder)
+    else:
+      fail_msg = 'Unknown type: %s' % writers_schema.type
+      raise schema.AvroException(fail_msg)
 
+  def write_fixed(self, writers_schema, datum, encoder):
+    """
+    Fixed instances are encoded using the number of bytes declared
+    in the schema.
+    """
+    encoder.write(datum)
+
+  def write_enum(self, writers_schema, datum, encoder):
+    """
+    An enum is encoded by an int, representing the zero-based position
+    of the symbol in the schema.
+    """
+    index_of_datum = writers_schema.symbols.index(datum)
+    encoder.write_int(index_of_datum)
+
+  def write_array(self, writers_schema, datum, encoder):
+    """
+    Arrays are encoded as a series of blocks.
+
+    Each block consists of a long count value,
+    followed by that many array items.
+    A block with count zero indicates the end of the array.
+    Each item is encoded per the array's item schema.
+
+    If a block's count is negative,
+    then the count is followed immediately by a long block size,
+    indicating the number of bytes in the block.
+    The actual count in this case
+    is the absolute value of the count written.
+    """
+    if len(datum) > 0:
+      encoder.write_long(len(datum))
+      for item in datum:
+        self.write_data(writers_schema.items, item, encoder)
+    encoder.write_long(0)
+
+  def write_map(self, writers_schema, datum, encoder):
+    """
+    Maps are encoded as a series of blocks.
+
+    Each block consists of a long count value,
+    followed by that many key/value pairs.
+    A block with count zero indicates the end of the map.
+    Each item is encoded per the map's value schema.
+
+    If a block's count is negative,
+    then the count is followed immediately by a long block size,
+    indicating the number of bytes in the block.
+    The actual count in this case
+    is the absolute value of the count written.
+    """
+    if len(datum) > 0:
+      encoder.write_long(len(datum))
+      for key, val in datum.items():
+        encoder.write_utf8(key)
+        self.write_data(writers_schema.values, val, encoder)
+    encoder.write_long(0)
+
+  def write_union(self, writers_schema, datum, encoder):
+    """
+    A union is encoded by first writing a long value indicating
+    the zero-based position within the union of the schema of its value.
+    The value is then encoded per the indicated schema within the union.
+    """
+    # resolve union
+    index_of_schema = -1
+    for i, candidate_schema in enumerate(writers_schema.schemas):
+      if validate(candidate_schema, datum):
+        index_of_schema = i
+    if index_of_schema < 0: raise AvroTypeException(writers_schema, datum)
+
+    # write data
+    encoder.write_long(index_of_schema)
+    self.write_data(writers_schema.schemas[index_of_schema], datum, encoder)
+
+  def write_record(self, writers_schema, datum, encoder):
+    """
+    A record is encoded by encoding the values of its fields
+    in the order that they are declared. In other words, a record
+    is encoded as just the concatenation of the encodings of its fields.
+    Field values are encoded per their schema.
+    """
+    for field in writers_schema.fields:
+      self.write_data(field.type, datum.get(field.name), encoder)

Modified: hadoop/avro/trunk/src/py/avro/schema.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/schema.py?rev=895732&r1=895731&r2=895732&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/schema.py (original)
+++ hadoop/avro/trunk/src/py/avro/schema.py Mon Jan  4 18:09:42 2010
@@ -13,12 +13,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 """
 Contains the Schema classes.
 
 A schema may be one of:
-  An record, mapping field names to field value data;
+  A record, mapping field names to field value data;
+  An error, equivalent to a record;
   An enum, containing one of a small set of symbols;
   An array of values, all of the same schema;
   A map containing string/value pairs, each of a declared schema;
@@ -33,499 +33,553 @@
   A boolean; or
   Null.
 """
-
-import cStringIO
-# Use simplejson or Python 2.6 json, prefer simplejson.
 try:
   import simplejson as json
 except ImportError:
   import json
 
-import odict
+#
+# Constants
+#
+
+PRIMITIVE_TYPES = (
+  'null',
+  'boolean',
+  'string',
+  'bytes',
+  'int',
+  'long',
+  'float',
+  'double',
+)
+
+NAMED_TYPES = (
+  'fixed',
+  'enum',
+  'record',
+  'error',
+)
+
+VALID_TYPES = PRIMITIVE_TYPES + NAMED_TYPES + (
+  'array',
+  'map',
+  'union',
+)
+
+RESERVED_PROPS = (
+  'type',
+  'name',
+  'namespace',
+  'fields',     # Record
+  'items',      # Array
+  'size',       # Fixed
+  'symbols',    # Enum
+  'values',     # Map
+)
+
+VALID_FIELD_SORT_ORDERS = (
+  'ascending',
+  'descending',
+  'ignore',
+)
+
+#
+# Exceptions
+#
 
-# The schema types
-STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN, NULL, \
-ARRAY, MAP, UNION, FIXED, RECORD, ENUM = range(14)
+class AvroException(Exception):
+  pass
+
+class SchemaParseException(AvroException):
+  pass
+
+#
+# Base Classes
+#
 
 class Schema(object):
   """Base class for all Schema classes."""
-
   def __init__(self, type):
-    self.__type = type
-
-  def gettype(self):
-    return self.__type
-
-  def __eq__(self, other, seen=None):
-    if self is other:
-      return True
-    return isinstance(other, Schema) and self.__type == other.__type
-
-  def __hash__(self, seen=None):
-    return self.__type.__hash__()
-
-class _StringSchema(Schema):
-  def __init__(self):
-    Schema.__init__(self, STRING)
-
-  def str(self, names):
-    return '"string"'
-
-class _BytesSchema(Schema):
-  def __init__(self):
-    Schema.__init__(self, BYTES)
-
-  def str(self, names):
-    return '"bytes"'
-
-class _IntSchema(Schema):
-  def __init__(self):
-    Schema.__init__(self, INT)
-
-  def str(self, names):
-    return '"int"'
-
-class _LongSchema(Schema):
-  def __init__(self):
-    Schema.__init__(self, LONG)
-
-  def str(self, names):
-    return '"long"'
-
-class _FloatSchema(Schema):
-  def __init__(self):
-    Schema.__init__(self, FLOAT)
+    # Ensure valid ctor args
+    if not isinstance(type, basestring):
+      fail_msg = 'Schema type must be a string.'
+      raise SchemaParseException(fail_msg)
+    elif type not in VALID_TYPES:
+      fail_msg = '%s is not a valid type.' % type
+      raise SchemaParseException(fail_msg)
+
+    # add members
+    if not hasattr(self, '_props'): self._props = {}
+    self.set_prop('type', type)
+
+  # read-only properties
+  props = property(lambda self: self._props)
+  type = property(lambda self: self.get_prop('type'))
+
+  # utility functions to manipulate properties dict
+  def get_prop(self, key):
+    return self.props.get(key)
+  def set_prop(self, key, value):
+    self.props[key] = value
+
+
+class Name(object):
+  """Container class for static methods on Avro names."""
+  @staticmethod
+  def make_fullname(name, namespace):
+    if name.find('.') < 0 and namespace is not None:
+      return '.'.join([namespace, name])
+    else:
+      return name
 
-  def str(self, names):
-    return '"float"'
+  @staticmethod
+  def extract_namespace(name, namespace):
+    parts = name.rsplit('.', 1)
+    if len(parts) > 1:
+      namespace, name = parts
+    return name, namespace
+
+  @staticmethod
+  def add_name(names, new_schema):
+    """Add a new schema object to the names dictionary (in place)."""
+    new_fullname = new_schema.fullname
+    if new_fullname in VALID_TYPES:
+      fail_msg = '%s is a reserved type name.' % new_fullname
+      raise SchemaParseException(fail_msg)
+    elif names is not None and names.has_key(new_fullname):
+      fail_msg = 'The name "%s" is already in use.' % new_fullname
+      raise SchemaParseException(fail_msg)
+    elif names is None:
+      names = {}
 
-class _DoubleSchema(Schema):
-  def __init__(self):
-    Schema.__init__(self, DOUBLE)
+    names[new_fullname] = new_schema
+    return names
 
-  def str(self, names):
-    return '"double"'
+class NamedSchema(Schema):
+  """Named Schemas specified in NAMED_TYPES."""
+  def __init__(self, type, name, namespace=None, names=None):
+    # Ensure valid ctor args
+    if not name:
+      fail_msg = 'Named Schemas must have a non-empty name.'
+      raise SchemaParseException(fail_msg)
+    elif not isinstance(name, basestring):
+      fail_msg = 'The name property must be a string.'
+      raise SchemaParseException(fail_msg)
+    elif namespace is not None and not isinstance(namespace, basestring):
+      fail_msg = 'The namespace property must be a string.'
+      raise SchemaParseException(fail_msg)
 
-class _BooleanSchema(Schema):
-  def __init__(self):
-    Schema.__init__(self, BOOLEAN)
+    # Call parent ctor
+    Schema.__init__(self, type)
 
-  def str(self, names):
-    return '"boolean"'
+    # Add class members
+    name, namespace = Name.extract_namespace(name, namespace)
+    self.set_prop('name', name)
+    if namespace is not None: self.set_prop('namespace', namespace)
+
+    # Add name to names dictionary
+    names = Name.add_name(names, self)
+
+  # read-only properties
+  name = property(lambda self: self.get_prop('name'))
+  namespace = property(lambda self: self.get_prop('namespace'))
+  fullname = property(lambda self: 
+                      Name.make_fullname(self.name, self.namespace))
 
-class _NullSchema(Schema):
-  def __init__(self):
-    Schema.__init__(self, NULL)
+class Field(object):
+  def __init__(self, type, name, default=None, order=None, names=None):
+    self._props = {}
+    self._type_from_names = False
+
+    # Ensure valid ctor args
+    if not name:
+      fail_msg = 'Fields must have a non-empty name.'
+      raise SchemaParseException(fail_msg)
+    elif not isinstance(name, basestring):
+      fail_msg = 'The name property must be a string.'
+      raise SchemaParseException(fail_msg)
+    elif order is not None and order not in VALID_FIELD_SORT_ORDERS:
+      fail_msg = 'The order property %s is not valid.' % order
+      raise SchemaParseException(fail_msg)
+
+    # add members
+    if (isinstance(type, basestring) and names is not None
+        and names.has_key(type)):
+      type_schema = names[type]
+      self._type_from_names = True
+    else:
+      try:
+        type_schema = make_avsc_object(type, names)
+      except:
+        fail_msg = 'Type property not a valid Avro schema.'
+        raise SchemaParseException(fail_msg)
+    self.set_prop('type', type_schema)
+    self.set_prop('name', name)
+    # TODO(hammer): check to ensure default is valid
+    if default is not None: self.set_prop('default', default)
+    if order is not None: self.set_prop('order', order)
+
+  # read-only properties
+  type = property(lambda self: self.get_prop('type'))
+  name = property(lambda self: self.get_prop('name'))
+  default = property(lambda self: self.get_prop('default'))
+  order = property(lambda self: self.get_prop('order'))
+  props = property(lambda self: self._props)
+  type_from_names = property(lambda self: self._type_from_names)
+
+  # utility functions to manipulate properties dict
+  def get_prop(self, key):
+    return self.props.get(key)
+  def set_prop(self, key, value):
+    self.props[key] = value
+
+  def __str__(self):
+    to_dump = self.props.copy()
+    if self.type_from_names:
+      to_dump['type'] = self.type.fullname
+    else:
+      to_dump['type'] = json.loads(str(to_dump['type']))
+    return json.dumps(to_dump)
 
-  def str(self, names):
-    return '"null"'
+  def __eq__(self, that):
+    to_cmp = json.loads(str(self))
+    return to_cmp == json.loads(str(that))
+
+#
+# Primitive Types
+#
+class PrimitiveSchema(Schema):
+  """Valid primitive types are in PRIMITIVE_TYPES."""
+  def __init__(self, type):
+    # Ensure valid ctor args
+    if type not in PRIMITIVE_TYPES:
+      raise AvroException("%s is not a valid primitive type." % type)
 
-class NamedSchema(Schema):
-  """Named Schemas include Record, Enum, and Fixed."""
-  def __init__(self, type, name, space):
+    # Call parent ctor
     Schema.__init__(self, type)
-    self.__name = name
-    self.__space = space
-
-  def getname(self):
-    return self.__name
 
-  def getspace(self):
-    return self.__space
+  def __str__(self):
+    # if there are no arbitrary properties, use short form
+    if len(self.props) == 1:
+      return '"%s"' % self.type
+    else:
+      return json.dumps(self.props)
 
-  def equalnames(self, other):
-    if other is None:
-      return False
-    if (self.__name == other.__name and self.__space == other.__space):
-      return True
-    return False
-
-  def namestring(self):
-    str = cStringIO.StringIO()
-    str.write('"name": "' + self.__name + '", ')
-    if self.__space is not None:
-      str.write('"namespace": "' + self.__space + '", ')
-    return str.getvalue()
-
-  def __hash__(self, seen=None):
-    hash = self.gettype().__hash__()
-    hash += self.__name.__hash__()
-    if self.__space is not None:
-      hash += self.__space.__hash__()
-    return hash
+  def __eq__(self, that):
+    return self.props == that.props
 
-class Field(object):
-  def __init__(self, name, schema, defaultvalue=None):
-    self.__name = name
-    self.__schema = schema
-    self.__defaultvalue = defaultvalue
-
-  def getname(self):
-    return self.__name
-
-  def getschema(self):
-    return self.__schema
-
-  def getdefaultvalue(self):
-    return self.__defaultvalue
-
-  def __eq__(self, other, seen={}):
-    return (self.__name == other.__name and
-            self.__schema.__eq__(other.__schema, seen) and 
-            self.__defaultvalue == other.__defaultvalue)
-
-class _RecordSchema(NamedSchema):
-  def __init__(self, fields, name=None, space=None, iserror=False):
-    NamedSchema.__init__(self, RECORD, name, space)
-    self.__fields = fields
-    self.__iserror = iserror
-
-  def getfields(self):
-    return self.__fields
-
-  def iserror(self):
-    return self.__iserror
-
-  def str(self, names):
-    if names.get(self.getname()) is self:
-      return '"%s"' % self.getname()
-    elif self.getname() is not None:
-      names[self.getname()] = self
-    str = cStringIO.StringIO()
-    str.write('{"type": "')
-    if self.iserror():
-      str.write("error")
+#
+# Complex Types (non-recursive)
+#
+
+class FixedSchema(NamedSchema):
+  def __init__(self, name, namespace, size, names=None):
+    # Ensure valid ctor args
+    if not isinstance(size, int):
+      fail_msg = 'Fixed Schema requires a valid integer for the size property.'
+      raise AvroException(fail_msg)
+
+    # Call parent ctor
+    NamedSchema.__init__(self, 'fixed', name, namespace, names)
+
+    # Add class members
+    self.set_prop('size', size)
+
+  # read-only properties
+  size = property(lambda self: self.get_prop('size'))
+
+  def __str__(self):
+    return json.dumps(self.props)
+
+  def __eq__(self, that):
+    return self.props == that.props
+
+class EnumSchema(NamedSchema):
+  def __init__(self, name, namespace, symbols, names=None):
+    # Ensure valid ctor args
+    if not isinstance(symbols, list):
+      fail_msg = 'Enum Schema requires a JSON array for the symbols property.'
+      raise AvroException(fail_msg)
+    elif False in [isinstance(s, basestring) for s in symbols]:
+      fail_msg = 'Enum Schema requires all symbols to be JSON strings.'
+      raise AvroException(fail_msg)
+
+    # Call parent ctor
+    NamedSchema.__init__(self, 'enum', name, namespace, names)
+
+    # Add class members
+    self.set_prop('symbols', symbols)
+
+  # read-only properties
+  symbols = property(lambda self: self.get_prop('symbols'))
+
+  def __str__(self):
+    return json.dumps(self.props)
+
+  def __eq__(self, that):
+    return self.props == that.props
+
+#
+# Complex Types (recursive)
+#
+
+class ArraySchema(Schema):
+  def __init__(self, items, names=None):
+    # initialize private class members
+    self._items_schema_from_names = False
+
+    # Call parent ctor
+    Schema.__init__(self, 'array')
+
+    # Add class members
+    if isinstance(items, basestring) and names is not None and names.has_key(items):
+      items_schema = names[items]
+      self._items_schema_from_names = True
     else:
-      str.write("record")
-    str.write('", ')
-    str.write(self.namestring())
-    str.write('"fields": [')
-    count = 0
-    for field in self.__fields.values():
-      str.write('{"name": "')
-      str.write(field.getname())
-      str.write('", "type": ')
-      str.write(field.getschema().str(names))
-      if field.getdefaultvalue() is not None:
-        str.write(', "default": ')
-        str.write(repr(field.getdefaultvalue()))
-      str.write('}')
-      count += 1
-      if count < len(self.__fields):
-        str.write(',')
-    str.write(']}')
-    return str.getvalue()
-
-  def __eq__(self, other, seen={}):
-    if self is other or seen.get(id(self)) is other:
-      return True
-    if isinstance(other, _RecordSchema) and self.equalnames(other):
-      size = len(self.__fields)
-      if len(other.__fields) != size:
-        return False
-      seen[id(self)] = other
-      for field in self.__fields.values():
-        if not field.__eq__(other.__fields.get(field.getname()), seen):
-          return False
-      return True
+      try:
+        items_schema = make_avsc_object(items, names)
+      except:
+        fail_msg = 'Items schema not a valid Avro schema.'
+        raise SchemaParseException(fail_msg)
+
+    self.set_prop('items', items_schema)
+
+  # read-only properties
+  items = property(lambda self: self.get_prop('items'))
+  items_schema_from_names = property(lambda self: self._items_schema_from_names)
+
+  def __str__(self):
+    to_dump = self.props.copy()
+    if self.items_schema_from_names:
+      to_dump['items'] = self.get_prop('items').fullname
     else:
-      return False
+      to_dump['items'] = json.loads(str(to_dump['items']))
+    return json.dumps(to_dump)
 
-  def __hash__(self, seen=set()):
-    if seen.__contains__(id(self)):
-      return 0
-    seen.add(id(self))
-    hash = NamedSchema.__hash__(self, seen)
-    for field in self.__fields.values():
-      hash = hash + field.getschema().__hash__(seen)
-    return hash
-
-class _ArraySchema(Schema):
-  def __init__(self, elemtype):
-    Schema.__init__(self, ARRAY)
-    self.__elemtype = elemtype
-
-  def getelementtype(self):
-    return self.__elemtype
-
-  def str(self, names):
-    str = cStringIO.StringIO()
-    str.write('{"type": "array", "items": ')
-    str.write(self.__elemtype.str(names))
-    str.write("}")
-    return str.getvalue()
-
-  def __eq__(self, other, seen={}):
-    if self is other or seen.get(id(self)) is other:
-      return True
-    seen[id(self)]= other
-    return (isinstance(other, _ArraySchema) and 
-            self.__elemtype.__eq__(other.__elemtype, seen))
-
-  def __hash__(self, seen=set()):
-    if seen.__contains__(id(self)):
-      return 0
-    seen.add(id(self))
-    return self.gettype().__hash__() + self.__elemtype.__hash__(seen)
-
-class _MapSchema(Schema):
-  def __init__(self, valuetype):
-    Schema.__init__(self, MAP)
-    self.__vtype = valuetype
-
-  def getvaluetype(self):
-    return self.__vtype
-
-  def str(self, names):
-    str = cStringIO.StringIO()
-    str.write('{"type": "map", "values": ')
-    str.write(self.__vtype.str(names));
-    str.write("}")
-    return str.getvalue()
-
-  def __eq__(self, other, seen={}):
-    if self is other or seen.get(id(self)) is other:
-      return True
-    seen[id(self)] = other
-    return (isinstance(other, _MapSchema) and
-            self.__vtype.__eq__(other.__vtype), seen)
-
-  def __hash__(self, seen=set()):
-    if seen.__contains__(id(self)):
-      return 0
-    seen.add(id(self))
-    return (self.gettype().__hash__() +
-            self.__vtype.__hash__(seen))
-
-class _UnionSchema(Schema):
-  def __init__(self, elemtypes):
-    Schema.__init__(self, UNION)
-    self.__elemtypes = elemtypes
-
-  def getelementtypes(self):
-    return self.__elemtypes
-
-  def str(self, names):
-    str = cStringIO.StringIO()
-    str.write("[")
-    count = 0
-    for elemtype in self.__elemtypes:
-      str.write(elemtype.str(names))
-      count += 1
-      if count < len(self.__elemtypes):
-        str.write(",")
-    str.write("]")
-    return str.getvalue()
-
-  def __eq__(self, other, seen={}):
-    if self is other or seen.get(id(self)) is other:
-      return True
-    seen[id(self)]= other
-    if isinstance(other, _UnionSchema):
-      size = len(self.__elemtypes)
-      if len(other.__elemtypes) != size:
-        return False
-      for i in range(0, size):
-        if not self.__elemtypes[i].__eq__(other.__elemtypes[i], seen):
-          return False
-      return True
+  def __eq__(self, that):
+    to_cmp = json.loads(str(self))
+    return to_cmp == json.loads(str(that))
+
+class MapSchema(Schema):
+  def __init__(self, values, names=None):
+    # initialize private class members
+    self._values_schema_from_names = False
+
+    # Call parent ctor
+    Schema.__init__(self, 'map')
+
+    # Add class members
+    if isinstance(values, basestring) and names is not None and names.has_key(values):
+      values_schema = names[values]
+      self._values_schema_from_names = True
     else:
-      return False
-
-  def __hash__(self, seen=set()):
-    if seen.__contains__(id(self)):
-      return 0
-    seen.add(id(self))
-    hash = self.gettype().__hash__() 
-    for elem in self.__elemtypes:
-      hash = hash + elem.__hash__(seen)
-    return hash
-
-class _EnumSchema(NamedSchema):
-  def __init__(self, name, space, symbols):
-    NamedSchema.__init__(self, ENUM, name, space)
-    self.__symbols = symbols
-    self.__ordinals = dict()
-    for i, symbol in enumerate(symbols):
-      self.__ordinals[symbol] = i
-
-  def getenumsymbols(self):
-    return self.__symbols
-
-  def getenumordinal(self, symbol):
-    return self.__ordinals.get(symbol)
-
-  def str(self, names):
-    if names.get(self.getname()) is self:
-      return '"%s"' % self.getname()
-    elif self.getname() is not None:
-      names[self.getname()] = self
-    str = cStringIO.StringIO()
-    str.write('{"type": "enum", ')
-    str.write(self.namestring())
-    str.write('"symbols": [')
-    count = 0
-    for symbol in self.__symbols:
-      str.write('"%s"' % symbol)
-      count += 1
-      if count < len(self.__symbols):
-        str.write(',')
-    str.write(']}')
-    return str.getvalue()
-
-  def __eq__(self, other, seen={}):
-    if self is other or seen.get(id(self)) is other:
-      return True
-    if isinstance(other, _EnumSchema) and self.equalnames(other):
-      size = len(self.__symbols)
-      if len(other.__symbols) != size:
-        return False
-      seen[id(self)] = other
-      for i in range(0, size):
-        if not self.__symbols[i].__eq__(other.__symbols[i]):
-          return False
-      return True
+      try:
+        values_schema = make_avsc_object(values, names)
+      except:
+        fail_msg = 'Values schema not a valid Avro schema.'
+        raise SchemaParseException(fail_msg)
+
+    self.set_prop('values', values_schema)
+
+  # read-only properties
+  values = property(lambda self: self.get_prop('values'))
+  values_schema_from_names = property(lambda self:
+                                      self._values_schema_from_names)
+
+  def __str__(self):
+    to_dump = self.props.copy()
+    if self.values_schema_from_names:
+      to_dump['values'] = self.get_prop('values').fullname
     else:
-      return False
+      to_dump['values'] = json.loads(str(to_dump['values']))
+    return json.dumps(to_dump)
 
-  def __hash__(self, seen=set()):
-    if seen.__contains__(id(self)):
-      return 0
-    seen.add(id(self))
-    hash = NamedSchema.__hash__(self, seen)
-    for symbol in self.__symbols:
-      hash += symbol.__hash__()
-    return hash
-
-class _FixedSchema(NamedSchema):
-  def __init__(self, name, space, size):
-    NamedSchema.__init__(self, FIXED, name, space)
-    self.__size = size
-
-  def getsize(self):
-    return self.__size
-
-  def str(self, names):
-    if names.get(self.getname()) is self:
-      return '"%s"' % self.getname()
-    elif self.getname() is not None:
-      names[self.getname()] = self
-    str = cStringIO.StringIO()
-    str.write('{"type": "fixed", ')
-    str.write(self.namestring())
-    str.write('"size": ' + repr(self.__size) + '}')
-    return str.getvalue()
-
-  def __eq__(self, other, seen=None):
-    if self is other:
-      return True
-    if (isinstance(other, _FixedSchema) and self.equalnames(other) 
-        and self.__size == other.__size):
-      return True
-    return False
-
-  def __hash__(self, seen=None):
-    return NamedSchema.__hash__(self, seen) + self.__size.__hash__()
-
-_PRIMITIVES = {'string':_StringSchema(),
-        'bytes':_BytesSchema(),
-        'int':_IntSchema(),
-        'long':_LongSchema(),
-        'float':_FloatSchema(),
-        'double':_DoubleSchema(),
-        'boolean':_BooleanSchema(),
-        'null':_NullSchema()}    
-
-class _Names(odict.OrderedDict):
-  def __init__(self, names=_PRIMITIVES):
-    odict.OrderedDict.__init__(self)
-    self.__defaults = names
-
-  def get(self, key):
-    val = odict.OrderedDict.get(self, key)
-    if val is None:
-      val = self.__defaults.get(key)
-    return val
-
-  def __setitem__(self, key, val):
-    if odict.OrderedDict.get(self, key) is not None:
-      raise SchemaParseException("Can't redefine: " + key.__str__())
-    odict.OrderedDict.__setitem__(self, key, val)
-
-class AvroException(Exception):
-  pass
-
-class SchemaParseException(AvroException):
-  pass
-
-def _parse(obj, names):
-  if isinstance(obj, basestring):
-    schema = names.get(obj)
-    if schema is not None:
-      return schema
-    else:
-      raise SchemaParseException("Undefined name: " + obj.__str__())
-  elif isinstance(obj, dict):
-    type = obj.get("type")
-    if type is None:
-      raise SchemaParseException("No type: " + obj.__str__())
-    if (type == "record" or type == "error" or 
-        type == "enum" or type == "fixed"):
-      name = obj.get("name")
-      space = obj.get("namespace")
-      if name is None:
-        raise SchemaParseException("No name in schema: " + obj.__str__())
-      if type == "record" or type == "error":
-        fields = odict.OrderedDict()
-        schema = _RecordSchema(fields, name, space, type == "error")
-        names[name] = schema
-        fieldsnode = obj.get("fields")
-        if fieldsnode is None:
-          raise SchemaParseException("Record has no fields: " + obj.__str__())
-        for field in fieldsnode:
-          fieldname = field.get("name")
-          if fieldname is None:
-            raise SchemaParseException("No field name: " + field.__str__())
-          fieldtype = field.get("type")
-          if fieldtype is None:
-            raise SchemaParseException("No field type: " + field.__str__())
-          defaultval = field.get("default")
-          fields[fieldname] = Field(fieldname, _parse(fieldtype, names), 
-                                    defaultval)
-        return schema
-      elif type == "enum":
-        symbolsnode = obj.get("symbols")
-        if symbolsnode == None or not isinstance(symbolsnode, list):
-          raise SchemaParseException("Enum has no symbols: " + obj.__str__())
-        symbols = list()
-        for symbol in symbolsnode:
-          symbols.append(symbol)
-        schema = _EnumSchema(name, space, symbols)
-        names[name] = schema
-        return schema
-      elif type == "fixed":
-        schema = _FixedSchema(name, space, obj.get("size"))
-        names[name] = schema
-        return schema
-    elif type == "array":
-      return _ArraySchema(_parse(obj.get("items"), names))
-    elif type == "map":
-      return _MapSchema(_parse(obj.get("values"), names))
+  def __eq__(self, that):
+    to_cmp = json.loads(str(self))
+    return to_cmp == json.loads(str(that))
+
+class UnionSchema(Schema):
+  """
+  names is a dictionary mapping schema fullnames to schema objects.
+  """
+  def __init__(self, schemas, names=None):
+    # Ensure valid ctor args
+    if not isinstance(schemas, list):
+      fail_msg = 'Union schema requires a list of schemas.'
+      raise SchemaParseException(fail_msg)
+
+    # Call parent ctor
+    Schema.__init__(self, 'union')
+
+    # Add class members
+    schema_objects = []
+    self._schema_from_names_indices = []
+    for i, schema in enumerate(schemas):
+      from_names = False
+      if isinstance(schema, basestring) and names is not None and names.has_key(schema):
+        new_schema = names[schema]
+        from_names = True
+      else:
+        try:
+          new_schema = make_avsc_object(schema, names)
+        except:
+          raise SchemaParseException('Union item must be a valid Avro schema.')
+      # check the new schema
+      if (new_schema.type in VALID_TYPES and new_schema.type not in NAMED_TYPES
+          and new_schema.type in [schema.type for schema in schema_objects]):
+        raise SchemaParseException('%s type already in Union' % new_schema.type)
+      elif new_schema.type == 'union':
+        raise SchemaParseException('Unions cannot contain other unions.')
+      else:
+        schema_objects.append(new_schema)
+        if from_names: self._schema_from_names_indices.append(i)
+    self._schemas = schema_objects
+
+  # read-only properties
+  schemas = property(lambda self: self._schemas)
+  schema_from_names_indices = property(lambda self:
+                                       self._schema_from_names_indices)
+
+  def __str__(self):
+    to_dump = []
+    for i, schema in enumerate(self.schemas):
+      if i in self.schema_from_names_indices:
+        to_dump.append(schema.fullname)
+      else:
+        to_dump.append(json.loads(str(schema)))
+    return json.dumps(to_dump)
+
+  def __eq__(self, that):
+    to_cmp = json.loads(str(self))
+    return to_cmp == json.loads(str(that))
+
+class RecordSchema(NamedSchema):
+  @staticmethod
+  def make_field_objects(field_data, names):
+    """We're going to need to make message parameters too."""
+    field_objects = []
+    field_names = []
+    for i, field in enumerate(field_data):
+      if hasattr(field, 'get') and callable(field.get):
+        type = field.get('type')
+        name = field.get('name')
+        default = field.get('default')
+        order = field.get('order')
+        new_field = Field(type, name, default, order, names)
+        # make sure field name has not been used yet
+        if new_field.name in field_names:
+          fail_msg = 'Field name %s already in use.' % new_field.name
+          raise SchemaParseException(fail_msg)
+        field_names.append(new_field.name)
+      else:
+        raise SchemaParseException('Not a valid field: %s' % field)
+      field_objects.append(new_field)
+    return field_objects
+
+  def __init__(self, name, namespace, fields, names=None, schema_type='record'):
+    # Ensure valid ctor args
+    if fields is None:
+      fail_msg = 'Record schema requires a non-empty fields property.'
+      raise SchemaParseException(fail_msg)
+    elif not isinstance(fields, list):
+      fail_msg = 'Fields property must be a list of Avro schemas.'
+      raise SchemaParseException(fail_msg)
+
+    # Call parent ctor (adds own name to namespace, too)
+    NamedSchema.__init__(self, schema_type, name, namespace, names)
+
+    # Add class members
+    field_objects = RecordSchema.make_field_objects(fields, names)
+    self.set_prop('fields', field_objects)
+
+  # read-only properties
+  fields = property(lambda self: self.get_prop('fields'))
+
+  @property
+  def fields_dict(self):
+    fields_dict = {}
+    for field in self.fields:
+      fields_dict[field.name] = field
+    return fields_dict
+
+  def __str__(self):
+    to_dump = self.props.copy()
+    new_fields = []
+    for field in to_dump['fields']:
+      new_fields.append(json.loads(str(field)))
+    to_dump['fields'] = new_fields
+    return json.dumps(to_dump)
+
+  def __eq__(self, that):
+    to_cmp = json.loads(str(self))
+    return to_cmp == json.loads(str(that))
+
+#
+# Module Methods
+#
+
+# TODO(hammer): handle non-reserved properties
+def make_avsc_object(json_data, names=None):
+  """
+  Build Avro Schema from data parsed out of JSON string.
+
+  @arg names: dict of schema name, object pairs
+  """
+  # JSON object (non-union)
+  if hasattr(json_data, 'get') and callable(json_data.get):
+    type = json_data.get('type')
+    if type in PRIMITIVE_TYPES:
+      return PrimitiveSchema(type)
+    elif type in NAMED_TYPES:
+      name = json_data.get('name')
+      namespace = json_data.get('namespace')
+      if type == 'fixed':
+        size = json_data.get('size')
+        return FixedSchema(name, namespace, size, names)
+      elif type == 'enum':
+        symbols = json_data.get('symbols')
+        return EnumSchema(name, namespace, symbols, names)
+      elif type in ['record', 'error']:
+        fields = json_data.get('fields')
+        return RecordSchema(name, namespace, fields, names, type)
+      else:
+        raise SchemaParseException('Unknown Named Type: %s' % type)
+    elif type in VALID_TYPES:
+      if type == 'array':
+        items = json_data.get('items')
+        return ArraySchema(items, names)
+      elif type == 'map':
+        values = json_data.get('values')
+        return MapSchema(values, names)
+      else:
+        raise SchemaParseException('Unknown Valid Type: %s' % type)
+    elif type is None:
+      raise SchemaParseException('No "type" property: %s' % json_data)
     else:
-      raise SchemaParseException("Type not yet supported: " + type.__str__())
-  elif isinstance(obj, list):
-    elemtypes = list()
-    for elemtype in obj:
-      elemtypes.append(_parse(elemtype, names))
-    return _UnionSchema(elemtypes)
+      raise SchemaParseException('Undefined type: %s' % type)
+  # JSON array (union)
+  elif isinstance(json_data, list):
+    return UnionSchema(json_data, names)
+  # JSON string (primitive)
+  elif json_data in PRIMITIVE_TYPES:
+    return PrimitiveSchema(json_data)
+  # not for us!
   else:
-    raise SchemaParseException("Schema not yet supported:" + obj.__str__())
-
-def stringval(schm):
-  """Returns the string representation of the schema instance."""
-  return schm.str(_Names())
+    fail_msg = "Could not make an Avro Schema object from %s." % json_data
+    raise SchemaParseException(fail_msg)
 
+# TODO(hammer): make method for reading from a file?
 def parse(json_string):
-  """Constructs the Schema from the json text."""
-  dict = json.loads(json_string)
-  return _parse(dict, _Names())
+  """Constructs the Schema from the JSON text."""
+  # TODO(hammer): preserve stack trace from JSON parse
+  # parse the JSON
+  try:
+    json_data = json.loads(json_string)
+  except:
+    raise SchemaParseException('Error parsing JSON: %s' % json_string)
+
+  # Initialize the names dictionary
+  names = {}
+
+  # construct the Avro Schema object
+  return make_avsc_object(json_data, names)
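
A short illustration of the parse() entry point defined above, using only
names visible in this file (a sketch; the exact JSON layout printed by the
__str__ methods depends on json.dumps):

    from avro import schema

    # A record schema: parse() seeds the names dictionary, so named types
    # defined here can be referenced by fullname later in the document.
    s = schema.parse("""
      {"type": "record", "name": "Point", "fields": [
        {"name": "x", "type": "long"},
        {"name": "y", "type": "long"}]}
      """)
    print s.type                        # record
    print s.name                        # Point
    print [f.name for f in s.fields]    # ['x', 'y']

    # Names may embed a namespace; Name.extract_namespace splits it out.
    f = schema.parse('{"type": "fixed", "name": "org.apache.MD5", "size": 16}')
    print f.name, f.namespace, f.fullname   # MD5 org.apache org.apache.MD5

    # Unions parse from JSON arrays; branch schemas are kept in order.
    u = schema.parse('["null", "string"]')
    print [branch.type for branch in u.schemas]  # ['null', 'string']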

Modified: hadoop/avro/trunk/src/rat-excludes.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/rat-excludes.txt?rev=895732&r1=895731&r2=895732&view=diff
==============================================================================
--- hadoop/avro/trunk/src/rat-excludes.txt (original)
+++ hadoop/avro/trunk/src/rat-excludes.txt Mon Jan  4 18:09:42 2010
@@ -13,7 +13,6 @@
 **/Makefile**
 **/configure**
 doc/**
-lib/py/odict.py
 lib/py/simplejson/**
 src/c++/Doxyfile
 src/c++/jsonschemas/**


