avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r882015 - in /hadoop/avro/trunk: CHANGES.txt src/py/avro/datafile.py src/py/avro/io.py src/test/py/testio.py
Date Thu, 19 Nov 2009 00:45:26 GMT
Author: cutting
Date: Thu Nov 19 00:45:26 2009
New Revision: 882015

URL: http://svn.apache.org/viewvc?rev=882015&view=rev
Log:
AVRO-201.  Move Python data file code into its own module.  Contributed by Jeff Hammerbacher.

Added:
    hadoop/avro/trunk/src/py/avro/datafile.py
      - copied, changed from r882014, hadoop/avro/trunk/src/py/avro/io.py
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/src/py/avro/io.py
    hadoop/avro/trunk/src/test/py/testio.py

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=882015&r1=882014&r2=882015&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Nov 19 00:45:26 2009
@@ -8,6 +8,9 @@
     This reverses the inheritance of most classes in the specific and
     reflect packages. (cutting)
 
+    AVRO-201.  Move Python data file code into its own module.
+    (Jeff Hammerbacher via cutting)
+
   NEW FEATURES
 
     AVRO-151. Validating Avro schema parser for C (massie)

Copied: hadoop/avro/trunk/src/py/avro/datafile.py (from r882014, hadoop/avro/trunk/src/py/avro/io.py)
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/datafile.py?p2=hadoop/avro/trunk/src/py/avro/datafile.py&p1=hadoop/avro/trunk/src/py/avro/io.py&r1=882014&r2=882015&rev=882015&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/io.py (original)
+++ hadoop/avro/trunk/src/py/avro/datafile.py Thu Nov 19 00:45:26 2009
@@ -1,201 +1,24 @@
-#Licensed to the Apache Software Foundation (ASF) under one
-#or more contributor license agreements.  See the NOTICE file
-#distributed with this work for additional information
-#regarding copyright ownership.  The ASF licenses this file
-#to you under the Apache License, Version 2.0 (the
-#"License"); you may not use this file except in compliance
-#with the License.  You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-#http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
 #
-#Unless required by applicable law or agreed to in writing, software
-#distributed under the License is distributed on an "AS IS" BASIS,
-#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#See the License for the specific language governing permissions and
-#limitations under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 
-"""Input/Output utilities."""
+"""Read/Write Avro Data Files"""
 
 import struct, uuid, cStringIO
 import avro.schema as schema
