avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1539765 - in /avro/trunk: ./ lang/java/avro/src/main/java/org/apache/avro/file/ lang/java/avro/src/test/java/org/apache/avro/
Date Thu, 07 Nov 2013 19:09:09 GMT
Author: cutting
Date: Thu Nov  7 19:09:09 2013
New Revision: 1539765

URL: http://svn.apache.org/r1539765
Log:
AVRO-1388. Java: Add fsync support to DataFileWriter.  Contributed by Hari Shreedharan.

Added:
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/Syncable.java   (with props)
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/SyncableFileOutputStream.java   (with props)
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
    avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1539765&r1=1539764&r2=1539765&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Nov  7 19:09:09 2013
@@ -6,6 +6,9 @@ Trunk (not yet released)
 
     AVRO-975. C#: Add RPC support. (Mark Lamley via cutting)
 
+    AVRO-1388. Java: Add fsync support to DataFileWriter.
+    (Hari Shreedharan via cutting)
+
   IMPROVEMENTS
 
     AVRO-1355. Java: Reject schemas with duplicate field

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java?rev=1539765&r1=1539764&r2=1539765&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java Thu Nov  7 19:09:09 2013
@@ -21,7 +21,6 @@ import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.FilterOutputStream;
 import java.io.Flushable;
 import java.io.IOException;
@@ -54,6 +53,8 @@ public class DataFileWriter<D> implement
   private Schema schema;
   private DatumWriter<D> dout;
 
+  private OutputStream underlyingStream;
+
   private BufferedFileOutputStream out;
   private BinaryEncoder vout;
 
@@ -125,7 +126,7 @@ public class DataFileWriter<D> implement
 
   /** Open a new file for data matching a schema. */
   public DataFileWriter<D> create(Schema schema, File file) throws IOException {
-    return create(schema, new FileOutputStream(file));
+    return create(schema, new SyncableFileOutputStream(file));
   }
 
   /** Open a new file for data matching a schema. */
