spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tdas <...@git.apache.org>
Subject [GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...
Date Thu, 12 Apr 2018 17:03:56 GMT
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21048#discussion_r181153863
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
---
    @@ -0,0 +1,344 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.io.{FileSystem => _, _}
    +import java.util.{EnumSet, UUID}
    +
    +import scala.util.control.NonFatal
    +
    +import org.apache.commons.io.IOUtils
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs._
    +import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
    +import org.apache.hadoop.fs.permission.FsPermission
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.util.Utils
    +
    +/**
    + * An interface to abstract out all operations related to streaming checkpoints. Most
importantly,
    + * the key operation this interface provides is `createAtomic(path, overwrite)` which
returns a
    + * `CancellableFSDataOutputStream`. This method is used by [[HDFSMetadataLog]] and
    + * [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]] implementations
    + * to write a complete checkpoint file atomically (i.e. no partial file will be visible),
with or
    + * without overwrite.
    + *
    + * This higher-level interface above the Hadoop FileSystem is necessary because
    + * different implementations of FileSystem/FileContext may have different combinations
of operations
    + * to provide the desired atomic guarantees (e.g. write-to-temp-file-and-rename,
    + * direct-write-and-cancel-on-failure) and this abstraction allows different implementations
while
    + * keeping the usage simple (`createAtomic` -> `close` or `cancel`).
    + */
    +trait CheckpointFileManager {
    +
    +  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
    +
    +  /**
    +   * Create a file and make its contents available atomically after the output stream
is closed.
    +   *
    +   * @param path                Path to create
    +   * @param overwriteIfPossible If true, then the implementations must do a best-effort
attempt to
    +   *                            overwrite the file if it already exists. It should not
throw
    +   *                            any exception if the file exists. However, if false,
then the
    +   *                            implementation must not overwrite if the file already
exists and
    +   *                            must throw `FileAlreadyExistsException` in that case.
    +   */
    +  def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream
    +
    +  /** Open a file for reading, or throw exception if it does not exist. */
    +  def open(path: Path): FSDataInputStream
    +
    +  /** List the files in a path that match a filter. */
    +  def list(path: Path, filter: PathFilter): Array[FileStatus]
    +
    +  /** List all the files in a path. */
    +  def list(path: Path): Array[FileStatus] = {
    +    list(path, new PathFilter { override def accept(path: Path): Boolean = true })
    +  }
    +
    +  /** Make directory at the given path and all its parent directories as needed. */
    +  def mkdirs(path: Path): Unit
    +
    +  /** Whether path exists */
    +  def exists(path: Path): Boolean
    +
    +  /** Recursively delete a path if it exists. Should not throw exception if file doesn't
exist. */
    +  def delete(path: Path): Unit
    +
    +  /** Whether the default file system this implementation operates on is the local file system.
*/
    +  def isLocal: Boolean
    +}
    +
    +object CheckpointFileManager extends Logging {
    +
    +  /**
    +   * Additional methods in CheckpointFileManager implementations that allow
    +   * [[RenameBasedFSDataOutputStream]] to get atomicity by write-to-temp-file-and-rename
    +   */
    +  sealed trait RenameHelperMethods { self => CheckpointFileManager
    +    /** Create a file with overwrite. */
    +    def create(path: Path): FSDataOutputStream
    +
    +    /**
    +     * Rename a file.
    +     *
    +     * @param srcPath             Source path to rename
    +     * @param dstPath             Destination path to rename to
    +     * @param overwriteIfPossible If true, then the implementations must do a best-effort
attempt to
    +     *                            overwrite the file if it already exists. It should
not throw
    +     *                            any exception if the file exists. However, if false,
then the
    +     *                            implementation must not overwrite if the file already
exists and
    +     *                            must throw `FileAlreadyExistsException` in that case.
    +     */
    +    def rename(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit
    +  }
    +
    +  /**
    +   * An interface to add the cancel() operation to [[FSDataOutputStream]]. This is used
    +   * mainly by `CheckpointFileManager.createAtomic` to write a file atomically.
    +   *
    +   * @see [[CheckpointFileManager]].
    +   */
    +  abstract class CancellableFSDataOutputStream(protected val underlyingStream: OutputStream)
    +    extends FSDataOutputStream(underlyingStream, null) {
    +    /** Cancel the `underlyingStream` and ensure that the output file is not generated.
*/
    +    def cancel(): Unit
    +  }
    +
    +  /**
    +   * An implementation of [[CancellableFSDataOutputStream]] that writes a file atomically
by writing
    +   * to a temporary file and then renames.
    +   */
    +  sealed class RenameBasedFSDataOutputStream(
    +      fm: CheckpointFileManager with RenameHelperMethods,
    +      finalPath: Path,
    +      tempPath: Path,
    +      overwriteIfPossible: Boolean)
    +    extends CancellableFSDataOutputStream(fm.create(tempPath)) {
    +
    +    def this(fm: CheckpointFileManager with RenameHelperMethods, path: Path, overwrite:
Boolean) = {
    +      this(fm, path, generateTempPath(path), overwrite)
    +    }
    +
    +    logInfo(s"Writing atomically to $finalPath using temp file $tempPath")
    --- End diff --
    
    I was thinking of having it as logInfo, so that we can debug stuff if the renaming goes
wrong in some way. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message