-
-#Constants
-_INT_MIN_VALUE = -(1 << 31)
-_INT_MAX_VALUE = (1 << 31) - 1
-_LONG_MIN_VALUE = -(1 << 63)
-_LONG_MAX_VALUE = (1 << 63) - 1
-_STRUCT_INT = struct.Struct('!I')
-_STRUCT_LONG = struct.Struct('!Q')
-_STRUCT_FLOAT = struct.Struct('!f')
-_STRUCT_DOUBLE = struct.Struct('!d')
-
-class AvroTypeException(schema.AvroException):
-  """Raised when illegal type is used."""
-  def __init__(self, schm=None, datum=None, msg=None):
-    if msg is None:
-      msg = "Not a "+schema.stringval(schm)+": "+datum.__str__()
-    schema.AvroException.__init__(self, msg)
-
-class DatumReaderBase(object):
-  """Base class for reading data of a schema."""
-
-  def setschema(self, schema):
-    pass
-
-  def read(self, decoder):
-    """Read a datum. Traverse the schema, depth-first, reading all leaf values
-    in the schema into a datum that is returned"""
-    pass
-
-class DatumWriterBase(object):
-  """Base class for writing data of a schema."""
-  def setschema(self, schema):
-    pass
-
-  def write(self, data, encoder):
-    """Write a datum. Traverse the schema, depth first, writing each leaf value
-    in the schema from the datum to the output."""
-    pass
-
-
-class Decoder(object):
-  """Read leaf values."""
-
-  def __init__(self, reader):
-    self.__reader = reader
-
-  def readboolean(self):
-    return ord(self.__reader.read(1)) == 1
-
-  def readint(self):
-    return self.readlong()
-
-  def readlong(self):
-    b = ord(self.__reader.read(1))
-    n = b & 0x7F
-    shift = 7
-    while (b & 0x80) != 0:
-      b = ord(self.__reader.read(1))
-      n |= (b & 0x7F) << shift
-      shift += 7
-    datum = (n >> 1) ^ -(n & 1)
-    return datum
-
-  def readfloat(self):
-    bits = (((ord(self.__reader.read(1)) & 0xffL)    ) |
-        ((ord(self.__reader.read(1)) & 0xffL) <<  8) |
-        ((ord(self.__reader.read(1)) & 0xffL) << 16) |
-        ((ord(self.__reader.read(1)) & 0xffL) << 24))
-    return _STRUCT_FLOAT.unpack(_STRUCT_INT.pack(bits))[0]
-
-  def readdouble(self):
-    bits = (((ord(self.__reader.read(1)) & 0xffL)    ) |
-        ((ord(self.__reader.read(1)) & 0xffL) <<  8) |
-        ((ord(self.__reader.read(1)) & 0xffL) << 16) |
-        ((ord(self.__reader.read(1)) & 0xffL) << 24) |
-        ((ord(self.__reader.read(1)) & 0xffL) << 32) |
-        ((ord(self.__reader.read(1)) & 0xffL) << 40) |
-        ((ord(self.__reader.read(1)) & 0xffL) << 48) |
-        ((ord(self.__reader.read(1)) & 0xffL) << 56))
-    return _STRUCT_DOUBLE.unpack(_STRUCT_LONG.pack(bits))[0]
-
-  def readbytes(self):
-    return self.read(self.readlong())
-
-  def readutf8(self):
-    return unicode(self.readbytes(), "utf-8")
-
-  def read(self, len):
-    return struct.unpack(len.__str__()+'s', self.__reader.read(len))[0]
-
-  def skipboolean(self):
-    self.skip(1)
-
-  def skipint(self):
-    self.skip(4)
-
-  def skiplong(self):
-    self.skip(8)
-
-  def skipfloat(self):
-    self.skip(4)
-
-  def skipdouble(self):
-    self.skip(8)
-
-  def skipbytes(self):
-    self.skip(self.readlong())
-
-  def skiputf8(self):
-    self.skipbytes()
-
-  def skip(self, len):
-    self.__reader.seek(self.__reader.tell()+len)
-
-class Encoder(object):
-  """Write leaf values."""
-
-  def __init__(self, writer):
-    self.__writer = writer
-  
-  def writeboolean(self, datum):
-    if not isinstance(datum, bool):
-      raise AvroTypeException(schema.BOOLEAN, datum)
-    if datum:
-      self.__writer.write(chr(1))
-    else:
-      self.__writer.write(chr(0))
-
-  def writeint(self, n):
-    if n < _INT_MIN_VALUE or n > _INT_MAX_VALUE:
-      raise AvroTypeException(schema.INT, n, 
-                              "datam too big to fit into avro int")
-    self.writelong(n);
-
-  def writelong(self, n):
-    if n < _LONG_MIN_VALUE or n > _LONG_MAX_VALUE:
-      raise AvroTypeException(schema.LONG, n, 
-                              "datam too big to fit into avro long")
-    n = (n << 1) ^ (n >> 63)
-    while (n & ~0x7F) != 0:
-      self.__writer.write(chr((n & 0x7f) | 0x80))
-      n >>=7
-    self.__writer.write(chr(n))
-
-  def writefloat(self, datum):
-    bits = _STRUCT_INT.unpack(_STRUCT_FLOAT.pack(datum))[0]
-    self.__writer.write(chr((bits    ) & 0xFF))
-    self.__writer.write(chr((bits >>  8) & 0xFF))
-    self.__writer.write(chr((bits >>  16) & 0xFF))
-    self.__writer.write(chr((bits >>  24) & 0xFF))
-
-  def writedouble(self, datum):
-    bits = _STRUCT_LONG.unpack(_STRUCT_DOUBLE.pack(datum))[0]
-    self.__writer.write(chr((bits    ) & 0xFF))
-    self.__writer.write(chr((bits >>  8) & 0xFF))
-    self.__writer.write(chr((bits >>  16) & 0xFF))
-    self.__writer.write(chr((bits >>  24) & 0xFF))
-    self.__writer.write(chr((bits >>  32) & 0xFF))
-    self.__writer.write(chr((bits >>  40) & 0xFF))
-    self.__writer.write(chr((bits >>  48) & 0xFF))
-    self.__writer.write(chr((bits >>  56) & 0xFF))
-
-  def writebytes(self, datum):
-    if not isinstance(datum, str):
-      raise AvroTypeException(schema.BYTES, datum, 
-                              "avro BYTES should be python str")
-    self.writelong(len(datum))
-    self.__writer.write(struct.pack(len(datum).__str__()+'s',datum))
-
-  def writeutf8(self, datum):
-    if not isinstance(datum, basestring):
-      raise AvroTypeException(schema.STRING, datum, 
-                              "avro STRING should be python unicode")
-    datum = datum.encode("utf-8")
-    self.writebytes(datum)
-
-  def write(self, datum):
-    self.__writer.write(datum)
+import avro.io as io
 
 #Data file constants.
 _VERSION = 0
