spark-reviews mailing list archives

From zsxwing <...@git.apache.org>
Subject [GitHub] spark pull request #16947: [SPARK-19617][SS]Fix the race condition when star...
Date Fri, 17 Feb 2017 22:48:01 GMT
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16947#discussion_r101863086
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala ---
    @@ -109,39 +108,12 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       override def add(batchId: Long, metadata: T): Boolean = {
         get(batchId).map(_ => false).getOrElse {
           // Only write metadata when the batch has not yet been written
    -      if (fileManager.isLocalFileSystem) {
    -        Thread.currentThread match {
    -          case ut: UninterruptibleThread =>
    -            // When using a local file system, "writeBatch" must be called on a
    -            // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
    -            // while writing the batch file.
    -            //
    -            // This is because Hadoop "Shell.runCommand" swallows InterruptException (HADOOP-14084).
    -            // If the user tries to stop a query, and the thread running "Shell.runCommand" is
    -            // interrupted, then InterruptException will be dropped and the query will be still
    -            // running. (Note: `writeBatch` creates a file using HDFS APIs and will call
    -            // "Shell.runCommand" to set the file permission if using the local file system)
    -            //
    -            // Hence, we make sure that "writeBatch" is called on [[UninterruptibleThread]] which
    -            // allows us to disable interrupts here, in order to propagate the interrupt state
    -            // correctly. Also see SPARK-19599.
    -            ut.runUninterruptibly { writeBatch(batchId, metadata) }
    -          case _ =>
    -            throw new IllegalStateException(
    -              "HDFSMetadataLog.add() on a local file system must be executed on " +
    -                "a o.a.spark.util.UninterruptibleThread")
    -        }
    -      } else {
    -        // For a distributed file system, such as HDFS or S3, if the network is broken, write
    -        // operations may just hang until timeout. We should enable interrupts to allow stopping
    -        // the query fast.
    -        writeBatch(batchId, metadata)
    -      }
    +      writeBatch(batchId, metadata)
    --- End diff ---
    
    > Are we removing this now because that has been fixed in hadoop?
    
    Yes. We dropped support for Hadoop 2.5 and earlier versions.
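
    For context, the removed branch used Spark's `UninterruptibleThread.runUninterruptibly`
    so that Hadoop's `Shell.runCommand` (which swallows `InterruptedException`, see
    HADOOP-14084) could not silently drop a pending interrupt while the batch file was
    written. Below is a minimal, self-contained Scala sketch of the general idea.
    Note this is an illustration, not Spark's implementation: the real
    `UninterruptibleThread` defers interrupts delivered *during* the critical section,
    while this sketch only preserves an already-pending interrupt across code that
    would otherwise clear or swallow it.

    ```scala
    object UninterruptibleDemo {
      // Run `body` with the interrupt flag cleared, then restore it afterwards,
      // so code inside `body` that swallows InterruptedException cannot lose
      // the caller's pending stop request.
      def runUninterruptibly[T](body: => T): T = {
        val wasInterrupted = Thread.interrupted() // reads AND clears the flag
        try body
        finally if (wasInterrupted) Thread.currentThread().interrupt()
      }

      def main(args: Array[String]): Unit = {
        Thread.currentThread().interrupt() // simulate a pending stop request
        val result = runUninterruptibly { "batch written" }
        // The interrupt state survives the critical section.
        println(s"$result interrupted=${Thread.currentThread().isInterrupted}")
      }
    }
    ```
    
    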


