avro-commits mailing list archives

From cutt...@apache.org
Subject svn commit: r894889 - in /hadoop/avro/trunk: ./ src/doc/content/xdocs/ src/java/org/apache/avro/file/ src/java/org/apache/avro/tool/ src/test/java/org/apache/avro/ src/test/java/org/apache/avro/tool/
Date Thu, 31 Dec 2009 17:01:59 GMT
Author: cutting
Date: Thu Dec 31 17:01:58 2009
New Revision: 894889

URL: http://svn.apache.org/viewvc?rev=894889&view=rev
Log:
Revised data file format and Java API.

Added:
    hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileStream.java
      - copied, changed from r891421, hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/src/doc/content/xdocs/spec.xml
    hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileConstants.java
    hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java
    hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java
    hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileReadTool.java
    hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileWriteTool.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/RandomData.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestDataFileTools.java

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Dec 31 17:01:58 2009
@@ -20,6 +20,10 @@
     inherited methods in protocols.  Finally, Java shorts are
     supported as integers.  (cutting)
 
+    AVRO-160. Revised data file format and Java API.  Simplified
+    format now permits streaming but no longer supports multiple
+    schemas per file.  Java API for reading is iterator-based.
+
   NEW FEATURES
 
     AVRO-151. Validating Avro schema parser for C (massie)

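For orientation, here is a minimal sketch (not part of this commit) of the new
iterator-based reading API that this change introduces; the file name is
hypothetical:

    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;

    public class ReadExample {
      public static void main(String[] args) throws IOException {
        // The reader now doubles as an Iterable over the file's entries.
        DataFileReader<Object> reader =
          new DataFileReader<Object>(new File("data.avro"),
                                     new GenericDatumReader<Object>());
        try {
          for (Object datum : reader)   // iterator-based read, one datum at a time
            System.out.println(datum);
        } finally {
          reader.close();
        }
      }
    }
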
Modified: hadoop/avro/trunk/src/doc/content/xdocs/spec.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/doc/content/xdocs/spec.xml?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/doc/content/xdocs/spec.xml (original)
+++ hadoop/avro/trunk/src/doc/content/xdocs/spec.xml Thu Dec 31 17:01:58 2009
@@ -592,66 +592,53 @@
 
       <p>A file consists of:</p>
       <ul>
-	<li>A <em>header, followed by</em></li>
-	<li>one or more <em>blocks</em>.</li>
+	<li>A <em>file header</em>, followed by</li>
+	<li>one or more <em>file data blocks</em>.</li>
       </ul>
-      <p>There are two kinds of blocks, <em>normal</em>
-	and <em>metadata</em>.  All files must contain at least one
-	metadata block.  A file terminates with its last metadata
-	block.  Any data after the last metadata block is ignored.</p>
 
-      <p>A header consists of:</p>
+      <p>A file header consists of:</p>
       <ul>
-	<li>Four bytes, ASCII 'O', 'b', 'j', followed by zero.</li>
-	<li>A 16-byte sync marker.</li>
+	<li>Four bytes, ASCII 'O', 'b', 'j', followed by 1.</li>
+	<li><em>file metadata</em>, including the schema.</li>
+	<li>The 16-byte, randomly-generated sync marker for this file.</li>
       </ul>
 
-      <p>A metadata block consists of:</p>
+      <p>File metadata consists of:</p>
       <ul>
-	<li>The file's 16-byte sync marker.</li>
-	<li>A long with value -1, identifying this as a metadata block.</li>
-	<li>A long indicating the size in bytes of this block.</li>
 	<li>A long indicating the number of metadata key/value pairs.</li>
 	<li>For each pair, a string key and bytes value.</li>
-	<li>The size in bytes of this block as a 4-byte big-endian integer.
-	  <p>When a file is closed normally, this terminates the file
-	    and permits one to efficiently seek to the start of the
-	    metadata.  If the sync marker there does not match that at
-	    the start of the file, then one must scan for the last
-	    metadata in the file.</p>
-	</li>
       </ul>
 
-      <p>The following metadata properties are reserved:</p>
+      <p>The following file metadata properties are reserved:</p>
       <ul>
 	<li><strong>schema</strong> contains the schema of objects
-	stored in the file, as a string.</li>
-	<li><strong>count</strong> contains the number of objects in
-	the file as a decimal ASCII string.</li>
+	stored in the file, as JSON data (required).</li>
 	<li><strong>codec</strong> the name of the compression codec
 	used to compress blocks, as a string. The only value for codec
 	currently supported is "null" (meaning no compression is
 	performed).  If codec is absent, it is assumed to be
 	"null".</li>
-	<li><strong>sync</strong> the 16-byte sync marker used in this file,
-        as a byte sequence.</li>
       </ul>
 
-      <p>A normal block consists of:</p>
+      <p>A file header is thus described by the following schema:</p>
+      <source>
+{"type": "record", "name": "org.apache.avro.file.Header",
+ "fields" : [
+   {"name": "magic", "type": {"type": "fixed", "size": 4}}
+   {"name": "meta", "type": {"type": "map", "values": "bytes"}}
+   {"name": "sync", "type": {"type": "fixed", "size": 16}}
+  ]
+}
+      </source>
+
+      <p>A file data block consists of:</p>
       <ul>
-	<li>The file's 16-byte sync marker.</li>
-	<li>A long indicating the size in bytes of this block in the file.</li>
+	<li>A long indicating the count of objects in this block.</li>
 	<li>The serialized objects.  If a codec is specified, this is
 	compressed by that codec.</li>
+	<li>The file's 16-byte sync marker.</li>
       </ul>
 
-      <p>Note that this format supports appends, since multiple
-      metadata blocks are permitted.</p>
-
-      <p>To be robust to application failure, implementations can
-      write metadata periodically to limit the amount of the file that
-      must be scanned to find the last metadata block.</p>
-
     </section>
 
     <section>

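To make the revised header layout concrete, a small standalone sketch (not part
of this commit) that checks the four magic bytes 'O', 'b', 'j', 1 prescribed by
the new spec; everything past the magic check (metadata map, 16-byte sync) is
left to the real reader:

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Arrays;

    public class CheckMagic {
      // Version byte is now 1; a version-1 file starts 'O','b','j',1,
      // followed by the file metadata and the 16-byte sync marker.
      static final byte[] MAGIC = { 'O', 'b', 'j', 1 };

      public static void main(String[] args) throws IOException {
        DataInputStream in = new DataInputStream(new FileInputStream(args[0]));
        try {
          byte[] magic = new byte[4];
          in.readFully(magic);
          System.out.println(Arrays.equals(MAGIC, magic)
                             ? "Avro data file (version 1)"
                             : "Not a version-1 Avro data file");
        } finally {
          in.close();
        }
      }
    }
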
Modified: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileConstants.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileConstants.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileConstants.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileConstants.java Thu Dec 31 17:01:58 2009
@@ -22,7 +22,7 @@
  * Constants used in data files.
  */
 class DataFileConstants {
-  public static final byte VERSION = 0;
+  public static final byte VERSION = 1;
   public static final byte[] MAGIC = new byte[] {
     (byte)'O', (byte)'b', (byte)'j', VERSION
   };
@@ -31,7 +31,6 @@
   public static final int SYNC_INTERVAL = 1000*SYNC_SIZE; 
 
   public static final String SCHEMA = "schema";
-  public static final String SYNC = "sync";
   public static final String COUNT = "count";
   public static final String CODEC = "codec";
   public static final String NULL_CODEC = "null";

Modified: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java Thu Dec 31 17:01:58 2009
@@ -19,35 +19,19 @@
 
 import java.io.BufferedInputStream;
 import java.io.IOException;
+import java.io.EOFException;
 import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
 import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
 
-import org.apache.avro.Schema;
 import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.BinaryDecoder;
+import static org.apache.avro.file.DataFileConstants.SYNC_SIZE;
 
-/** Read files written by {@link DataFileWriter}.
+/** Random access to files written with {@link DataFileWriter}.
  * @see DataFileWriter
  */
-public class DataFileReader<D> {
-
-  private Schema schema;
-  private DatumReader<D> reader;
-  private SeekableBufferedInput in;
-  private Decoder vin;
-
-  Map<String,byte[]> meta = new HashMap<String,byte[]>();
-
-  private long count;                           // # entries in file
-  private long blockCount;                      // # entries in block
-  byte[] sync = new byte[DataFileConstants.SYNC_SIZE];
-  private byte[] syncBuffer = new byte[DataFileConstants.SYNC_SIZE];
+public class DataFileReader<D> extends DataFileStream<D> {
+  private SeekableBufferedInput sin;
+  private long blockStart;
 
   /** Construct a reader for a file. */
   public DataFileReader(File file, DatumReader<D> reader) throws IOException {
@@ -57,133 +41,59 @@
   /** Construct a reader for a file. */
   public DataFileReader(SeekableInput sin, DatumReader<D> reader)
     throws IOException {
-    this.in = new SeekableBufferedInput(sin);
-
-    byte[] magic = new byte[4];
-    in.read(magic);
-    if (!Arrays.equals(DataFileConstants.MAGIC, magic))
-      throw new IOException("Not a data file.");
-
-    long length = in.length();
-    in.seek(length-4);
-    int footerSize=(in.read()<<24)+(in.read()<<16)+(in.read()<<8)+in.read();
-    in.seek(length-footerSize);
-    this.vin = new BinaryDecoder(in);
-    long l = vin.readMapStart();
-    if (l > 0) {
-      do {
-        for (long i = 0; i < l; i++) {
-          String key = vin.readString(null).toString();
-          ByteBuffer value = vin.readBytes(null);
-          byte[] bb = new byte[value.remaining()];
-          value.get(bb);
-          meta.put(key, bb);
-        }
-      } while ((l = vin.mapNext()) != 0);
-    }
-
-    this.sync = getMeta(DataFileConstants.SYNC);
-    this.count = getMetaLong(DataFileConstants.COUNT);
-    String codec = getMetaString(DataFileConstants.CODEC);
-    if (codec != null && ! codec.equals(DataFileConstants.NULL_CODEC)) {
-      throw new IOException("Unknown codec: " + codec);
-    }
-    this.schema = Schema.parse(getMetaString(DataFileConstants.SCHEMA));
-    this.reader = reader;
-
-    reader.setSchema(schema);
-
-    in.seek(DataFileConstants.MAGIC.length);         // seek to start
-  }
-  
-  /** Return the schema used in this file. */
-  public Schema getSchema() { return schema; }
-  
-  /** Return the number of records in the file. */
-  public long getCount() { return count; }
-  
-  /** Return the value of a metadata property. */
-  public synchronized byte[] getMeta(String key) {
-    return meta.get(key);
-  }
-  /** Return the value of a metadata property. */
-  public synchronized String getMetaString(String key) {
-    byte[] value = getMeta(key);
-    if (value == null) {
-      return null;
-    }
-    try {
-      return new String(value, "UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-  /** Return the value of a metadata property. */
-  public synchronized long getMetaLong(String key) {
-    return Long.parseLong(getMetaString(key));
+    super(new SeekableBufferedInput(sin), reader);
+    this.sin = (SeekableBufferedInput)in;
   }
 
-  /** Return the next datum in the file. */
-  public synchronized D next(D reuse) throws IOException {
-    while (blockCount == 0) {                     // at start of block
-
-      if (in.tell() == in.length())               // at eof
-        return null;
-
-      skipSync();                                 // skip a sync
-
-      blockCount = vin.readLong();                // read blockCount
-         
-      if (blockCount == DataFileConstants.FOOTER_BLOCK) { 
-        in.seek(vin.readLong()+in.tell());        // skip a footer
-        blockCount = 0;
-      }
-    }
-    blockCount--;
-    return reader.read(reuse, vin);
-  }
-
-  private void skipSync() throws IOException {
-    vin.readFixed(syncBuffer);
-    if (!Arrays.equals(syncBuffer, sync))
-      throw new IOException("Invalid sync!");
-  }
-
-  /** Move to the specified synchronization point, as returned by {@link
-   * DataFileWriter#sync()}. */
-  public synchronized void seek(long position) throws IOException {
-    in.seek(position);
-    blockCount = 0;
-  }
-
-  /** Move to the next synchronization point after a position. */
-  public synchronized void sync(long position) throws IOException {
-    if (in.tell()+DataFileConstants.SYNC_SIZE >= in.length()) {
-      in.seek(in.length());
+  /** Move to a specific, known synchronization point, one returned from {@link
+   * DataFileWriter#sync()} while writing.  If synchronization points were not
+   * saved while writing a file, use {@link #sync(long)} instead. */
+  public void seek(long position) throws IOException {
+    sin.seek(position);
+    blockRemaining = 0;
+    blockStart = position;
+  }
+
+  /** Move to the next synchronization point after a position. To process a
+   * range of file entries, call this with the starting position, then check
+   * {@link #pastSync(long)} with the end point before each call to {@link
+   * #next()}. */
+  public void sync(long position) throws IOException {
+    seek(position);
+    try {
+      vin.readFixed(syncBuffer);
+    } catch (EOFException e) {
+      blockStart = sin.tell();
       return;
     }
-    in.seek(position);
-    vin.readFixed(syncBuffer);
-    for (int i = 0; in.tell() < in.length(); i++) {
+    int i=0, b;
+    do {
       int j = 0;
-      for (; j < sync.length; j++) {
-        if (sync[j] != syncBuffer[(i+j)%sync.length])
+      for (; j < SYNC_SIZE; j++) {
+        if (sync[j] != syncBuffer[(i+j)%SYNC_SIZE])
           break;
       }
-      if (j == sync.length) {                     // position before sync
-        in.seek(in.tell() - DataFileConstants.SYNC_SIZE);
+      if (j == SYNC_SIZE) {                       // matched a complete sync
+        blockStart = position + i + SYNC_SIZE;
         return;
       }
-      syncBuffer[i%sync.length] = (byte)in.read();
-    }
+      b = in.read();
+      syncBuffer[i++%SYNC_SIZE] = (byte)b;
+    } while (b != -1);
+  }
+
+  @Override
+  void skipSync() throws IOException {            // note block start
+    super.skipSync();
+    blockStart = sin.tell();
   }
 
-  /** Close this reader. */
-  public synchronized void close() throws IOException {
-    in.close();
+  /** Return true if past the next synchronization point after a position. */ 
+  public boolean pastSync(long position) {
+    return blockStart >= Math.min(sin.length(), position+SYNC_SIZE);
   }
 
-  private class SeekableBufferedInput extends BufferedInputStream {
+  private static class SeekableBufferedInput extends BufferedInputStream {
     private long position;                        // end of buffer
     private long length;                          // file length
 
@@ -220,7 +130,7 @@
     }
 
     public long tell() { return position-(count-pos); }
-    public long length() throws IOException { return length; }
+    public long length() { return length; }
 
     public int read() throws IOException {        // optimized implementation
       if (pos >= count) return super.read();

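The seek/sync/pastSync trio above supports processing a file in byte-range
splits. A minimal sketch, modeled on the testSplits test added further down
(the file and split bounds are the caller's):

    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;

    public class SplitExample {
      // Count the entries belonging to one byte-range split of a data file.
      static long countSplit(File file, long start, long end) throws IOException {
        DataFileReader<Object> reader =
          new DataFileReader<Object>(file, new GenericDatumReader<Object>());
        try {
          long count = 0;
          reader.sync(start);             // jump to first sync point after start
          while (!reader.pastSync(end)) { // stop once past the split's end
            reader.next();
            count++;
          }
          return count;
        } finally {
          reader.close();
        }
      }
    }

Because every entry is read in exactly one split, counting entries this way
over non-overlapping splits covering the file yields the file's total count.
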
Copied: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileStream.java (from r891421, hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java)
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileStream.java?p2=hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileStream.java&p1=hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java&r1=891421&r2=894889&rev=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileStream.java Thu Dec 31 17:01:58 2009
@@ -17,59 +17,59 @@
  */
 package org.apache.avro.file;
 
-import java.io.BufferedInputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
-import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.BinaryDecoder;
 
-/** Read files written by {@link DataFileWriter}.
+/** Streaming access to files written by {@link DataFileWriter}.  Use {@link
+ * DataFileReader} for file-based input.
  * @see DataFileWriter
  */
-public class DataFileReader<D> {
+public class DataFileStream<D> implements Iterator<D>, Iterable<D> {
 
   private Schema schema;
   private DatumReader<D> reader;
-  private SeekableBufferedInput in;
-  private Decoder vin;
+
+  final InputStream in;
+  final Decoder vin;
 
   Map<String,byte[]> meta = new HashMap<String,byte[]>();
 
-  private long count;                           // # entries in file
-  private long blockCount;                      // # entries in block
+  long blockRemaining;                          // # entries remaining in block
   byte[] sync = new byte[DataFileConstants.SYNC_SIZE];
-  private byte[] syncBuffer = new byte[DataFileConstants.SYNC_SIZE];
-
-  /** Construct a reader for a file. */
-  public DataFileReader(File file, DatumReader<D> reader) throws IOException {
-    this(new SeekableFileInput(file), reader);
-  }
+  byte[] syncBuffer = new byte[DataFileConstants.SYNC_SIZE];
 
-  /** Construct a reader for a file. */
-  public DataFileReader(SeekableInput sin, DatumReader<D> reader)
+  /** Construct a reader for an input stream.  For file-based input, use {@link
+   * DataFileReader}.  This performs no buffering; for good performance, be
+   * sure to pass in a {@link java.io.BufferedInputStream}. */
+  public DataFileStream(InputStream in, DatumReader<D> reader)
     throws IOException {
-    this.in = new SeekableBufferedInput(sin);
+    this.in = in;
+    this.vin = new BinaryDecoder(in);
 
-    byte[] magic = new byte[4];
-    in.read(magic);
+    byte[] magic = new byte[DataFileConstants.MAGIC.length];
+    try {
+      vin.readFixed(magic);                         // read magic
+    } catch (IOException e) {
+      throw new IOException("Not a data file.");
+    }
     if (!Arrays.equals(DataFileConstants.MAGIC, magic))
       throw new IOException("Not a data file.");
 
-    long length = in.length();
-    in.seek(length-4);
-    int footerSize=(in.read()<<24)+(in.read()<<16)+(in.read()<<8)+in.read();
-    in.seek(length-footerSize);
-    this.vin = new BinaryDecoder(in);
-    long l = vin.readMapStart();
+    long l = vin.readMapStart();                  // read meta data
     if (l > 0) {
       do {
         for (long i = 0; i < l; i++) {
@@ -81,9 +81,8 @@
         }
       } while ((l = vin.mapNext()) != 0);
     }
+    vin.readFixed(sync);                          // read sync
 
-    this.sync = getMeta(DataFileConstants.SYNC);
-    this.count = getMetaLong(DataFileConstants.COUNT);
     String codec = getMetaString(DataFileConstants.CODEC);
     if (codec != null && ! codec.equals(DataFileConstants.NULL_CODEC)) {
       throw new IOException("Unknown codec: " + codec);
@@ -92,22 +91,17 @@
     this.reader = reader;
 
     reader.setSchema(schema);
-
-    in.seek(DataFileConstants.MAGIC.length);         // seek to start
   }
   
   /** Return the schema used in this file. */
   public Schema getSchema() { return schema; }
   
-  /** Return the number of records in the file. */
-  public long getCount() { return count; }
-  
   /** Return the value of a metadata property. */
-  public synchronized byte[] getMeta(String key) {
+  public byte[] getMeta(String key) {
     return meta.get(key);
   }
   /** Return the value of a metadata property. */
-  public synchronized String getMetaString(String key) {
+  public String getMetaString(String key) {
     byte[] value = getMeta(key);
     if (value == null) {
       return null;
@@ -119,121 +113,65 @@
     }
   }
   /** Return the value of a metadata property. */
-  public synchronized long getMetaLong(String key) {
+  public long getMetaLong(String key) {
     return Long.parseLong(getMetaString(key));
   }
 
-  /** Return the next datum in the file. */
-  public synchronized D next(D reuse) throws IOException {
-    while (blockCount == 0) {                     // at start of block
+  /** Returns an iterator over entries in this file.  Note that this iterator
+   * is shared with other users of the file: it does not contain a separate
+   * pointer into the file. */
+  public Iterator<D> iterator() { return this; }
 
-      if (in.tell() == in.length())               // at eof
-        return null;
-
-      skipSync();                                 // skip a sync
-
-      blockCount = vin.readLong();                // read blockCount
-         
-      if (blockCount == DataFileConstants.FOOTER_BLOCK) { 
-        in.seek(vin.readLong()+in.tell());        // skip a footer
-        blockCount = 0;
-      }
+  /** True if more entries remain in this file. */
+  public boolean hasNext() {
+    try {
+      if (blockRemaining == 0)
+        blockRemaining = vin.readLong();          // read block count
+      return blockRemaining != 0;
+    } catch (EOFException e) {                    // at EOF
+      return false;
+    } catch (IOException e) {
+      throw new AvroRuntimeException(e);
     }
-    blockCount--;
-    return reader.read(reuse, vin);
   }
 
-  private void skipSync() throws IOException {
-    vin.readFixed(syncBuffer);
-    if (!Arrays.equals(syncBuffer, sync))
-      throw new IOException("Invalid sync!");
+  /** Read the next datum in the file.
+   * @throws NoSuchElementException if no more remain in the file.
+   */
+  public D next() {
+    try {
+      return next(null);
+    } catch (IOException e) {
+      throw new AvroRuntimeException(e);
+    }
   }
 
-  /** Move to the specified synchronization point, as returned by {@link
-   * DataFileWriter#sync()}. */
-  public synchronized void seek(long position) throws IOException {
-    in.seek(position);
-    blockCount = 0;
+  /** Read the next datum from the file.
+   * @param reuse an instance to reuse.
+   * @throws NoSuchElementException if no more remain in the file.
+   */
+  public D next(D reuse) throws IOException {
+    if (!hasNext())
+      throw new NoSuchElementException();
+    D result = reader.read(reuse, vin);
+    if (--blockRemaining == 0)
+      skipSync();
+    return result;
   }
 
-  /** Move to the next synchronization point after a position. */
-  public synchronized void sync(long position) throws IOException {
-    if (in.tell()+DataFileConstants.SYNC_SIZE >= in.length()) {
-      in.seek(in.length());
-      return;
-    }
-    in.seek(position);
+  void skipSync() throws IOException {
     vin.readFixed(syncBuffer);
-    for (int i = 0; in.tell() < in.length(); i++) {
-      int j = 0;
-      for (; j < sync.length; j++) {
-        if (sync[j] != syncBuffer[(i+j)%sync.length])
-          break;
-      }
-      if (j == sync.length) {                     // position before sync
-        in.seek(in.tell() - DataFileConstants.SYNC_SIZE);
-        return;
-      }
-      syncBuffer[i%sync.length] = (byte)in.read();
-    }
+    if (!Arrays.equals(syncBuffer, sync))
+      throw new IOException("Invalid sync!");
   }
 
+  /** Not supported. */
+  public void remove() { throw new UnsupportedOperationException(); }
+
   /** Close this reader. */
-  public synchronized void close() throws IOException {
+  public void close() throws IOException {
     in.close();
   }
 
-  private class SeekableBufferedInput extends BufferedInputStream {
-    private long position;                        // end of buffer
-    private long length;                          // file length
-
-    private class PositionFilter extends InputStream {
-      private SeekableInput in;
-      public PositionFilter(SeekableInput in) throws IOException {
-        this.in = in;
-      }
-      public int read() { throw new UnsupportedOperationException(); }
-      public int read(byte[] b, int off, int len) throws IOException {
-        int value = in.read(b, off, len);
-        if (value > 0) position += value;         // update on read
-        return value;
-      }
-    }
-
-    public SeekableBufferedInput(SeekableInput in) throws IOException {
-      super(null);
-      this.in = new PositionFilter(in);
-      this.length = in.length();
-    }
-
-    public void seek(long p) throws IOException {
-      if (p < 0) throw new IOException("Illegal seek: "+p);
-      long start = position - count;
-      if (p >= start && p < position) {            // in buffer
-        this.pos = (int)(p - start);
-      } else {                                     // not in buffer
-        this.pos = 0;
-        this.count = 0;
-        ((PositionFilter)in).in.seek(p);
-        this.position = p;
-      }
-    }
-
-    public long tell() { return position-(count-pos); }
-    public long length() throws IOException { return length; }
-
-    public int read() throws IOException {        // optimized implementation
-      if (pos >= count) return super.read();
-      return buf[pos++] & 0xff;
-    }
-
-    public long skip(long skip) throws IOException { // optimized implementation
-      if (skip > count-pos)
-        return super.skip(skip);
-      pos += (int)skip;
-      return skip;
-    }
-  }
-
 }
 

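A sketch (not from the commit) of the new streaming entry point, reading an
Avro data file from standard input; per the constructor javadoc above, the
caller supplies the buffering:

    import java.io.BufferedInputStream;
    import java.io.IOException;
    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericDatumReader;

    public class StreamExample {
      public static void main(String[] args) throws IOException {
        // DataFileStream performs no buffering of its own, so wrap the raw
        // stream in a BufferedInputStream as the constructor javadoc advises.
        DataFileStream<Object> stream =
          new DataFileStream<Object>(new BufferedInputStream(System.in),
                                     new GenericDatumReader<Object>());
        try {
          System.err.println("schema: " + stream.getSchema());
          for (Object datum : stream)     // consume entries as they arrive
            System.out.println(datum);
        } finally {
          stream.close();
        }
      }
    }
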
Modified: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java Thu Dec 31 17:01:58 2009
@@ -36,7 +36,6 @@
 import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.List;
 
 import org.apache.avro.Schema;
 import org.apache.avro.AvroRuntimeException;
@@ -60,9 +59,8 @@
   private BufferedFileOutputStream out;
   private Encoder vout;
 
-  private Map<String,byte[]> meta = new HashMap<String,byte[]>();
+  private final Map<String,byte[]> meta = new HashMap<String,byte[]>();
 
-  private long count;                           // # entries in file
   private int blockCount;                       // # entries in current block
 
   private ByteArrayOutputStream buffer =
@@ -71,29 +69,55 @@
 
   private byte[] sync;                          // 16 random bytes
 
-  /** Construct a writer to a new file for data matching a schema. */
-  public DataFileWriter(Schema schema, File file,
-                        DatumWriter<D> dout) throws IOException {
-    this(schema, new FileOutputStream(file), dout);
-  }
-  /** Construct a writer to a new file for data matching a schema. */
-  public DataFileWriter(Schema schema, OutputStream outs,
-                        DatumWriter<D> dout) throws IOException {
+  private boolean isOpen;
+
+  /** Construct a writer, not yet open. */
+  public DataFileWriter(DatumWriter<D> dout) {
+    this.dout = dout;
+  }
+  
+  private void assertOpen() {
+    if (!isOpen) throw new AvroRuntimeException("not open");
+  }
+  private void assertNotOpen() {
+    if (isOpen) throw new AvroRuntimeException("already open");
+  }
+
+  /** Open a new file for data matching a schema. */
+  public DataFileWriter<D> create(Schema schema, File file) throws IOException {
+    return create(schema, new FileOutputStream(file));
+  }
+
+  /** Open a new file for data matching a schema. */
+  public DataFileWriter<D> create(Schema schema, OutputStream outs)
+    throws IOException {
+    assertNotOpen();
+
     this.schema = schema;
+    setMeta(DataFileConstants.SCHEMA, schema.toString());
     this.sync = generateSync();
 
-    setMeta(DataFileConstants.SYNC, sync);
-    setMeta(DataFileConstants.SCHEMA, schema.toString());
-    setMeta(DataFileConstants.CODEC, DataFileConstants.NULL_CODEC);
+    init(outs);
+
+    out.write(DataFileConstants.MAGIC);           // write magic
+
+    vout.writeMapStart();                         // write metadata
+    vout.setItemCount(meta.size());
+    for (Map.Entry<String,byte[]> entry : meta.entrySet()) {
+      vout.startItem();
+      vout.writeString(entry.getKey());
+      vout.writeBytes(entry.getValue());
+    }
+    vout.writeMapEnd();
 
-    init(outs, dout);
+    out.write(sync);                              // write initial sync
 
-    out.write(DataFileConstants.MAGIC);
+    return this;
   }
-  
-  /** Construct a writer appending to an existing file. */
-  public DataFileWriter(File file, DatumWriter<D> dout)
-    throws IOException {
+
+  /** Open a writer appending to an existing file. */
+  public DataFileWriter<D> appendTo(File file) throws IOException {
+    assertNotOpen();
     if (!file.exists())
       throw new FileNotFoundException("Not found: "+file);
     RandomAccessFile raf = new RandomAccessFile(file, "rw");
@@ -103,21 +127,21 @@
                             new GenericDatumReader<D>());
     this.schema = reader.getSchema();
     this.sync = reader.sync;
-    this.count = reader.getCount();
     this.meta.putAll(reader.meta);
 
     FileChannel channel = raf.getChannel();       // seek to end
     channel.position(channel.size());
 
-    init(new FileOutputStream(fd), dout);
+    init(new FileOutputStream(fd));
+
+    return this;
   }
-  
-  private void init(OutputStream outs, DatumWriter<D> dout)
-    throws IOException {
+
+  private void init(OutputStream outs) throws IOException {
     this.out = new BufferedFileOutputStream(outs);
     this.vout = new BinaryEncoder(out);
-    this.dout = dout;
     dout.setSchema(schema);
+    this.isOpen = true;
   }
 
   private static byte[] generateSync() {
@@ -132,92 +156,63 @@
   }
 
   /** Set a metadata property. */
-  public synchronized void setMeta(String key, byte[] value) {
-      meta.put(key, value);
-    }
+  public DataFileWriter<D> setMeta(String key, byte[] value) {
+    assertNotOpen();
+    meta.put(key, value);
+    return this;
+  }
   /** Set a metadata property. */
-  public synchronized void setMeta(String key, String value) {
-      try {
-        setMeta(key, value.getBytes("UTF-8"));
-      } catch (UnsupportedEncodingException e) {
-        throw new RuntimeException(e);
-      }
+  public DataFileWriter<D> setMeta(String key, String value) {
+    try {
+      return setMeta(key, value.getBytes("UTF-8"));
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
     }
+  }
   /** Set a metadata property. */
-  public synchronized void setMeta(String key, long value) {
-      setMeta(key, Long.toString(value));
-    }
-
-  /** If the schema for this file is a union, add a branch to it. */
-  public synchronized void addSchema(Schema branch) {
-    if (schema.getType() != Schema.Type.UNION)
-      throw new AvroRuntimeException("Not a union schema: "+schema);
-    List<Schema> types = schema.getTypes();
-    types.add(branch);
-    this.schema = Schema.createUnion(types);
-    this.dout.setSchema(schema);
-    setMeta(DataFileConstants.SCHEMA, schema.toString());
+  public DataFileWriter<D> setMeta(String key, long value) {
+    return setMeta(key, Long.toString(value));
   }
 
   /** Append a datum to the file. */
-  public synchronized void append(D datum) throws IOException {
-      dout.write(datum, bufOut);
-      blockCount++;
-      count++;
-      if (buffer.size() >= DataFileConstants.SYNC_INTERVAL)
-        writeBlock();
-    }
+  public void append(D datum) throws IOException {
+    assertOpen();
+    dout.write(datum, bufOut);
+    blockCount++;
+    if (buffer.size() >= DataFileConstants.SYNC_INTERVAL)
+      writeBlock();
+  }
 
   private void writeBlock() throws IOException {
     if (blockCount > 0) {
-      out.write(sync);
       vout.writeLong(blockCount);
       buffer.writeTo(out);
       buffer.reset();
       blockCount = 0;
+      out.write(sync);
     }
   }
 
   /** Return the current position as a value that may be passed to {@link
    * DataFileReader#seek(long)}.  Forces the end of the current block,
    * emitting a synchronization marker. */
-  public synchronized long sync() throws IOException {
-      writeBlock();
-      return out.tell();
-    }
+  public long sync() throws IOException {
+    assertOpen();
+    writeBlock();
+    return out.tell();
+  }
 
-  /** Flush the current state of the file, including metadata. */
-  public synchronized void flush() throws IOException {
-      writeFooter();
-      out.flush();
-    }
+  /** Flush the current state of the file. */
+  public void flush() throws IOException {
+    sync();
+    out.flush();
+  }
 
   /** Close the file. */
-  public synchronized void close() throws IOException {
-      flush();
-      out.close();
-    }
-
-  private void writeFooter() throws IOException {
-    writeBlock();                               // flush any data
-    setMeta(DataFileConstants.COUNT, count);    // update count
-    bufOut.writeMapStart();              // write meta entries
-    bufOut.setItemCount(meta.size());
-    for (Map.Entry<String,byte[]> entry : meta.entrySet()) {
-      bufOut.startItem();
-      bufOut.writeString(entry.getKey());
-      bufOut.writeBytes(entry.getValue());
-    }
-    bufOut.writeMapEnd();
-    
-    int size = buffer.size()+4;
-    out.write(sync);
-    vout.writeLong(DataFileConstants.FOOTER_BLOCK);                 // tag the block
-    vout.writeLong(size);
-    buffer.writeTo(out);
-    buffer.reset();
-    out.write((byte)(size >>> 24)); out.write((byte)(size >>> 16));
-    out.write((byte)(size >>> 8));  out.write((byte)(size >>> 0));
+  public void close() throws IOException {
+    flush();
+    out.close();
+    isOpen = false;
   }
 
   private class BufferedFileOutputStream extends BufferedOutputStream {

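The writer's new construct-then-open protocol can be exercised as in this
sketch (not from the commit; the metadata key, schema, and file name are
hypothetical):

    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericDatumWriter;

    public class WriteExample {
      public static void main(String[] args) throws IOException {
        Schema schema = Schema.parse("\"int\"");      // hypothetical schema
        // The writer is constructed unopened; metadata must be set before
        // create(), which writes the file header and returns the writer.
        DataFileWriter<Object> writer =
          new DataFileWriter<Object>(new GenericDatumWriter<Object>(schema))
          .setMeta("created-by", "WriteExample")      // hypothetical metadata key
          .create(schema, new File("ints.avro"));     // hypothetical file name
        try {
          for (int i = 0; i < 10; i++)
            writer.append(i);
        } finally {
          writer.close();                             // flushes the final block
        }
      }
    }

setMeta() throws once the writer is open, which is why the metadata call
precedes create() in the chain; appendTo(file) may be used in place of
create() to extend an existing file.
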
Modified: hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileReadTool.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileReadTool.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileReadTool.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileReadTool.java Thu Dec 31 17:01:58 2009
@@ -61,8 +61,7 @@
       Schema schema = fileReader.getSchema();
       DatumWriter<Object> writer = new GenericDatumWriter<Object>(schema);
       Encoder encoder = new JsonEncoder(schema, (JsonGenerator)null);
-      Object datum;
-      while (null != (datum = fileReader.next(null))) {
+      for (Object datum : fileReader) {
         // init() recreates the internal Jackson JsonGenerator
         encoder.init(out);
         writer.write(datum, encoder);

Modified: hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileWriteTool.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileWriteTool.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileWriteTool.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileWriteTool.java Thu Dec 31 17:01:58 2009
@@ -59,8 +59,8 @@
     try {
       DataInputStream din = new DataInputStream(input);
       DataFileWriter<Object> writer =
-        new DataFileWriter<Object>(schema, out,
-                                   new GenericDatumWriter<Object>());
+        new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+        .create(schema, out);
       Decoder decoder = new JsonDecoder(schema, din);
       Object datum;
       while (true) {

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/RandomData.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/RandomData.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/RandomData.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/RandomData.java Thu Dec 31 17:01:58 2009
@@ -18,7 +18,6 @@
 package org.apache.avro;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -129,9 +128,8 @@
     }
     Schema sch = Schema.parse(new File(args[0]));
     DataFileWriter<Object> writer =
-      new DataFileWriter<Object>(sch, 
-          new FileOutputStream(new File(args[1]),false),
-          new GenericDatumWriter<Object>());
+      new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+      .create(sch, new File(args[1]));
     try {
       for (Object datum : new RandomData(sch, Integer.parseInt(args[2]))) {
         writer.append(datum);

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java Thu Dec 31 17:01:58 2009
@@ -28,6 +28,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+import java.util.Random;
 import java.io.File;
 import java.io.IOException;
 
@@ -52,11 +53,14 @@
   @Test
   public void testGenericWrite() throws IOException {
     DataFileWriter<Object> writer =
-      new DataFileWriter<Object>(SCHEMA, FILE,
-                                 new GenericDatumWriter<Object>());
+      new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+      .create(SCHEMA, FILE);
     try {
+      int count = 0;
       for (Object datum : new RandomData(SCHEMA, COUNT, SEED)) {
         writer.append(datum);
+        if (++count%(COUNT/3) == 0)
+          writer.sync();                          // force some syncs mid-file
       }
     } finally {
       writer.close();
@@ -68,7 +72,6 @@
     DataFileReader<Object> reader =
       new DataFileReader<Object>(FILE, new GenericDatumReader<Object>());
     try {
-      assertEquals(COUNT, reader.getCount());
       Object datum = null;
       if (VALIDATE) {
         for (Object expected : new RandomData(SCHEMA, COUNT, SEED)) {
@@ -86,10 +89,38 @@
   }
 
   @Test
+  public void testSplits() throws IOException {
+    DataFileReader<Object> reader =
+      new DataFileReader<Object>(FILE, new GenericDatumReader<Object>());
+    Random rand = new Random(SEED);
+    try {
+      int splits = 10;                            // number of splits
+      int length = (int)FILE.length();            // length of file
+      int end = length;                           // end of split
+      int remaining = end;                        // bytes remaining
+      int count = 0;                              // count of entries
+      while (remaining > 0) {
+        int start = Math.max(0, end - rand.nextInt(2*length/splits));
+        reader.sync(start);                       // count entries in split
+        while (!reader.pastSync(end)) {
+          reader.next();
+          count++;
+        }
+        remaining -= end-start;
+        end = start;
+      }
+      assertEquals(COUNT, count);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
   public void testGenericAppend() throws IOException {
     long start = FILE.length();
     DataFileWriter<Object> writer =
-      new DataFileWriter<Object>(FILE, new GenericDatumWriter<Object>());
+      new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+      .appendTo(FILE);
     try {
       for (Object datum : new RandomData(SCHEMA, COUNT, SEED+1)) {
         writer.append(datum);
@@ -100,7 +131,6 @@
     DataFileReader<Object> reader =
       new DataFileReader<Object>(FILE, new GenericDatumReader<Object>());
     try {
-      assertEquals(COUNT*2, reader.getCount());
       reader.seek(start);
       Object datum = null;
       if (VALIDATE) {
@@ -118,18 +148,12 @@
     }
   }
 
-
-
-  protected void readFile(File f, 
-      DatumReader<Object> datumReader, boolean reuse)
+  protected void readFile(File f, DatumReader<Object> datumReader)
     throws IOException {
     System.out.println("Reading "+ f.getName());
     DataFileReader<Object> reader =
       new DataFileReader<Object>(new SeekableFileInput(f), datumReader);
-    Object datum = null;
-    long count = reader.getMetaLong("count");
-    for (int i = 0; i < count; i++) {
-      datum = reader.next(reuse ? datum : null);
+    for (Object datum : reader) {
       assertNotNull(datum);
     }
   }
@@ -140,10 +164,10 @@
     if (args.length > 1)
       projection = Schema.parse(new File(args[1]));
     TestDataFile tester = new TestDataFile();
-    tester.readFile(input, new GenericDatumReader<Object>(null, projection), false);
+    tester.readFile(input, new GenericDatumReader<Object>(null, projection));
     long start = System.currentTimeMillis();
     for (int i = 0; i < 4; i++)
-      tester.readFile(input, new GenericDatumReader<Object>(null, projection), false);
+      tester.readFile(input, new GenericDatumReader<Object>(null, projection));
     System.out.println("Time: "+(System.currentTimeMillis()-start));
   }
 
@@ -172,7 +196,7 @@
     private void readFiles(DatumReader<Object> datumReader) throws IOException {
       TestDataFile test = new TestDataFile();
       for (File f : DATAFILE_DIR.listFiles())
-        test.readFile(f, datumReader, true);
+        test.readFile(f, datumReader);
     }
   }
 }

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java Thu Dec 31 17:01:58 2009
@@ -52,8 +52,9 @@
         reflectData.getSchema(FooRecord.class),
         reflectData.getSchema(BarRecord.class) });
     Schema union = Schema.createUnion(schemas);
-    DataFileWriter<Object> writer = new DataFileWriter<Object>(union, fos,
-        new ReflectDatumWriter(union));
+    DataFileWriter<Object> writer =
+      new DataFileWriter<Object>(new ReflectDatumWriter(union))
+      .create(union, fos);
 
     // test writing to a file
     CheckList check = new CheckList();
@@ -66,59 +67,10 @@
     ReflectDatumReader din = new ReflectDatumReader();
     SeekableFileInput sin = new SeekableFileInput(FILE);
     DataFileReader<Object> reader = new DataFileReader<Object>(sin, din);
-    Object datum = null;
-    long count = reader.getMetaLong("count");
-    for (int i = 0; i < count; i++) {
-      datum = reader.next(datum);
-      check.assertEquals(datum, i);
-    }
-    reader.close();
-  }
-
-  /*
-   * Test that using multiple schemas in a file works doing a union for new
-   * types as they come.
-   */
-  @Test
-  public void testMultiReflectWithUntionAfterWriting() throws IOException {
-    FileOutputStream fos = new FileOutputStream(FILE);
-
-    ReflectData reflectData = ReflectData.get();
-    List<Schema> schemas = new ArrayList<Schema>();
-    schemas.add(reflectData.getSchema(FooRecord.class));
-    Schema union = Schema.createUnion(schemas);
-    DataFileWriter<Object> writer = new DataFileWriter<Object>(union, fos,
-        new ReflectDatumWriter(union));
-
-    CheckList check = new CheckList();
-    // write known type
-    write(writer, new FooRecord(10), check);
-    write(writer, new FooRecord(15), check);
-
-    // we have a new type, add it to the file
-    writer.addSchema(reflectData.getSchema(BarRecord.class));
-
-    // test writing those new types to a file
-    write(writer, new BarRecord("One beer please"), check);
-    write(writer, new BarRecord("Two beers please"), check);
-
-    // does foo record still work?
-    write(writer, new FooRecord(20), check);
-
-    // get one more bar in, just for laughs
-    write(writer, new BarRecord("Many beers please"), check);
-
-    writer.close();
-
-    ReflectDatumReader din = new ReflectDatumReader();
-    SeekableFileInput sin = new SeekableFileInput(FILE);
-    DataFileReader<Object> reader = new DataFileReader<Object>(sin, din);
-    Object datum = null;
-    long count = reader.getMetaLong("count");
-    for (int i = 0; i < count; i++) {
-      datum = reader.next(datum);
-      check.assertEquals(datum, i);
-    }
+    int count = 0;
+    for (Object datum : reader)
+      check.assertEquals(datum, count++);
+    Assert.assertEquals(count, check.size());
     reader.close();
   }
 
@@ -131,8 +83,9 @@
 
     ReflectData reflectData = ReflectData.AllowNull.get();
     Schema schema = reflectData.getSchema(BarRecord.class);
-    DataFileWriter<Object> writer = new DataFileWriter<Object>(schema, fos,
-        new ReflectDatumWriter(BarRecord.class, reflectData));
+    DataFileWriter<Object> writer = new DataFileWriter<Object>
+      (new ReflectDatumWriter(BarRecord.class, reflectData))
+      .create(schema, fos);
 
     // test writing to a file
     CheckList check = new CheckList();
@@ -145,12 +98,10 @@
     ReflectDatumReader din = new ReflectDatumReader();
     SeekableFileInput sin = new SeekableFileInput(FILE);
     DataFileReader<Object> reader = new DataFileReader<Object>(sin, din);
-    Object datum = null;
-    long count = reader.getMetaLong("count");
-    for (int i = 0; i < count; i++) {
-      datum = reader.next(datum);
-      check.assertEquals(datum, i);
-    }
+    int count = 0;
+    for (Object datum : reader)
+      check.assertEquals(datum, count++);
+    Assert.assertEquals(count, check.size());
     reader.close();
   }
 
@@ -162,8 +113,9 @@
     FileOutputStream fos = new FileOutputStream(FILE);
 
     Schema schema = ReflectData.get().getSchema(BazRecord.class);
-    DataFileWriter<Object> writer = new DataFileWriter<Object>(schema, fos,
-        new ReflectDatumWriter(schema));
+    DataFileWriter<Object> writer =
+      new DataFileWriter<Object>(new ReflectDatumWriter(schema))
+      .create(schema, fos);
 
     // test writing to a file
     CheckList check = new CheckList();
@@ -174,12 +126,10 @@
     ReflectDatumReader din = new ReflectDatumReader();
     SeekableFileInput sin = new SeekableFileInput(FILE);
     DataFileReader<Object> reader = new DataFileReader<Object>(sin, din);
-    Object datum = null;
-    long count = reader.getMetaLong("count");
-    for (int i = 0; i < count; i++) {
-      datum = reader.next(datum);
-      check.assertEquals(datum, i);
-    }
+    int count = 0;
+    for (Object datum : reader)
+      check.assertEquals(datum, count++);
+    Assert.assertEquals(count, check.size());
     reader.close();
   }
 

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestDataFileTools.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestDataFileTools.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestDataFileTools.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestDataFileTools.java Thu Dec 31 17:01:58 2009
@@ -49,8 +49,9 @@
     sampleFile = AvroTestUtil.tempFile(TestDataFileTools.class + ".avro");
     schema = Schema.create(Type.INT);
     
-    DataFileWriter<Object> writer = new DataFileWriter<Object>(
-        schema, sampleFile, new GenericDatumWriter<Object>(schema));
+    DataFileWriter<Object> writer
+      = new DataFileWriter<Object>(new GenericDatumWriter<Object>(schema))
+      .create(schema, sampleFile);
     StringBuilder builder = new StringBuilder();
 
     for (int i = 0; i < COUNT; ++i) {
@@ -106,15 +107,13 @@
     // Read it back, and make sure it's valid.
     GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
     DataFileReader<Object> fileReader = new DataFileReader<Object>(outFile,reader);
-    Object datum;
     int i = 0;
-    while (null != (datum = fileReader.next(null))) {
+    for (Object datum : fileReader) {
       assertEquals(i, datum);
       i++;
     }
     assertEquals(COUNT, i);
     assertEquals(schema, fileReader.getSchema());
-    assertEquals(COUNT, fileReader.getCount());
   }
   
   @Test
@@ -148,7 +147,7 @@
     DataFileReader<Object> fileReader = 
       new DataFileReader<Object>(outFile,reader);
     int i = 0;
-    while (null != fileReader.next(null)) {
+    for (Object datum : fileReader) {
       i++;
     }
     return i;