@@ -213,13 +36,13 @@
 
   def __init__(self, schm, writer, dwriter):
     self.__writer = writer
-    self.__encoder = Encoder(writer)
+    self.__encoder = io.Encoder(writer)
     self.__dwriter = dwriter
     self.__dwriter.setschema(schm)
     self.__count = 0  #entries in file
     self.__blockcount = 0  #entries in current block
     self.__buffer = cStringIO.StringIO()
-    self.__bufwriter = Encoder(self.__buffer)
+    self.__bufwriter = io.Encoder(self.__buffer)
     self.__meta = dict()
     self.__sync = uuid.uuid4().bytes
     self.__meta["sync"] = self.__sync
@@ -290,7 +113,7 @@
 
   def __init__(self, reader, dreader):
     self.__reader = reader
-    self.__decoder = Decoder(reader)
+    self.__decoder = io.Decoder(reader)
     mag = struct.unpack(len(_MAGIC).__str__()+'s', 
                  self.__reader.read(len(_MAGIC)))[0]
     if mag != _MAGIC:

Modified: hadoop/avro/trunk/src/py/avro/io.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/io.py?rev=882015&r1=882014&r2=882015&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/io.py (original)
+++ hadoop/avro/trunk/src/py/avro/io.py Thu Nov 19 00:45:26 2009
@@ -16,7 +16,7 @@
 
 """Input/Output utilities."""
 
-import struct, uuid, cStringIO
+import struct
 import avro.schema as schema
 
 #Constants
@@ -197,171 +197,3 @@
   def write(self, datum):
     self.__writer.write(datum)
 
