spark-commits mailing list archives

From pwend...@apache.org
Subject [3/9] git commit: Removing initLogging entirely
Date Thu, 02 Jan 2014 01:04:03 GMT
Removing initLogging entirely

With this change, logging is initialized lazily, under a lock, the first time the log accessor is touched, so classes that mix in the Logging trait no longer need to call initLogging() before spawning threads.

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/18181e6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/18181e6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/18181e6c

Branch: refs/heads/master
Commit: 18181e6c4120b04d125aa99a1ac63e5e7b2c0e3d
Parents: 1cbef08
Author: Patrick Wendell <pwendell@gmail.com>
Authored: Mon Dec 30 23:37:41 2013 -0800
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Mon Dec 30 23:39:47 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/HttpServer.scala     |  1 +
 .../main/scala/org/apache/spark/Logging.scala   | 23 +++++++++++++++++---
 .../scala/org/apache/spark/SparkContext.scala   |  3 ---
 .../org/apache/spark/executor/Executor.scala    |  2 --
 .../apache/spark/metrics/MetricsConfig.scala    |  1 -
 .../apache/spark/metrics/MetricsSystem.scala    |  1 -
 .../spark/storage/BlockManagerMasterActor.scala |  2 --
 .../spark/storage/BlockManagerWorker.scala      |  3 ---
 .../spark/storage/BlockMessageArray.scala       |  2 --
 .../org/apache/spark/repl/SparkILoop.scala      |  2 --
 .../org/apache/spark/streaming/DStream.scala    |  2 --
 .../apache/spark/streaming/DStreamGraph.scala   |  1 -
 .../spark/streaming/StreamingContext.scala      |  2 --
 .../streaming/dstream/NetworkInputDStream.scala |  2 --
 .../streaming/scheduler/JobGenerator.scala      |  1 -
 .../streaming/scheduler/JobScheduler.scala      |  2 --
 .../streaming/util/MasterFailureTest.scala      |  3 ---
 17 files changed, 21 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18181e6c/core/src/main/scala/org/apache/spark/HttpServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index cdfc9dd..240f32e 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -46,6 +46,7 @@ private[spark] class HttpServer(resourceBase: File) extends Logging {
     if (server != null) {
       throw new ServerStateException("Server is already started")
     } else {
+      log.info("Starting HTTP Server")
       server = new Server()
       val connector = new SocketConnector
       connector.setMaxIdleTime(60*1000)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18181e6c/core/src/main/scala/org/apache/spark/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index b97697d..1fdbccd 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -33,6 +33,7 @@ trait Logging {
   // Method to get or create the logger for this object
   protected def log: Logger = {
     if (log_ == null) {
+      initializeIfNecessary()
       var className = this.getClass.getName
       // Ignore trailing $'s in the class names for Scala objects
       if (className.endsWith("$")) {
@@ -89,9 +90,15 @@ trait Logging {
     log.isTraceEnabled
   }
 
-  // Method for ensuring that logging is initialized, to avoid having multiple
-  // threads do it concurrently (as SLF4J initialization is not thread safe).
-  protected def initLogging() {
+  private def initializeIfNecessary() {
+    Logging.initLock.synchronized {
+      if (!Logging.initialized) {
+        initializeLogging()
+      }
+    }
+  }
+
+  private def initializeLogging() {
     // If Log4j doesn't seem initialized, load a default properties file
     val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
     if (!log4jInitialized) {
@@ -101,7 +108,17 @@ trait Logging {
         case Some(url) => PropertyConfigurator.configure(url)
         case None => System.err.println(s"Spark was unable to load $defaultLogProps")
       }
+      log.info(s"Using Spark's default log4j profile: $defaultLogProps")
     }
+    Logging.initialized = true
+
+    // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
+    // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
     log
   }
 }
+
+object Logging {
+  @transient @volatile private var initialized = false
+  @transient val initLock = new Object()
+}
\ No newline at end of file
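
The heart of the change is the Logging trait above: initialization now happens lazily, inside a lock, the first time the log accessor is touched. Below is a condensed, standalone sketch of that pattern, not the actual Spark code: the SLF4J logger and the PropertyConfigurator call are replaced with String/println stand-ins so it runs with no dependencies, and initializeIfNecessary lives on the companion object for brevity.

trait LazyLogging {
  private var log_ : String = null          // stand-in for an org.slf4j.Logger field

  protected def log: String = {
    if (log_ == null) {
      LazyLogging.initializeIfNecessary()   // runs the one-time setup at most once
      log_ = getClass.getName.stripSuffix("$")
    }
    log_
  }
}

object LazyLogging {
  @volatile private var initialized = false
  private val initLock = new Object()

  private def initializeIfNecessary(): Unit = {
    initLock.synchronized {                 // serialize first-time callers
      if (!initialized) {                   // re-check inside the lock
        println("one-time logging setup")   // stand-in for PropertyConfigurator.configure
        initialized = true
      }
    }
  }
}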

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18181e6c/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ad3337d..70fd499 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -88,9 +88,6 @@ class SparkContext(
       scala.collection.immutable.Map())
   extends Logging {
 
-  // Ensure logging is initialized before we spawn any threads
-  initLogging()
-
   // Set Spark driver host and port system properties
   if (System.getProperty("spark.driver.host") == null) {
     System.setProperty("spark.driver.host", Utils.localHostName())
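
Every remaining hunk in this commit is the same mechanical edit: delete the initLogging() line from a class or object that mixes in Logging. A hypothetical call site shows what the migrated code looks like (WordCounter is an invented example, not part of the patch):

import org.apache.spark.Logging

class WordCounter extends Logging {
  // Before this commit, an initLogging() call would have been required here
  // so logging was configured before any threads were spawned.

  def count(words: Seq[String]): Map[String, Int] = {
    logInfo(s"counting ${words.size} words")   // first use triggers lazy init
    words.groupBy(identity).map { case (w, ws) => w -> ws.size }
  }
}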

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18181e6c/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0f19d7a..782be9a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -48,8 +48,6 @@ private[spark] class Executor(
 
   private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
 
-  initLogging()
-
   // No ip or host:port - just hostname
   Utils.checkHost(slaveHostname, "Expected executed slave to be a hostname")
   // must not have port specified.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18181e6c/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index caab748..6f9f299 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -26,7 +26,6 @@ import scala.util.matching.Regex
 import org.apache.spark.Logging
 
 private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
-  initLogging()
 
   val DEFAULT_PREFIX = "*"
   val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18181e6c/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index bec0c83..8e038ce 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -63,7 +63,6 @@ import org.apache.spark.metrics.source.Source
  * [options] is the specific property of this source or sink.
  */
 private[spark] class MetricsSystem private (val instance: String) extends Logging {
-  initLogging()
 
   val confFile = System.getProperty("spark.metrics.conf")
   val metricsConfig = new MetricsConfig(Option(confFile))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18181e6c/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 21022e1..e0eb02c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -50,8 +50,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
 
   private val akkaTimeout = AkkaUtils.askTimeout
 
-  initLogging()
-
   val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
     "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18181e6c/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
index 0c66add..21f0036 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
@@ -30,7 +30,6 @@ import org.apache.spark.util.Utils
  * TODO: Use event model.
  */
 private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
-  initLogging()
 
   blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)
 
@@ -101,8 +100,6 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
 private[spark] object BlockManagerWorker extends Logging {
   private var blockManagerWorker: BlockManagerWorker = null
 
-  initLogging()
-
   def startBlockManagerWorker(manager: BlockManager) {
     blockManagerWorker = new BlockManagerWorker(manager)
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18181e6c/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
index 6ce9127..a06f50a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
@@ -37,8 +37,6 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockMessage] with Logging {
 
   def length = blockMessages.length 
 
-  initLogging()
-  
   def set(bufferMessage: BufferMessage) {
     val startTime = System.currentTimeMillis
     val newBlockMessages = new ArrayBuffer[BlockMessage]()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18181e6c/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index b2a1815..523fd12 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -60,8 +60,6 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
   def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out, None)
   def this() = this(None, new JPrintWriter(Console.out, true), None)
 
-  initLogging()
-
   var in: InteractiveReader = _   // the input stream from which commands come
   var settings: Settings = _
   var intp: SparkIMain = _

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18181e6c/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index a78d396..8ebe09d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -56,8 +56,6 @@ abstract class DStream[T: ClassTag] (
     @transient protected[streaming] var ssc: StreamingContext
   ) extends Serializable with Logging {
 
-  initLogging()
-
   // =======================================================================
   // Methods that should be implemented by subclasses of DStream
   // =======================================================================

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18181e6c/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index daed7ff..a09b891 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -24,7 +24,6 @@ import org.apache.spark.Logging
 import org.apache.spark.streaming.scheduler.Job
 
 final private[streaming] class DStreamGraph extends Serializable with Logging {
-  initLogging()
 
   private val inputStreams = new ArrayBuffer[InputDStream[_]]()
   private val outputStreams = new ArrayBuffer[DStream[_]]()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18181e6c/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 41da028..c759b36 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -95,8 +95,6 @@ class StreamingContext private (
    */
   def this(path: String) = this(null, CheckpointReader.read(path), null)
 
-  initLogging()
-
   if (sc_ == null && cp_ == null) {
     throw new Exception("Spark Streaming cannot be initialized with " +
       "both SparkContext and checkpoint as null")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18181e6c/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 5add208..8c7f423 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -88,8 +88,6 @@ private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
  */
 abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging {
 
-  initLogging()
-
   lazy protected val env = SparkEnv.get
 
   lazy protected val actor = env.actorSystem.actorOf(

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18181e6c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 1cd0b9b..afe9316 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -29,7 +29,6 @@ import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
 private[streaming]
 class JobGenerator(jobScheduler: JobScheduler) extends Logging {
 
-  initLogging()
   val ssc = jobScheduler.ssc
   val clockClass = System.getProperty(
     "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18181e6c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 9511ccf..488cc2f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -30,8 +30,6 @@ import org.apache.spark.streaming._
 private[streaming]
 class JobScheduler(val ssc: StreamingContext) extends Logging {
 
-  initLogging()
-
   val jobSets = new ConcurrentHashMap[Time, JobSet]
   val numConcurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
   val executor = Executors.newFixedThreadPool(numConcurrentJobs)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18181e6c/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 4a3993e..1559f7a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -39,7 +39,6 @@ import org.apache.hadoop.conf.Configuration
 
 private[streaming]
 object MasterFailureTest extends Logging {
-  initLogging()
 
   @volatile var killed = false
   @volatile var killCount = 0
@@ -331,7 +330,6 @@ class TestOutputStream[T: ClassTag](
  */
 private[streaming]
 class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
-  initLogging()
 
   override def run() {
     try {
@@ -366,7 +364,6 @@ class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
 private[streaming]
 class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
   extends Thread with Logging {
-  initLogging()
 
   override def run() {
     val localTestDir = Files.createTempDir()
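
Taken together, the patch's guarantee is that concurrent first use of logging initializes it exactly once. A hypothetical smoke test along these lines exercises that property (InitRaceSmokeTest and Probe are invented names; it assumes the patched org.apache.spark.Logging is on the classpath):

import org.apache.spark.Logging

object InitRaceSmokeTest {
  class Probe extends Logging {
    def touch(): Unit = logInfo("hello from " + Thread.currentThread().getName)
  }

  def main(args: Array[String]): Unit = {
    // Hammer logging from many threads whose first action is to log.
    val threads = (1 to 16).map { i =>
      new Thread(new Runnable {
        def run(): Unit = new Probe().touch()
      }, s"probe-$i")
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    // Expected: if log4j was not already configured, the "Using Spark's
    // default log4j profile" line appears exactly once, then 16 hello lines.
  }
}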

