avro-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r833621 - in /hadoop/avro/trunk: ./ src/java/org/apache/avro/file/ src/test/java/org/apache/avro/
Date: Sat, 07 Nov 2009 00:56:12 GMT
Author: cutting
Date: Sat Nov  7 00:56:12 2009
New Revision: 833621

URL: http://svn.apache.org/viewvc?rev=833621&view=rev
Log:
AVRO-158.  Permit appending to a data file from Java.
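
For context, a minimal usage sketch of the append workflow this change enables, using the File-based constructors added below. The schema, file path, and record field are placeholders; the constructors and methods mirror the patched API and the new testGenericAppend test.

    import java.io.File;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;

    public class AppendExample {
      public static void main(String[] args) throws Exception {
        // Placeholder schema and file; any record schema works the same way.
        Schema schema = Schema.parse(
            "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":"
            + "[{\"name\":\"x\",\"type\":\"long\"}]}");
        File file = new File("data.avro");

        GenericData.Record datum = new GenericData.Record(schema);
        datum.put("x", 1L);

        // Create a new data file and write one record.
        DataFileWriter<Object> writer =
          new DataFileWriter<Object>(schema, file, new GenericDatumWriter<Object>());
        writer.append(datum);
        writer.close();

        // Reopen the same file for appending: the new constructor reads back the
        // schema, sync marker and metadata, then seeks to the end before writing.
        DataFileWriter<Object> appender =
          new DataFileWriter<Object>(file, new GenericDatumWriter<Object>());
        appender.append(datum);
        appender.close();

        // Read everything back through the new File-based reader constructor;
        // the count should now reflect both records, as testGenericAppend checks.
        DataFileReader<Object> reader =
          new DataFileReader<Object>(file, new GenericDatumReader<Object>());
        System.out.println("records: " + reader.getCount());
        reader.close();
      }
    }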

Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java
    hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java
    hadoop/avro/trunk/src/java/org/apache/avro/file/SeekableFileInput.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=833621&r1=833620&r2=833621&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Sat Nov  7 00:56:12 2009
@@ -8,6 +8,8 @@
 
     AVRO-151. Validating Avro schema parser for C (massie)
 
+    AVRO-158. Permit appending to a data file from Java.  (cutting)
+
   IMPROVEMENTS
 
     AVRO-180. Enhance code generator script and unit tests. (sbanacho)

Modified: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java?rev=833621&r1=833620&r2=833621&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java Sat Nov  7 00:56:12 2009
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
+import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -41,14 +42,19 @@
   private SeekableBufferedInput in;
   private Decoder vin;
 
-  private Map<String,byte[]> meta = new HashMap<String,byte[]>();
+  Map<String,byte[]> meta = new HashMap<String,byte[]>();
 
   private long count;                           // # entries in file
   private long blockCount;                      // # entries in block
-  private byte[] sync = new byte[DataFileConstants.SYNC_SIZE];
+  byte[] sync = new byte[DataFileConstants.SYNC_SIZE];
   private byte[] syncBuffer = new byte[DataFileConstants.SYNC_SIZE];
 
   /** Construct a reader for a file. */
+  public DataFileReader(File file, DatumReader<D> reader) throws IOException {
+    this(new SeekableFileInput(file), reader);
+  }
+
+  /** Construct a reader for a file. */
   public DataFileReader(SeekableInput sin, DatumReader<D> reader)
     throws IOException {
     this.in = new SeekableBufferedInput(sin);
@@ -90,14 +96,11 @@
     in.seek(DataFileConstants.MAGIC.length);         // seek to start
   }
   
-
-  /**
-   * Return the number of records in the file, according
-   * to its metadata.
-   */
-  public long getCount() {
-    return count;
-  }
+  /** Return the schema used in this file. */
+  public Schema getSchema() { return schema; }
+  
+  /** Return the number of records in the file. */
+  public long getCount() { return count; }
   
   /** Return the value of a metadata property. */
   public synchronized byte[] getMeta(String key) {

Modified: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java?rev=833621&r1=833620&r2=833621&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java Sat Nov  7 00:56:12 2009
@@ -23,8 +23,15 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileOutputStream;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
 import java.rmi.server.UID;
 import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.List;
@@ -34,6 +41,7 @@
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.generic.GenericDatumReader;
 
 /** Stores in a file a sequence of data conforming to a schema.  The schema is
  * stored in the file with the data.  Each datum in a file is of the same
@@ -60,34 +68,67 @@
   private Encoder bufOut = new BinaryEncoder(buffer);
 
   private byte[] sync;                          // 16 random bytes
-  {
-    try {                                       // initialize sync
-      MessageDigest digester = MessageDigest.getInstance("MD5");
-      long time = System.currentTimeMillis();
-      digester.update((new UID()+"@"+time).getBytes());
-      sync = digester.digest();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
 
-  /** Construct a writer to a file for data matching a schema. */
+  /** Construct a writer to a new file for data matching a schema. */
+  public DataFileWriter(Schema schema, File file,
+                        DatumWriter<D> dout) throws IOException {
+    this(schema, new FileOutputStream(file), dout);
+  }
+  /** Construct a writer to a new file for data matching a schema. */
   public DataFileWriter(Schema schema, OutputStream outs,
                         DatumWriter<D> dout) throws IOException {
     this.schema = schema;
-    this.out = new BufferedFileOutputStream(outs);
-    this.vout = new BinaryEncoder(out);
-    this.dout = dout;
-    
-    dout.setSchema(schema);
+    this.sync = generateSync();
 
     setMeta(DataFileConstants.SYNC, sync);
     setMeta(DataFileConstants.SCHEMA, schema.toString());
     setMeta(DataFileConstants.CODEC, DataFileConstants.NULL_CODEC);
-    
+
+    init(outs, dout);
+
     out.write(DataFileConstants.MAGIC);
   }
   
+  /** Construct a writer appending to an existing file. */
+  public DataFileWriter(File file, DatumWriter<D> dout)
+    throws IOException {
+    if (!file.exists())
+      throw new FileNotFoundException("Not found: "+file);
+    RandomAccessFile raf = new RandomAccessFile(file, "rw");
+    FileDescriptor fd = raf.getFD();
+    DataFileReader<D> reader =
+      new DataFileReader<D>(new SeekableFileInput(fd),
+                            new GenericDatumReader<D>());
+    this.schema = reader.getSchema();
+    this.sync = reader.sync;
+    this.count = reader.getCount();
+    this.meta.putAll(reader.meta);
+
+    FileChannel channel = raf.getChannel();       // seek to end
+    channel.position(channel.size());
+
+    init(new FileOutputStream(fd), dout);
+  }
+  
+  private void init(OutputStream outs, DatumWriter<D> dout)
+    throws IOException {
+    this.out = new BufferedFileOutputStream(outs);
+    this.vout = new BinaryEncoder(out);
+    this.dout = dout;
+    dout.setSchema(schema);
+  }
+
+  private static byte[] generateSync() {
+    try {
+      MessageDigest digester = MessageDigest.getInstance("MD5");
+      long time = System.currentTimeMillis();
+      digester.update((new UID()+"@"+time).getBytes());
+      return digester.digest();
+    } catch (NoSuchAlgorithmException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   /** Set a metadata property. */
   public synchronized void setMeta(String key, byte[] value) {
       meta.put(key, value);

Modified: hadoop/avro/trunk/src/java/org/apache/avro/file/SeekableFileInput.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/SeekableFileInput.java?rev=833621&r1=833620&r2=833621&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/SeekableFileInput.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/SeekableFileInput.java Sat Nov  7 00:56:12 2009
@@ -19,6 +19,7 @@
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileDescriptor;
 import java.io.IOException;
 
 /** A {@link FileInputStream} that implements {@link SeekableInput}. */
@@ -26,6 +27,7 @@
   extends FileInputStream implements SeekableInput {
 
   public SeekableFileInput(File file) throws IOException { super(file); }
+  public SeekableFileInput(FileDescriptor fd) throws IOException { super(fd); }
 
   public void seek(long p) throws IOException { getChannel().position(p); }
   public long tell() throws IOException { return getChannel().position(); }
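
The FileDescriptor constructor added above exists so that DataFileWriter's append path can read the existing file header through the same open descriptor it will then write to. A rough sketch of that pattern, using plain JDK I/O plus the patched SeekableFileInput (the file name is a placeholder):

    import java.io.File;
    import java.io.FileDescriptor;
    import java.io.FileOutputStream;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;
    import org.apache.avro.file.SeekableFileInput;

    public class SharedDescriptorSketch {
      public static void main(String[] args) throws Exception {
        RandomAccessFile raf = new RandomAccessFile(new File("data.avro"), "rw");
        FileDescriptor fd = raf.getFD();

        // Read the existing header/metadata through the shared descriptor.
        SeekableFileInput in = new SeekableFileInput(fd);
        // ... e.g. hand 'in' to a DataFileReader to recover schema and sync ...

        // Position the file at its end, then write through the same descriptor
        // so new blocks land after the existing ones.
        FileChannel channel = raf.getChannel();
        channel.position(channel.size());
        FileOutputStream out = new FileOutputStream(fd);
        // ... write appended blocks via 'out' ...

        out.close();
        raf.close();
      }
    }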

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java?rev=833621&r1=833620&r2=833621&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java Sat Nov  7 00:56:12 2009
@@ -30,7 +30,6 @@
 import static org.junit.Assert.assertNotNull;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 
 public class TestDataFile {
@@ -54,8 +53,7 @@
   @Test
   public void testGenericWrite() throws IOException {
     DataFileWriter<Object> writer =
-      new DataFileWriter<Object>(SCHEMA,
-                                 new FileOutputStream(FILE),
+      new DataFileWriter<Object>(SCHEMA, FILE,
                                  new GenericDatumWriter<Object>());
     try {
       for (Object datum : new RandomData(SCHEMA, COUNT, SEED)) {
@@ -69,8 +67,7 @@
   @Test
   public void testGenericRead() throws IOException {
     DataFileReader<Object> reader =
-      new DataFileReader<Object>(new SeekableFileInput(FILE),
-                                 new GenericDatumReader<Object>());
+      new DataFileReader<Object>(FILE, new GenericDatumReader<Object>());
     try {
       assertEquals(COUNT, reader.getCount());
       Object datum = null;
@@ -89,6 +86,41 @@
     }
   }
 
+  @Test
+  public void testGenericAppend() throws IOException {
+    long start = FILE.length();
+    DataFileWriter<Object> writer =
+      new DataFileWriter<Object>(FILE, new GenericDatumWriter<Object>());
+    try {
+      for (Object datum : new RandomData(SCHEMA, COUNT, SEED+1)) {
+        writer.append(datum);
+      }
+    } finally {
+      writer.close();
+    }
+    DataFileReader<Object> reader =
+      new DataFileReader<Object>(FILE, new GenericDatumReader<Object>());
+    try {
+      assertEquals(COUNT*2, reader.getCount());
+      reader.seek(start);
+      Object datum = null;
+      if (VALIDATE) {
+        for (Object expected : new RandomData(SCHEMA, COUNT, SEED+1)) {
+          datum = reader.next(datum);
+          assertEquals(expected, datum);
+        }
+      } else {
+        for (int i = 0; i < COUNT; i++) {
+          datum = reader.next(datum);
+        }
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+
+
   protected void readFile(File f, 
       DatumReader<Object> datumReader, boolean reuse)
     throws IOException {