-#Data file constants.
-_VERSION = 0
-_MAGIC = "Obj"+chr(_VERSION)
-_SYNC_SIZE = 16
-_SYNC_INTERVAL = 1000*_SYNC_SIZE
-_FOOTER_BLOCK = -1
-class DataFileWriter(object):
-  """Stores in a file a sequence of data conforming to a schema. The schema is 
-  stored in the file with the data. Each datum in a file is of the same
-  schema. Data is grouped into blocks.
-  A synchronization marker is written between blocks, so that
-  files may be split. Blocks may be compressed. Extensible metadata is
-  stored at the end of the file. Files may be appended to."""
-
-  def __init__(self, schm, writer, dwriter):
-    self.__writer = writer
-    self.__encoder = Encoder(writer)
-    self.__dwriter = dwriter
-    self.__dwriter.setschema(schm)
-    self.__count = 0  #entries in file
-    self.__blockcount = 0  #entries in current block
-    self.__buffer = cStringIO.StringIO()
-    self.__bufwriter = Encoder(self.__buffer)
-    self.__meta = dict()
-    self.__sync = uuid.uuid4().bytes
-    self.__meta["sync"] = self.__sync
-    self.__meta["codec"] = "null"
-    self.__meta["schema"] = schema.stringval(schm)
-    self.__writer.write(struct.pack(len(_MAGIC).__str__()+'s',
-                                    _MAGIC))
-
-  def setmeta(self, key, val):
-    """Set a meta data property."""
-    self.__meta[key] = val
-
-  def append(self, datum):
-    """Append a datum to the file."""
-    self.__dwriter.write(datum, self.__bufwriter)
-    self.__count+=1
-    self.__blockcount+=1
-    if self.__buffer.tell() >= _SYNC_INTERVAL:
-      self.__writeblock()
-
-  def __writeblock(self):
-    if self.__blockcount > 0:
-      self.__writer.write(self.__sync)
-      self.__encoder.writelong(self.__blockcount)
-      self.__writer.write(self.__buffer.getvalue())
-      self.__buffer.truncate(0) #reset
-      self.__blockcount = 0
-
-  def sync(self):
-    """Return the current position as a value that may be passed to
-    DataFileReader.seek(long). Forces the end of the current block,
-    emitting a synchronization marker."""
-    self.__writeblock()
-    return self.__writer.tell()
-
-  def flush(self):
-    """Flush the current state of the file, including metadata."""
-    self.__writefooter()
-    self.__writer.flush()
-
-  def close(self):
-    """Close the file."""
-    self.flush()
-    self.__writer.close()
-
-  def __writefooter(self):
-    self.__writeblock()
-    self.__meta["count"] = self.__count.__str__()
-    
-    self.__bufwriter.writelong(len(self.__meta))
-    for k,v in self.__meta.items():
-      self.__bufwriter.writeutf8(unicode(k,'utf-8'))
-      self.__bufwriter.writebytes(str(v))
-    size = self.__buffer.tell() + 4
-    self.__writer.write(self.__sync)
-    self.__encoder.writelong(_FOOTER_BLOCK)
-    self.__encoder.writelong(size)
-    self.__buffer.flush()
-    self.__writer.write(self.__buffer.getvalue())
-    self.__buffer.truncate(0) #reset
-    self.__writer.write(chr((size >> 24) & 0xFF))
-    self.__writer.write(chr((size >> 16) & 0xFF))
-    self.__writer.write(chr((size >> 8) & 0xFF))
-    self.__writer.write(chr((size >> 0) & 0xFF))
-
-class DataFileReader(object):
-  """Read files written by DataFileWriter."""
-
-  def __init__(self, reader, dreader):
-    self.__reader = reader
-    self.__decoder = Decoder(reader)
-    mag = struct.unpack(len(_MAGIC).__str__()+'s', 
-                 self.__reader.read(len(_MAGIC)))[0]
-    if mag != _MAGIC:
-      raise schema.AvroException("Not an avro data file")
-    #find the length
-    self.__reader.seek(0,2)
-    self.__length = self.__reader.tell()
-    self.__reader.seek(-4, 2)
-    footersize = (int(ord(self.__reader.read(1)) << 24) +
-            int(ord(self.__reader.read(1)) << 16) +
-            int(ord(self.__reader.read(1)) << 8) +
-            int(ord(self.__reader.read(1))))
-    seekpos = self.__reader.seek(self.__length-footersize)
-    metalength = self.__decoder.readlong()
-    if metalength < 0:
-      metalength = -metalength
-      self.__decoder.readlong() #ignore byteCount if this is a blocking map
-    self.__meta = dict()
-    for i in range(0, metalength):
-      key = self.__decoder.readutf8()
-      self.__meta[key] = self.__decoder.readbytes()
-    self.__sync = self.__meta.get("sync")
-    self.__count = int(self.__meta.get("count"))
-    self.__codec = self.__meta.get("codec")
-    if (self.__codec != None) and (self.__codec != "null"):
-      raise schema.AvroException("Unknown codec: " + self.__codec)
-    self.__schema = schema.parse(self.__meta.get("schema").encode("utf-8"))
-    self.__blockcount = 0
-    self.__dreader = dreader
-    self.__dreader.setschema(self.__schema)
-    self.__reader.seek(len(_MAGIC))
-
-  def __iter__(self):
-    return self
-
-  def getmeta(self, key):
-    """Return the value of a metadata property."""
-    return self.__meta.get(key)
-
-  def next(self):
-    """Return the next datum in the file."""
-    while self.__blockcount == 0:
-      if self.__reader.tell() == self.__length:
-        raise StopIteration
-      self.__skipsync()
-      self.__blockcount = self.__decoder.readlong()
-      if self.__blockcount == _FOOTER_BLOCK:
-        self.__reader.seek(self.__decoder.readlong()+self.__reader.tell())
-        self.__blockcount = 0
-    self.__blockcount-=1
-    datum = self.__dreader.read(self.__decoder)
-    return datum
-
-  def __skipsync(self):
-    if self.__reader.read(_SYNC_SIZE)!=self.__sync:
-      raise schema.AvroException("Invalid sync!")
-
-  def seek(self, pos):
-    """Move to the specified synchronization point, as returned by 
-    DataFileWriter.sync()."""
-    self.__reader.seek(pos)
-    self.__blockcount = 0
-
-  def sync(self, position):
-    """Move to the next synchronization point after a position."""
-    if self.__reader.tell()+_SYNC_SIZE >= self.__length:
-      self.__reader.seek(self.__length)
-      return
-    self.__reader.seek(position)
-    self.__reader.read(_SYNC_SIZE)
-
-  def close(self):
-    """Close this reader."""
-    self.__reader.close()

