spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject git commit: Honor default fs name when initializing event logger.
Date Wed, 23 Apr 2014 21:47:42 GMT
Repository: spark
Updated Branches:
  refs/heads/master a967b005c -> dd1b7a61d


Honor default fs name when initializing event logger.

This is related to SPARK-1459 / PR #375. Without this fix,
FileLogger.createLogDir() may try to create the log dir on
HDFS, while createWriter() will try to open the log file on
the local file system, leading to interesting errors and
confusion.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #450 from vanzin/event-file-2 and squashes the following commits:

592cdb3 [Marcelo Vanzin] Honor default fs name when initializing event logger.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd1b7a61
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd1b7a61
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd1b7a61

Branch: refs/heads/master
Commit: dd1b7a61d9193c93ab95ab550622259f4bc26f53
Parents: a967b00
Author: Marcelo Vanzin <vanzin@cloudera.com>
Authored: Wed Apr 23 14:47:38 2014 -0700
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Wed Apr 23 14:47:38 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 48 ++++++++++----------
 .../spark/scheduler/EventLoggingListener.scala  |  9 +++-
 .../org/apache/spark/util/FileLogger.scala      | 17 ++++---
 3 files changed, 41 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dd1b7a61/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 25ca650..c14dce8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -216,10 +216,33 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val ui = new SparkUI(this)
   ui.bind()
 
+  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
+  val hadoopConfiguration: Configuration = {
+    val env = SparkEnv.get
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
+    // Explicitly check for S3 environment variables
+    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+      hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+      hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+      hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+      hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+    }
+    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
+    conf.getAll.foreach { case (key, value) =>
+      if (key.startsWith("spark.hadoop.")) {
+        hadoopConf.set(key.substring("spark.hadoop.".length), value)
+      }
+    }
+    val bufferSize = conf.get("spark.buffer.size", "65536")
+    hadoopConf.set("io.file.buffer.size", bufferSize)
+    hadoopConf
+  }
+
   // Optionally log Spark events
   private[spark] val eventLogger: Option[EventLoggingListener] = {
     if (conf.getBoolean("spark.eventLog.enabled", false)) {
-      val logger = new EventLoggingListener(appName, conf)
+      val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
       logger.start()
       listenerBus.addListener(logger)
       Some(logger)
@@ -294,29 +317,6 @@ class SparkContext(config: SparkConf) extends Logging {
   postEnvironmentUpdate()
   postApplicationStart()
 
-  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
-  val hadoopConfiguration: Configuration = {
-    val env = SparkEnv.get
-    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
-    // Explicitly check for S3 environment variables
-    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
-        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
-      hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-      hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-    }
-    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-    conf.getAll.foreach { case (key, value) =>
-      if (key.startsWith("spark.hadoop.")) {
-        hadoopConf.set(key.substring("spark.hadoop.".length), value)
-      }
-    }
-    val bufferSize = conf.get("spark.buffer.size", "65536")
-    hadoopConf.set("io.file.buffer.size", bufferSize)
-    hadoopConf
-  }
-
   private[spark] var checkpointDir: Option[String] = None
 
   // Thread Local variable that can be used by users to pass information down the stack

http://git-wip-us.apache.org/repos/asf/spark/blob/dd1b7a61/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index b983c16..2fe65cd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
 
 import scala.collection.mutable
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.json4s.jackson.JsonMethods._
 
@@ -36,7 +37,10 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
  *   spark.eventLog.dir - Path to the directory in which events are logged.
  *   spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
  */
-private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
+private[spark] class EventLoggingListener(
+    appName: String,
+    conf: SparkConf,
+    hadoopConfiguration: Configuration)
   extends SparkListener with Logging {
 
   import EventLoggingListener._
@@ -49,7 +53,8 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
   val logDir = logBaseDir + "/" + name
 
   private val logger =
-    new FileLogger(logDir, conf, outputBufferSize, shouldCompress, shouldOverwrite)
+    new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
+      shouldOverwrite)
 
   /**
    * Begin logging events.

http://git-wip-us.apache.org/repos/asf/spark/blob/dd1b7a61/core/src/main/scala/org/apache/spark/util/FileLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 7d58d1c..7d47b2a 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -22,7 +22,8 @@ import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.Date
 
-import org.apache.hadoop.fs.{FSDataOutputStream, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.io.CompressionCodec
@@ -37,7 +38,8 @@ import org.apache.spark.io.CompressionCodec
  */
 private[spark] class FileLogger(
     logDir: String,
-    conf: SparkConf = new SparkConf,
+    conf: SparkConf,
+    hadoopConfiguration: Configuration,
     outputBufferSize: Int = 8 * 1024, // 8 KB
     compress: Boolean = false,
     overwrite: Boolean = true)
@@ -85,19 +87,20 @@ private[spark] class FileLogger(
   private def createWriter(fileName: String): PrintWriter = {
     val logPath = logDir + "/" + fileName
     val uri = new URI(logPath)
+    val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
+    val isDefaultLocal = (defaultFs == null || defaultFs == "file")
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
      * Therefore, for local files, use FileOutputStream instead. */
-    val dstream = uri.getScheme match {
-      case "file" | null =>
+    val dstream =
+      if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
         // Second parameter is whether to append
         new FileOutputStream(uri.getPath, !overwrite)
-
-      case _ =>
+      } else {
         val path = new Path(logPath)
         hadoopDataStream = Some(fileSystem.create(path, overwrite))
         hadoopDataStream.get
-    }
+      }
 
     val bstream = new BufferedOutputStream(dstream, outputBufferSize)
     val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream


Mime
View raw message