flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/3] flink git commit: [FLINK-5788] [docs] Improve documentation of FileSystem and specify the data persistence contract.
Date Tue, 14 Feb 2017 14:48:51 GMT
Repository: flink
Updated Branches:
  refs/heads/master 663c1e3f7 -> f7af3b016


[FLINK-5788] [docs] Improve documentation of FileSystem and specify the data persistence contract.

This closes #3301


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7af3b01
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7af3b01
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7af3b01

Branch: refs/heads/master
Commit: f7af3b01681592787db16a555b55d6b11d35f869
Parents: af81beb
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Feb 13 14:29:03 2017 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Feb 14 15:32:43 2017 +0100

----------------------------------------------------------------------
 docs/internals/filesystems.md                   | 138 +++++++++++++++++++
 .../apache/flink/core/fs/FSDataInputStream.java |  11 +-
 .../flink/core/fs/FSDataOutputStream.java       |  81 ++++++++++-
 .../org/apache/flink/core/fs/FileSystem.java    |  98 ++++++++++++-
 4 files changed, 323 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f7af3b01/docs/internals/filesystems.md
----------------------------------------------------------------------
diff --git a/docs/internals/filesystems.md b/docs/internals/filesystems.md
new file mode 100644
index 0000000..427251a
--- /dev/null
+++ b/docs/internals/filesystems.md
@@ -0,0 +1,138 @@
+---
+title: "File Systems"
+nav-parent_id: internals
+nav-pos: 10
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* Replaced by the TOC
+{:toc}
+
+Flink has its own file system abstraction via the `org.apache.flink.core.fs.FileSystem` class.
+This abstraction provides a common set of operations and minimal guarantees across various
types
+of file system implementations.
+
+The `FileSystem`'s set of available operations is quite limited, in order to support a wide
+range of file systems. For example, appending to or mutating existing files is not supported.
+
+File systems are identified by a *file system scheme*, such as `file://`, `hdfs://`, etc.
+
+# Implementations
+
+Flink implements the file systems directly, with the following file system schemes:
+
+  - `file`, which represents the machine's local file system.
+
+Other file system types are accessed by an implementation that bridges to the suite of file
systems supported by
+[Apache Hadoop](https://hadoop.apache.org/). The following is an incomplete list of examples:
+
+  - `hdfs`: Hadoop Distributed File System
+  - `s3`, `s3n`, and `s3a`: Amazon S3 file system
+  - `gcs`: Google Cloud Storage
+  - `maprfs`: The MapR distributed file system
+  - ...
+
+Flink loads Hadoop's file systems transparently if it finds the Hadoop File System classes
in the class path and finds a valid
+Hadoop configuration. By default, it looks for the Hadoop configuration in the class path.
Alternatively, one can specify a
+custom location via the configuration entry `fs.hdfs.hadoopconf`.
+
+
+# Persistence Guarantees
+
+These `FileSystem` and its `FsDataOutputStream` instances are used to persistently store
data, both for results of applications
+and for fault tolerance and recovery. It is therefore crucial that the persistence semantics
of these streams are well defined.
+
+## Definition of Persistence Guarantees
+
+Data written to an output stream is considered persistent, if two requirements are met:
+
+  1. **Visibility Requirement:** It must be guaranteed that all other processes, machines,
+     virtual machines, containers, etc. that are able to access the file see the data consistently
+     when given the absolute file path. This requirement is similar to the *close-to-open*
+     semantics defined by POSIX, but restricted to the file itself (by its absolute path).
+
+  2. **Durability Requirement:** The file system's specific durability/persistence requirements
+     must be met. These are specific to the particular file system. For example the
+     {@link LocalFileSystem} does not provide any durability guarantees for crashes of both
+     hardware and operating system, while replicated distributed file systems (like HDFS)
+     guarantee typically durability in the presence of up *n* concurrent node failures,
+     where *n* is the replication factor.
+
+Updates to the file's parent directory (such that the file shows up when
+listing the directory contents) are not required to be complete for the data in the file
stream
+to be considered persistent. This relaxation is important for file systems where updates
to
+directory contents are only eventually consistent.
+
+The `FSDataOutputStream` has to guarantee data persistence for the written bytes once the
call to
+`FSDataOutputStream.close()` returns.
+
+## Examples
+ 
+  - For **fault-tolerant distributed file systems**, data is considered persistent once 
+    it has been received and acknowledged by the file system, typically by having been replicated
+    to a quorum of machines (*durability requirement*). In addition the absolute file path
+    must be visible to all other machines that will potentially access the file (*visibility
requirement*).
+
+    Whether data has hit non-volatile storage on the storage nodes depends on the specific
+    guarantees of the particular file system.
+
+    The metadata updates to the file's parent directory are not required to have reached
+    a consistent state. It is permissible that some machines see the file when listing the
parent
+    directory's contents while others do not, as long as access to the file by its absolute
path
+    is possible on all nodes.
+
+  - A **local file system** must support the POSIX *close-to-open* semantics.
+    Because the local file system does not have any fault tolerance guarantees, no further
+    requirements exist.
+ 
+    The above implies specifically that data may still be in the OS cache when considered
+    persistent from the local file system's perspective. Crashes that cause the OS cache
to loose
+    data are considered fatal to the local machine and are not covered by the local file
system's
+    guarantees as defined by Flink.
+
+    That means that computed results, checkpoints, and savepoints that are written only to
+    the local filesystem are not guaranteed to be recoverable from the local machine's failure,
+    making local file systems unsuitable for production setups.
+
+# Updating File Contents
+
+Many file systems either do not support overwriting contents of existing files at all, or
do not support consistent visibility of the
+updated contents in that case. For that reason, Flink's FileSystem does not support appending
to existing files, or seeking within
+output streams such that previously written data could be changed within the same file.
+
+# Overwriting Files
+
+Overwriting files is in general possible. A file is overwritten by deleting it and creating
a new file.
+However, certain filesystems cannot make that change synchronously visible to all parties
that have access to the file.
+For example [Amazon S3](https://aws.amazon.com/documentation/s3/) guarantees only *eventual
consistency* in
+the visibility of the file replacement: Some machines may see the old file, some machines
may see the new file.
+
+To avoid these consistency issues, the implementations of failure/recovery mechanisms in
Flink strictly avoid writing to
+the same file path more than once.
+
+# Thread Safety
+
+Implementations of `FileSystem` must be thread-safe: The same instance of `FileSystem` is
frequently shared across multiple threads
+in Flink and must be able to concurrently create input/output streams and list file metadata.
+
+The `FSDataOutputStream` and `FSDataOutputStream` implementations are strictly **not thread-safe**.
+Instances of the streams should also not be passed between threads in between read or write
operations, because there are no guarantees
+about the visibility of operations across threads (many operations do not create memory fences).
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f7af3b01/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
index 6ce1235..44dbcb1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
@@ -25,6 +25,10 @@ import java.io.InputStream;
 
 /**
  * Interface for a data input stream to a file on a {@link FileSystem}.
+ * 
+ * <p>This extends the {@link java.io.InputStream} with methods for accessing
+ * the stream's {@link #getPos() current position} and
+ * {@link #seek(long) seeking} to a desired position.
  */
 @Public
 public abstract class FSDataInputStream extends InputStream {
@@ -35,15 +39,16 @@ public abstract class FSDataInputStream extends InputStream {
 	 * 
 	 * @param desired
 	 *        the desired offset
-	 * @throws IOException
-	 *         thrown if an error occurred while seeking inside the input stream
+	 * @throws IOException Thrown if an error occurred while seeking inside the input stream.
 	 */
 	public abstract void seek(long desired) throws IOException;
 
 	/**
-	 * Get the current position in the input stream.
+	 * Gets the current position in the input stream.
 	 *
 	 * @return current position in the input stream
+	 * @throws IOException Thrown if an I/O error occurred in the underlying stream 
+	 *                     implementation while accessing the stream's position.
 	 */
 	public abstract long getPos() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f7af3b01/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
index 0318d1f..a8df5c1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
@@ -24,14 +24,93 @@ import java.io.IOException;
 import java.io.OutputStream;
 
 /**
- * Interface for a data output stream to a file on a {@link FileSystem}.
+ * An output stream to a file that is created via a {@link FileSystem}.
+ * This class extends the base {@link java.io.OutputStream} with some additional important
methods.
+ * 
+ * <h2>Data Persistence Guarantees</h2>
+ * 
+ * These streams are used to persistently store data, both for results of streaming applications
+ * and for fault tolerance and recovery. It is therefore crucial that the persistence semantics
+ * of these streams are well defined.
+ * 
+ * <p>Please refer to the class-level docs of {@link FileSystem} for the definition
of data persistence
+ * via Flink's FileSystem abstraction and the {@code FSDataOutputStream}.
+ * 
+ * <h2>Thread Safety</h2>
+ * 
+ * Implementations of the {@code FSDataOutputStream} are generally not assumed to be thread
safe.
+ * Instances of {@code FSDataOutputStream} should not be passed between threads, because
there
+ * are no guarantees about the order of visibility of operations across threads.
+ * 
+ * @see FileSystem
+ * @see FSDataInputStream
  */
 @Public
 public abstract class FSDataOutputStream extends OutputStream {
 
+	/**
+	 * Gets the position of the stream (non-negative), defined as the number of bytes
+	 * from the beginning of the file to the current writing position. The position
+	 * corresponds to the zero-based index of the next byte that will be written.
+	 * 
+	 * <p>This method must report accurately report the current position of the stream.
+	 * Various components of the high-availability and recovery logic rely on the accurate
+	 * 
+	 * @return The current position in the stream, defined as the number of bytes
+	 *         from the beginning of the file to the current writing position.
+	 * 
+	 * @throws IOException Thrown if an I/O error occurs while obtaining the position from
+	 *                     the stream implementation.
+	 */
 	public abstract long getPos() throws IOException;
 
+	/**
+	 * Flushes the stream, writing any data currently buffered in stream implementation
+	 * to the proper output stream. After this method has been called, the stream implementation
+	 * must not hold onto any buffered data any more.
+	 * 
+	 * <p>A completed flush does not mean that the data is necessarily persistent. Data
+	 * persistence can is only assumed after calls to {@link #close()} or {@link #sync()}.
+	 * 
+	 * <p>Implementation note: This overrides the method defined in {@link OutputStream}
+	 * as abstract to force implementations of the {@code FSDataOutputStream} to implement
+	 * this method directly.
+	 * 
+	 * @throws IOException Thrown if an I/O error occurs while flushing the stream.
+	 */
 	public abstract void flush() throws IOException;
 
+	/**
+	 * Flushes the data all the way to the persistent non-volatile storage (for example disks).
+	 * The method behaves similar to the <i>fsync</i> function, forcing all data
to
+	 * be persistent on the devices.
+	 * 
+	 * <p>
+	 * 
+	 * @throws IOException Thrown if an I/O error occurs
+	 */
 	public abstract void sync() throws IOException;
+
+	/**
+	 * Closes the output stream. After this method returns, the implementation must guarantee
+	 * that all data written to the stream is persistent/visible, as defined in the
+	 * {@link FileSystem class-level docs}.
+	 * 
+	 * <p>The above implies that the method must block until persistence can be guaranteed.
+	 * For example for distributed replicated file systems, the method must block until the
+	 * replication quorum has been reached. If the calling thread is interrupted in the
+	 * process, it must fail with an {@code IOException} to indicate that persistence cannot
+	 * be guaranteed.
+	 * 
+	 * <p>If this method throws an exception, the data in the stream cannot be assumed
to be
+	 * persistent.
+	 * 
+	 * <p>Implementation note: This overrides the method defined in {@link OutputStream}
+	 * as abstract to force implementations of the {@code FSDataOutputStream} to implement
+	 * this method directly.
+	 *         
+	 * @throws IOException Thrown, if an error occurred while closing the stream or guaranteeing
+	 *                     that the data is persistent.
+	 */
+	public abstract void close() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f7af3b01/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index d8efcbc..c3828fb 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -52,12 +52,108 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Abstract base class of all file systems used by Flink. This class may be extended to implement
  * distributed file systems, or local file systems. The abstraction by this file system is
very simple,
- * and teh set of allowed operations quite limited, to support the common denominator of
a wide
+ * and the set of available operations quite limited, to support the common denominator of
a wide
  * range of file systems. For example, appending to or mutating existing files is not supported.
  * 
  * <p>Flink implements and supports some file system types directly (for example the
default
  * machine-local file system). Other file system types are accessed by an implementation
that bridges
  * to the suite of file systems supported by Hadoop (such as for example HDFS).
+ * 
+ * <h2>Data Persistence</h2>
+ * 
+ * The FileSystem's {@link FSDataOutputStream output streams} are used to persistently store
data,
+ * both for results of streaming applications and for fault tolerance and recovery. It is
therefore
+ * crucial that the persistence semantics of these streams are well defined.
+ * 
+ * <h3>Definition of Persistence Guarantees</h3>
+ * 
+ * Data written to an output stream is considered persistent, if two requirements are met:
+ * 
+ * <ol>
+ *     <li><b>Visibility Requirement:</b> It must be guaranteed that all
other processes, machines,
+ *     virtual machines, containers, etc. that are able to access the file see the data consistently
+ *     when given the absolute file path. This requirement is similar to the <i>close-to-open</i>
+ *     semantics defined by POSIX, but restricted to the file itself (by its absolute path).</li>
+ * 
+ *     <li><b>Durability Requirement:</b> The file system's specific durability/persistence
requirements
+ *     must be met. These are specific to the particular file system. For example the
+ *     {@link LocalFileSystem} does not provide any durability guarantees for crashes of
both
+ *     hardware and operating system, while replicated distributed file systems (like HDFS)
+ *     typically guarantee durability in the presence of at most <i>n</i> concurrent
node failures,
+ *     where <i>n</i> is the replication factor.</li>
+ * </ol>
+ *
+ * <p>Updates to the file's parent directory (such that the file shows up when
+ * listing the directory contents) are not required to be complete for the data in the file
stream
+ * to be considered persistent. This relaxation is important for file systems where updates
to
+ * directory contents are only eventually consistent.
+ * 
+ * <p>The {@link FSDataOutputStream} has to guarantee data persistence for the written
bytes
+ * once the call to {@link FSDataOutputStream#close()} returns.
+ *
+ * <h3>Examples</h3>
+ *
+ * <ul>
+ *     <li>For <b>fault-tolerant distributed file systems</b>, data is
considered persistent once 
+ *     it has been received and acknowledged by the file system, typically by having been
replicated
+ *     to a quorum of machines (<i>durability requirement</i>). In addition the
absolute file path
+ *     must be visible to all other machines that will potentially access the file (<i>visibility
+ *     requirement</i>).
+ *
+ *     <p>Whether data has hit non-volatile storage on the storage nodes depends on
the specific
+ *     guarantees of the particular file system.
+ *
+ *     <p>The metadata updates to the file's parent directory are not required to have
reached
+ *     a consistent state. It is permissible that some machines see the file when listing
the parent
+ *     directory's contents while others do not, as long as access to the file by its absolute
path
+ *     is possible on all nodes.</li>
+ *
+ *     <li>A <b>local file system</b> must support the POSIX <i>close-to-open</i>
semantics.
+ *     Because the local file system does not have any fault tolerance guarantees, no further
+ *     requirements exist.
+ * 
+ *     <p>The above implies specifically that data may still be in the OS cache when
considered
+ *     persistent from the local file system's perspective. Crashes that cause the OS cache
to loose
+ *     data are considered fatal to the local machine and are not covered by the local file
system's
+ *     guarantees as defined by Flink.
+ * 
+ *     <p>That means that computed results, checkpoints, and savepoints that are written
only to
+ *     the local filesystem are not guaranteed to be recoverable from the local machine's
failure,
+ *     making local file systems unsuitable for production setups.</li>
+ * </ul>
+ *
+ * <h2>Updating File Contents</h2>
+ *
+ * Many file systems either do not support overwriting contents of existing files at all,
or do
+ * not support consistent visibility of the updated contents in that case. For that reason,
+ * Flink's FileSystem does not support appending to existing files, or seeking within output
streams
+ * so that previously written data could be overwritten.
+ *
+ * <h2>Overwriting Files</h2>
+ *
+ * Overwriting files is in general possible. A file is overwritten by deleting it and creating
+ * a new file. However, certain filesystems cannot make that change synchronously visible
+ * to all parties that have access to the file.
+ * For example <a href="https://aws.amazon.com/documentation/s3/">Amazon S3</a>
guarantees only
+ * <i>eventual consistency</i> in the visibility of the file replacement: Some
machines may see
+ * the old file, some machines may see the new file.
+ *
+ * <p>To avoid these consistency issues, the implementations of failure/recovery mechanisms
in
+ * Flink strictly avoid writing to the same file path more than once.
+ * 
+ * <h2>Thread Safety</h2>
+ * 
+ * Implementations of {@code FileSystem} must be thread-safe: The same instance of FileSystem
+ * is frequently shared across multiple threads in Flink and must be able to concurrently
+ * create input/output streams and list file metadata.
+ * 
+ * <p>The {@link FSDataOutputStream} and {@link FSDataOutputStream} implementations
are strictly
+ * <b>not thread-safe</b>. Instances of the streams should also not be passed
between threads
+ * in between read or write operations, because there are no guarantees about the visibility
of
+ * operations across threads (many operations do not create memory fences).
+ * 
+ * @see FSDataInputStream
+ * @see FSDataOutputStream
  */
 @Public
 public abstract class FileSystem {


Mime
View raw message