Modified: hadoop/avro/trunk/src/test/py/testio.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testio.py?rev=882015&r1=882014&r2=882015&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testio.py (original)
+++ hadoop/avro/trunk/src/test/py/testio.py Thu Nov 19 00:45:26 2009
@@ -18,6 +18,7 @@
 import avro.schema as schema
 import avro.io as io
 import avro.genericio as genericio
+import avro.datafile as datafile
 
 _DIR = "build/test/"
 _FILE = _DIR +"test.py.avro"
@@ -214,12 +215,12 @@
     seed = time.time()
     randomData = self.__random(schm, seed)
     count = 10
-    dw = io.DataFileWriter(schm, open(_FILE, 'wb'), self.__datumwriter())
+    dw = datafile.DataFileWriter(schm, open(_FILE, 'wb'), self.__datumwriter())
     for i in range(0,count):
       dw.append(randomData.next())
     dw.close()
     randomData = self.__random(schm, seed)
-    dr = io.DataFileReader(open(_FILE, "rb"), self.__datumreader())
+    dr = datafile.DataFileReader(open(_FILE, "rb"), self.__datumreader())
     count_read = 0
     for data in dr:
       count_read = count_read + 1
@@ -235,7 +236,7 @@
   file = sys.argv[2]
   count = int(sys.argv[3])
   randomData = RandomData(schm)
-  dw = io.DataFileWriter(schm, open(file, 'wb'), genericio.DatumWriter())
+  dw = datafile.DataFileWriter(schm, open(file, 'wb'), genericio.DatumWriter())
   for i in range(0,count):
     dw.append(randomData.next())
   dw.close()



Mime
View raw message