spark-commits mailing list archives

From mridul...@apache.org
Subject spark git commit: [SPARK-21549][CORE] Respect OutputFormats with no/invalid output directory provided
Date Mon, 16 Oct 2017 01:41:02 GMT
Repository: spark
Updated Branches:
  refs/heads/master e8547ffb4 -> 13c155958


[SPARK-21549][CORE] Respect OutputFormats with no/invalid output directory provided

## What changes were proposed in this pull request?

This is a resubmission of PR #19487, which I messed up while updating my repo.

PR #19294 added support for null output paths, but Spark 2.1 also handled other error cases
where the path argument can be invalid, namely:

* an empty string
* a string that raises a URI parse exception while creating the Path
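
For context, here is a minimal sketch (illustrative only, not part of the patch) of how the
`Try`-based check added in this change rejects each of these inputs; the quoted messages are
Hadoop's `Path` error strings:

```scala
import scala.util.Try
import org.apache.hadoop.fs.Path

// False for any string that cannot be turned into a Hadoop Path.
def hasValidPath(path: String): Boolean = Try { new Path(path) }.isSuccess

assert(!hasValidPath(null))          // "Can not create a Path from a null string"
assert(!hasValidPath(""))            // "Can not create a Path from an empty string"
assert(!hasValidPath("::invalid::")) // IllegalArgumentException wrapping the URI parse error
assert(hasValidPath("/tmp/out"))     // ordinary paths still parse
```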

## How was this patch tested?

Enhanced the existing test to cover the newly added cases (null, empty string, and an unparseable path).
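
To run just this suite locally, the usual Spark dev invocation should work (exact form depends
on your build setup): `build/sbt "core/testOnly org.apache.spark.rdd.PairRDDFunctionsSuite"`.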

Author: Mridul Muralidharan <mridul@gmail.com>

Closes #19497 from mridulm/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13c15595
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13c15595
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13c15595

Branch: refs/heads/master
Commit: 13c1559587d0eb533c94f5a492390f81b048b347
Parents: e8547ff
Author: Mridul Muralidharan <mridul@gmail.com>
Authored: Sun Oct 15 18:40:53 2017 -0700
Committer: Mridul Muralidharan <mridul@gmail.com>
Committed: Sun Oct 15 18:40:53 2017 -0700

----------------------------------------------------------------------
 .../io/HadoopMapReduceCommitProtocol.scala      | 24 ++++++++-------
 .../spark/rdd/PairRDDFunctionsSuite.scala       | 31 ++++++++++++++------
 2 files changed, 35 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/13c15595/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index a7e6859..95c99d2 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -20,6 +20,7 @@ package org.apache.spark.internal.io
 import java.util.{Date, UUID}
 
 import scala.collection.mutable
+import scala.util.Try
 
 import org.apache.hadoop.conf.Configurable
 import org.apache.hadoop.fs.Path
@@ -48,6 +49,16 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   @transient private var committer: OutputCommitter = _
 
   /**
+   * Checks whether there are files to be committed to a valid output location.
+   *
+   * As committing and aborting a job occurs on driver, where `addedAbsPathFiles` is always null,
+   * it is necessary to check whether a valid output path is specified.
+   * [[HadoopMapReduceCommitProtocol#path]] need not be a valid [[org.apache.hadoop.fs.Path]] for
+   * committers not writing to distributed file systems.
+   */
+  private val hasValidPath = Try { new Path(path) }.isSuccess
+
+  /**
   * Tracks files staged by this task for absolute output paths. These outputs are not managed by
    * the Hadoop OutputCommitter, so we must move these to their final locations on job commit.
    *
@@ -60,15 +71,6 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
    */
   private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)
 
-  /**
-   * Checks whether there are files to be committed to an absolute output location.
-   *
-   * As committing and aborting a job occurs on driver, where `addedAbsPathFiles` is always null,
-   * it is necessary to check whether the output path is specified. Output path may not be required
-   * for committers not writing to distributed file systems.
-   */
-  private def hasAbsPathFiles: Boolean = path != null
-
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     val format = context.getOutputFormatClass.newInstance()
     // If OutputFormat is Configurable, we should set conf to it.
@@ -142,7 +144,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
     val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
       .foldLeft(Map[String, String]())(_ ++ _)
     logDebug(s"Committing files staged for absolute locations $filesToMove")
-    if (hasAbsPathFiles) {
+    if (hasValidPath) {
       val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
       for ((src, dst) <- filesToMove) {
         fs.rename(new Path(src), new Path(dst))
@@ -153,7 +155,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
 
   override def abortJob(jobContext: JobContext): Unit = {
     committer.abortJob(jobContext, JobStatus.State.FAILED)
-    if (hasAbsPathFiles) {
+    if (hasValidPath) {
       val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
       fs.delete(absPathStagingDir, true)
     }
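
To make the motivating case concrete, below is a hypothetical sketch (names are mine, not from
the patch) of the kind of OutputFormat this change respects: one that writes to an external sink
and neither needs nor validates "mapred.output.dir". With such a format, `path` is null or junk
on the driver, and before this fix the empty/unparseable variants broke commit and abort. The
suite's NewFakeFormat plays this role in the tests below.

    import org.apache.hadoop.mapreduce._

    // Hypothetical: records go to an external system, so no output directory exists.
    class ExternalSinkOutputFormat extends OutputFormat[Integer, Integer] {
      override def getRecordWriter(ctx: TaskAttemptContext): RecordWriter[Integer, Integer] =
        new RecordWriter[Integer, Integer] {
          override def write(key: Integer, value: Integer): Unit = { /* push to sink */ }
          override def close(ctx: TaskAttemptContext): Unit = {}
        }

      // Nothing to validate: there is deliberately no "mapred.output.dir".
      override def checkOutputSpecs(ctx: JobContext): Unit = {}

      override def getOutputCommitter(ctx: TaskAttemptContext): OutputCommitter =
        new OutputCommitter {
          override def setupJob(ctx: JobContext): Unit = {}
          override def setupTask(ctx: TaskAttemptContext): Unit = {}
          override def needsTaskCommit(ctx: TaskAttemptContext): Boolean = false
          override def commitTask(ctx: TaskAttemptContext): Unit = {}
          override def abortTask(ctx: TaskAttemptContext): Unit = {}
        }
    }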

http://git-wip-us.apache.org/repos/asf/spark/blob/13c15595/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 07579c5..0a248b6 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -568,21 +568,34 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     assert(FakeWriterWithCallback.exception.getMessage contains "failed to write")
   }
 
-  test("saveAsNewAPIHadoopDataset should respect empty output directory when " +
+  test("saveAsNewAPIHadoopDataset should support invalid output paths when " +
     "there are no files to be committed to an absolute output location") {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1)
 
-    val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration))
-    job.setOutputKeyClass(classOf[Integer])
-    job.setOutputValueClass(classOf[Integer])
-    job.setOutputFormatClass(classOf[NewFakeFormat])
-    val jobConfiguration = job.getConfiguration
+    def saveRddWithPath(path: String): Unit = {
+      val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration))
+      job.setOutputKeyClass(classOf[Integer])
+      job.setOutputValueClass(classOf[Integer])
+      job.setOutputFormatClass(classOf[NewFakeFormat])
+      if (null != path) {
+        job.getConfiguration.set("mapred.output.dir", path)
+      } else {
+        job.getConfiguration.unset("mapred.output.dir")
+      }
+      val jobConfiguration = job.getConfiguration
+
+      // just test that the job does not fail with java.lang.IllegalArgumentException.
+      pairs.saveAsNewAPIHadoopDataset(jobConfiguration)
+    }
 
-    // just test that the job does not fail with
-    // java.lang.IllegalArgumentException: Can not create a Path from a null string
-    pairs.saveAsNewAPIHadoopDataset(jobConfiguration)
+    saveRddWithPath(null)
+    saveRddWithPath("")
+    saveRddWithPath("::invalid::")
   }
 
+  // In spark 2.1, only null was supported - not other invalid paths.
+  // org.apache.hadoop.mapred.FileOutputFormat.getOutputPath fails with IllegalArgumentException
+  // for non-null invalid paths.
   test("saveAsHadoopDataset should respect empty output directory when " +
     "there are no files to be committed to an absolute output location") {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1)
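
As the new comment above notes, the old mapred API is stricter than the new one. A minimal
sketch of the underlying Hadoop behavior (assuming Hadoop's deprecated-key mapping for
"mapred.output.dir", the key this suite sets):

    import scala.util.Try

    import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}

    val conf = new JobConf()
    // With no output dir set, the old API returns null rather than throwing.
    assert(FileOutputFormat.getOutputPath(conf) == null)

    conf.set("mapred.output.dir", "::invalid::")
    // getOutputPath now constructs new Path("::invalid::") and throws
    // IllegalArgumentException, hence only the null case can be tolerated here.
    assert(Try(FileOutputFormat.getOutputPath(conf)).isFailure)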