@@ -178,7 +179,7 @@ public class DataFileWriter<D> implement
   /** Open a writer appending to an existing file. */
   public DataFileWriter<D> appendTo(File file) throws IOException {
     return appendTo(new SeekableFileInput(file),
-                    new FileOutputStream(file, true));
+                    new SyncableFileOutputStream(file, true));
   }
 
   /** Open a writer appending to an existing file.
@@ -208,6 +209,7 @@ public class DataFileWriter<D> implement
   }
 
   private void init(OutputStream outs) throws IOException {
+    this.underlyingStream = outs;
     this.out = new BufferedFileOutputStream(outs);
     EncoderFactory efactory = new EncoderFactory();
     this.vout = efactory.binaryEncoder(out, null);
@@ -409,6 +411,21 @@ public class DataFileWriter<D> implement
     vout.flush();
   }
 
+  /**
+   * If this writer was instantiated using a File or using an
+   * {@linkplain Syncable} instance, this method flushes all buffers for this
+   * writer to disk. In other cases, this method behaves exactly
+   * like {@linkplain #flush()}.
+   *
+   * @throws IOException
+   */
+  public void fSync() throws IOException {
+    flush();
+    if (underlyingStream instanceof Syncable) {
+      ((Syncable) underlyingStream).sync();
+    }
+  }
+
   /** Flush and close the file. */
   @Override
   public void close() throws IOException {

Added: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/Syncable.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/Syncable.java?rev=1539765&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/Syncable.java (added)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/Syncable.java Thu Nov  7 19:09:09 2013
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.IOException;
+
+public interface Syncable {
+
+  /**
+   * Sync the file to disk. On supported platforms, this method behaves like
+   * POSIX <code>fsync</code> and syncs all underlying OS buffers for this
+   * file descriptor to disk. On these platforms, if this method returns,
+   * the data written to this instance is guaranteed to be persisted on disk.
+   *
+   * @throws IOException - if an error occurred while attempting to sync the
+   *                     data to disk.
+   */
+  void sync() throws IOException;
+}

Propchange: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/Syncable.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/SyncableFileOutputStream.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/SyncableFileOutputStream.java?rev=1539765&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/SyncableFileOutputStream.java (added)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/SyncableFileOutputStream.java Thu Nov  7 19:09:09 2013
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.avro.file;
+
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * An implementation of {@linkplain Syncable} which writes to a file.
+ * An instance of this class can be used with {@linkplain DataFileWriter} to
+ * guarantee that Avro Container Files are persisted to disk on supported
+ * platforms using the
+ * {@linkplain org.apache.avro.file.DataFileWriter#fSync()} method.
+ *
+ * @see FileOutputStream
+ */
+public class SyncableFileOutputStream
+  extends FileOutputStream implements Syncable {
+
+  /**
+   * Creates an instance of {@linkplain SyncableFileOutputStream} with the
+   * given name.
+   *
+   * @param name - the full file name.
+   * @throws FileNotFoundException - if the file cannot be created or opened.
+   */
+  public SyncableFileOutputStream(String name) throws FileNotFoundException {
+    super(name);
+  }
+
+  /**
+   * Creates an instance of {@linkplain SyncableFileOutputStream} using the
+   * given {@linkplain File} instance.
+   *
+   * @param file - The file to use to create the output stream.
+   *
+   * @throws FileNotFoundException - if the file cannot be created or opened.
+   */
+  public SyncableFileOutputStream(File file)
+    throws FileNotFoundException {
+    super(file);
+  }
+
+  /**
+   * Creates an instance of {@linkplain SyncableFileOutputStream} with the
+   * given name and optionally append to the file if it already exists.
+   *
+   * @param name - the full file name.
+   * @param append - true if the file is to be appended to
+   *
+   * @throws FileNotFoundException - if the file cannot be created or opened.
+   */
+  public SyncableFileOutputStream(String name, boolean append)
+    throws FileNotFoundException {
+    super(name, append);
+  }
+
+  /**
+   * Creates an instance of {@linkplain SyncableFileOutputStream}
+   * that writes to the file represented by the given {@linkplain File}
+   * instance and optionally append to the file if it already exists.
+   *
+   * @param file - the file instance to use to create the stream.
+   * @param append - true if the file is to be appended to
+   *
+   * @throws FileNotFoundException - if the file cannot be created or opened.
+   */
+  public SyncableFileOutputStream(File file, boolean append)
+    throws FileNotFoundException {
+    super(file, append);
+  }
+
+  /**
+   * Creates an instance of {@linkplain SyncableFileOutputStream}
+   * using the given {@linkplain FileDescriptor} instance.
+   */
+  public SyncableFileOutputStream(FileDescriptor fdObj) {
+    super(fdObj);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void sync() throws IOException {
+    getFD().sync();
+  }
+}

Propchange: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/SyncableFileOutputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java?rev=1539765&r1=1539764&r2=1539765&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java (original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java Thu Nov  7 19:09:09 2013
@@ -35,6 +35,7 @@ import org.apache.avro.file.DataFileRead
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.file.SeekableFileInput;
+import org.apache.avro.file.Syncable;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -93,6 +94,8 @@ public class TestDataFile {
     testSyncDiscovery();
     testGenericAppend();
     testReadWithHeader();
+    testFSync(false);
+    testFSync(true);
   }
 
   public void testGenericWrite() throws IOException {
@@ -315,6 +318,37 @@ public class TestDataFile {
       out.flushCount >= flushCounter);
   }
 
+  private void testFSync(boolean useFile) throws IOException {
+    DataFileWriter<Object> writer =
+      new DataFileWriter<Object>(new GenericDatumWriter<Object>());
+    writer.setFlushOnEveryBlock(false);
+    TestingByteArrayOutputStream out = new TestingByteArrayOutputStream();
+    if (useFile) {
+      File f = makeFile();
+      SeekableFileInput in = new SeekableFileInput(f);
+      writer.appendTo(in, out);
+    } else {
+      writer.create(SCHEMA, out);
+    }
+    int currentCount = 0;
+    int syncCounter = 0;
+    try {
+      for (Object datum : new RandomData(SCHEMA, COUNT, SEED+1)) {
+        currentCount++;
+        writer.append(datum);
+        if (currentCount % 10 == 0) {
+          writer.fSync();
+          syncCounter++;
+        }
+      }
+    } finally {
+      writer.close();
+    }
+    System.out.println("Total number of syncs: " + out.syncCount);
+    Assert.assertEquals(syncCounter, out.syncCount);
+  }
+
+
   static void readFile(File f, DatumReader<? extends Object> datumReader)
     throws IOException {
     FileReader<? extends Object> reader = DataFileReader.openReader(f, datumReader);
@@ -335,13 +369,20 @@ public class TestDataFile {
     System.out.println("Time: "+(System.currentTimeMillis()-start));
   }
 
-  private class TestingByteArrayOutputStream extends ByteArrayOutputStream {
+  private class TestingByteArrayOutputStream extends ByteArrayOutputStream
+    implements Syncable {
     private int flushCount = 0;
+    private int syncCount = 0;
 
     @Override
     public void flush() throws IOException {
       super.flush();
       flushCount++;
     }
+
+    @Override
+    public void sync() throws IOException {
+      syncCount++;
+    }
   }
 }



Mime
View raw message