spark-reviews mailing list archives

From tdas <>
Subject [GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...
Date Wed, 11 Apr 2018 23:00:33 GMT
GitHub user tdas opened a pull request:

    [SPARK-23966][SS] Refactoring all checkpoint file writing logic in a common CheckpointFileManager

    ## What changes were proposed in this pull request?
    Checkpoint files (offset log files, state store files) in Structured Streaming must be
written atomically so that no partial file is ever visible; a partial file would break the fault-tolerance guarantees.
Currently, there are 3 locations that try to do this individually and, in some cases, incorrectly.
    1. HDFSMetadataLog - This uses a FileManager interface that can be backed by an implementation
based on either the `FileSystem` or the `FileContext` API. It prefers the `FileContext` implementation
because `FileContext` on HDFS has atomic renames.
    1. HDFSBackedStateStore (aka in-memory state store)
      - Writing a file - This uses only the `FileSystem` API to perform the rename.
This is incorrect because rename is not atomic in the HDFS `FileSystem` implementation.
      - Writing a snapshot file - Same as above.
    #### Current problems:
    1. State Store behavior is incorrect - As noted above, rename is not atomic in the HDFS `FileSystem` implementation.
    1. Inflexible - Some file systems provide mechanisms other than write-to-temp-file-and-rename
for writing atomically and more efficiently. For example, with S3 you can write directly to
the final file and it will be made visible only when the entire file is written and closed
correctly. Any failure can be made to terminate the writing without making any partial files
visible in S3. The current code does not abstract this mechanism well enough for it to be customized.
    #### Solution:
    1. Introduce a common interface that all 3 cases above can use to write checkpoint files
    2. This interface must provide the hooks needed to customize the
write-and-rename mechanism.
    This PR does that by introducing the interface `CheckpointFileManager` and modifying `HDFSMetadataLog`
and `HDFSBackedStateStore` to use the interface. Similar to the earlier `FileManager`, there are
implementations based on the `FileSystem` and `FileContext` APIs, and the latter implementation
is preferred so that it works correctly with HDFS.
    The key method this interface has is `createAtomic(path, overwrite)` which returns a `CancellableFSDataOutputStream`
that has the method `cancel()`. All users of this method need to either call `close()` to
successfully write the file, or `cancel()` in case of an error.
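    As a rough illustration of the `close()`/`cancel()` contract described above (not the code in this PR),
the following self-contained Java sketch implements write-to-temp-file-and-rename with `java.nio.file` on a
local file system instead of the Hadoop `FileSystem`/`FileContext` APIs. The names `AtomicWriteSketch` and
`CancellableStream`, the temp-file naming scheme, and the overwrite handling are assumptions made for this
example; only `createAtomic(path, overwrite)`, `close()`, and `cancel()` are taken from the description.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

public class AtomicWriteSketch {

    /** Hypothetical stand-in for CancellableFSDataOutputStream. */
    static final class CancellableStream extends FilterOutputStream {
        private final Path temp;
        private final Path target;
        private final boolean overwrite;
        private boolean finished = false;

        private CancellableStream(Path temp, Path target, boolean overwrite) throws IOException {
            super(Files.newOutputStream(temp)); // all bytes go to the temp file first
            this.temp = temp;
            this.target = target;
            this.overwrite = overwrite;
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            out.write(b, off, len); // avoid FilterOutputStream's byte-at-a-time default
        }

        /** Commit: close the temp file, then rename it onto the final path. */
        @Override
        public void close() throws IOException {
            if (finished) return;
            finished = true;
            super.close();
            // Assumption: a simple exists-check; this is not race-free.
            if (!overwrite && Files.exists(target)) {
                Files.deleteIfExists(temp);
                throw new FileAlreadyExistsException(target.toString());
            }
            // With ATOMIC_MOVE, replacing an existing target is implementation-specific.
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        }

        /** Abort: discard the temp file so nothing appears at the final path. */
        public void cancel() {
            if (finished) return;
            finished = true;
            try { super.close(); } catch (IOException ignored) { }
            try { Files.deleteIfExists(temp); } catch (IOException ignored) { }
        }
    }

    /** Opens a stream whose contents become visible at `target` only after close(). */
    static CancellableStream createAtomic(Path target, boolean overwrite) throws IOException {
        // Keep the temp file in the same directory so the rename stays on one file system.
        Path temp = target.resolveSibling(
            "." + target.getFileName() + "." + UUID.randomUUID() + ".tmp");
        return new CancellableStream(temp, target, overwrite);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("checkpoint-sketch");

        // Successful write: close() makes the file visible.
        Path committed = dir.resolve("0");
        CancellableStream out = createAtomic(committed, false);
        try {
            out.write("offsets-v1".getBytes(StandardCharsets.UTF_8));
            out.close();
        } catch (IOException e) {
            out.cancel();
            throw e;
        }

        // Failed write: cancel() leaves no file behind.
        Path aborted = dir.resolve("1");
        CancellableStream failed = createAtomic(aborted, false);
        failed.write("partial".getBytes(StandardCharsets.UTF_8));
        failed.cancel();

        System.out.println(Files.exists(committed)); // true
        System.out.println(Files.exists(aborted));   // false
    }
}
```

    In this sketch the final path never holds partial data: a reader sees either no file or the complete
file. A file system with its own atomic-write mechanism could supply a different stream behind the same
`createAtomic` entry point, with no changes required of the caller.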
    ## How was this patch tested?
    New tests in `CheckpointFileManagerSuite` and slightly modified existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull SPARK-23966

Alternatively you can review and apply these changes as the patch at:

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21048
commit df7b339d73097b8501fe0937f770b8b2ded1b63e
Author: Tathagata Das <tathagata.das1565@...>
Date:   2018-04-11T04:21:14Z



