spark-commits mailing list archives

From pwend...@apache.org
Subject spark git commit: [SPARK-5411] Allow SparkListeners to be specified in SparkConf and loaded when creating SparkContext
Date Thu, 05 Feb 2015 01:18:14 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 dc9ead907 -> 47e4d579e


[SPARK-5411] Allow SparkListeners to be specified in SparkConf and loaded when creating SparkContext

This patch introduces a new configuration option, `spark.extraListeners`, that allows SparkListeners
to be specified in SparkConf and registered before the SparkContext is initialized.  From
the configuration documentation:

> A comma-separated list of classes that implement SparkListener; when initializing SparkContext,
> instances of these classes will be created and registered with Spark's listener bus. If a
> class has a single-argument constructor that accepts a SparkConf, that constructor will be
> called; otherwise, a zero-argument constructor will be called. If no valid constructor can
> be found, the SparkContext creation will fail with an exception.

The motivation for this patch is to allow monitoring code to be easily injected into existing
Spark programs without having to modify those programs' code.
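
As a concrete illustration, a listener such as the following could sit on the application's
classpath and be attached purely through configuration. This is a hedged sketch, not part of
the patch itself; the package and class names (`com.example.monitoring.JobEndLogger`) are
hypothetical:

    package com.example.monitoring

    import org.apache.spark.SparkConf
    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

    // Hypothetical monitoring listener. Because this constructor takes a
    // SparkConf, SparkContext will prefer it over a zero-argument constructor.
    class JobEndLogger(conf: SparkConf) extends SparkListener {
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
        println(s"[${conf.get("spark.app.name", "app")}] job ${jobEnd.jobId} ended")
      }
    }

It can then be enabled without any code changes, e.g. with
`spark-submit --conf spark.extraListeners=com.example.monitoring.JobEndLogger ...`.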

Author: Josh Rosen <joshrosen@databricks.com>

Closes #4111 from JoshRosen/SPARK-5190-register-sparklistener-in-sc-constructor and squashes
the following commits:

8370839 [Josh Rosen] Two minor fixes after merging with master
6e0122c [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-5190-register-sparklistener-in-sc-constructor
1a5b9a0 [Josh Rosen] Remove SPARK_EXTRA_LISTENERS environment variable.
2daff9b [Josh Rosen] Add a couple of explanatory comments for SPARK_EXTRA_LISTENERS.
b9973da [Josh Rosen] Add test to ensure that conf and env var settings are merged, not overridden.
d6f3113 [Josh Rosen] Use getConstructors() instead of try-catch to find right constructor.
d0d276d [Josh Rosen] Move code into setupAndStartListenerBus() method
b22b379 [Josh Rosen] Instantiate SparkListeners from classes listed in configurations.
9c0d8f1 [Josh Rosen] Revert "[SPARK-5190] Allow SparkListeners to be registered before SparkContext starts."
217ecc0 [Josh Rosen] Revert "Add addSparkListener to JavaSparkContext"
25988f3 [Josh Rosen] Add addSparkListener to JavaSparkContext
163ba19 [Josh Rosen] [SPARK-5190] Allow SparkListeners to be registered before SparkContext starts.

(cherry picked from commit 9a7ce70eabc0ccaa036e142fc97bf0d37faa0b63)
Signed-off-by: Patrick Wendell <patrick@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47e4d579
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47e4d579
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47e4d579

Branch: refs/heads/branch-1.3
Commit: 47e4d579eb4a9aab8e0dd9c1400394d80c8d0388
Parents: dc9ead9
Author: Josh Rosen <joshrosen@databricks.com>
Authored: Wed Feb 4 17:18:03 2015 -0800
Committer: Patrick Wendell <patrick@databricks.com>
Committed: Wed Feb 4 17:18:12 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 57 ++++++++++++++++++--
 .../org/apache/spark/util/ListenerBus.scala     |  3 +-
 .../spark/scheduler/SparkListenerSuite.scala    | 56 +++++++++++++------
 docs/configuration.md                           | 11 ++++
 4 files changed, 106 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/47e4d579/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7f5aef1..a7adddb 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import scala.language.implicitConversions
 
 import java.io._
+import java.lang.reflect.Constructor
 import java.net.URI
 import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
@@ -387,9 +388,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     }
   executorAllocationManager.foreach(_.start())
 
-  // At this point, all relevant SparkListeners have been registered, so begin releasing events
-  listenerBus.start()
-
   private[spark] val cleaner: Option[ContextCleaner] = {
     if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
       Some(new ContextCleaner(this))
@@ -399,6 +397,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
   cleaner.foreach(_.start())
 
+  setupAndStartListenerBus()
   postEnvironmentUpdate()
   postApplicationStart()
 
@@ -1563,6 +1562,58 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /** Register a new RDD, returning its RDD ID */
   private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
 
+  /**
+   * Registers listeners specified in spark.extraListeners, then starts the listener bus.
+   * This should be called after all internal listeners have been registered with the listener bus
+   * (e.g. after the web UI and event logging listeners have been registered).
+   */
+  private def setupAndStartListenerBus(): Unit = {
+    // Use reflection to instantiate listeners specified via `spark.extraListeners`
+    try {
+      val listenerClassNames: Seq[String] =
+        conf.get("spark.extraListeners", "").split(',').map(_.trim).filter(_ != "")
+      for (className <- listenerClassNames) {
+        // Use reflection to find the right constructor
+        val constructors = {
+          val listenerClass = Class.forName(className)
+          listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]]
+        }
+        val constructorTakingSparkConf = constructors.find { c =>
+          c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
+        }
+        lazy val zeroArgumentConstructor = constructors.find { c =>
+          c.getParameterTypes.isEmpty
+        }
+        val listener: SparkListener = {
+          if (constructorTakingSparkConf.isDefined) {
+            constructorTakingSparkConf.get.newInstance(conf)
+          } else if (zeroArgumentConstructor.isDefined) {
+            zeroArgumentConstructor.get.newInstance()
+          } else {
+            throw new SparkException(
+              s"$className did not have a zero-argument constructor or a" +
+                " single-argument constructor that accepts SparkConf. Note: if the class
is" +
+                " defined inside of another Scala class, then its constructors may accept
an" +
+                " implicit parameter that references the enclosing class; in this case, you
must" +
+                " define the listener as a top-level class in order to prevent this extra"
+
+                " parameter from breaking Spark's ability to find a valid constructor.")
+          }
+        }
+        listenerBus.addListener(listener)
+        logInfo(s"Registered listener $className")
+      }
+    } catch {
+      case e: Exception =>
+        try {
+          stop()
+        } finally {
+          throw new SparkException(s"Exception when registering SparkListener", e)
+        }
+    }
+
+    listenerBus.start()
+  }
+
   /** Post the application start event */
   private def postApplicationStart() {
     // Note: this code assumes that the task scheduler has been initialized and has contacted
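
For readers following the reflection logic above, here is a hedged standalone sketch of the
same constructor-selection pattern outside of SparkContext; the helper name `instantiate` is
invented for illustration:

    import org.apache.spark.SparkConf
    import org.apache.spark.scheduler.SparkListener

    // Prefer a single-argument (SparkConf) constructor, fall back to a
    // zero-argument one, and fail if neither exists, mirroring the
    // behavior of setupAndStartListenerBus().
    def instantiate(className: String, conf: SparkConf): SparkListener = {
      val constructors = Class.forName(className).getConstructors
      val takesConf = constructors.find(
        _.getParameterTypes.sameElements(Array(classOf[SparkConf])))
      val zeroArg = constructors.find(_.getParameterTypes.isEmpty)
      (takesConf, zeroArg) match {
        case (Some(c), _) => c.newInstance(conf).asInstanceOf[SparkListener]
        case (_, Some(c)) => c.newInstance().asInstanceOf[SparkListener]
        case _ => throw new IllegalArgumentException(
          s"$className has no zero-argument or SparkConf constructor")
      }
    }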

http://git-wip-us.apache.org/repos/asf/spark/blob/47e4d579/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index bd0aa4d..d60b8b9 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -28,7 +28,8 @@ import org.apache.spark.Logging
  */
 private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
 
-  private val listeners = new CopyOnWriteArrayList[L]
+  // Marked `private[spark]` for access in tests.
+  private[spark] val listeners = new CopyOnWriteArrayList[L]
 
   /**
   * Add a listener to listen events. This method is thread-safe and can be called in any thread.

http://git-wip-us.apache.org/repos/asf/spark/blob/47e4d579/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 0fb1bdd..3a41ee8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -20,26 +20,22 @@ package org.apache.spark.scheduler
 import java.util.concurrent.Semaphore
 
 import scala.collection.mutable
+import scala.collection.JavaConversions._
 
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
-import org.scalatest.Matchers
+import org.scalatest.{FunSuite, Matchers}
 
-import org.apache.spark.{LocalSparkContext, SparkContext}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}
 
-class SparkListenerSuite extends FunSuite  with LocalSparkContext with Matchers with BeforeAndAfter
-  with BeforeAndAfterAll with ResetSystemProperties {
+class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
+  with ResetSystemProperties {
 
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
 
   val jobCompletionTime = 1421191296660L
 
-  before {
-    sc = new SparkContext("local", "SparkListenerSuite")
-  }
-
   test("basic creation and shutdown of LiveListenerBus") {
     val counter = new BasicJobCounter
     val bus = new LiveListenerBus
@@ -127,6 +123,7 @@ class SparkListenerSuite extends FunSuite  with LocalSparkContext with Matchers
   }
 
   test("basic creation of StageInfo") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     val rdd1 = sc.parallelize(1 to 100, 4)
@@ -148,6 +145,7 @@ class SparkListenerSuite extends FunSuite  with LocalSparkContext with Matchers
   }
 
   test("basic creation of StageInfo with shuffle") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     val rdd1 = sc.parallelize(1 to 100, 4)
@@ -185,6 +183,7 @@ class SparkListenerSuite extends FunSuite  with LocalSparkContext with Matchers
   }
 
   test("StageInfo with fewer tasks than partitions") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     val rdd1 = sc.parallelize(1 to 100, 4)
@@ -201,6 +200,7 @@ class SparkListenerSuite extends FunSuite  with LocalSparkContext with Matchers
   }
 
   test("local metrics") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveStageAndTaskInfo
     sc.addSparkListener(listener)
     sc.addSparkListener(new StatsReportListener)
@@ -267,6 +267,7 @@ class SparkListenerSuite extends FunSuite  with LocalSparkContext with Matchers
   }
 
   test("onTaskGettingResult() called when result fetched remotely") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
 
@@ -287,6 +288,7 @@ class SparkListenerSuite extends FunSuite  with LocalSparkContext with Matchers
   }
 
   test("onTaskGettingResult() not called when result sent directly") {
+    sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
 
@@ -302,6 +304,7 @@ class SparkListenerSuite extends FunSuite  with LocalSparkContext with Matchers
   }
 
   test("onTaskEnd() should be called for all started tasks, even after job has been killed")
{
+    sc = new SparkContext("local", "SparkListenerSuite")
     val WAIT_TIMEOUT_MILLIS = 10000
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
@@ -356,6 +359,17 @@ class SparkListenerSuite extends FunSuite  with LocalSparkContext with Matchers
     assert(jobCounter2.count === 5)
   }
 
+  test("registering listeners via spark.extraListeners") {
+    val conf = new SparkConf().setMaster("local").setAppName("test")
+      .set("spark.extraListeners", classOf[ListenerThatAcceptsSparkConf].getName + "," +
+        classOf[BasicJobCounter].getName)
+    sc = new SparkContext(conf)
+    sc.listenerBus.listeners.collect { case x: BasicJobCounter => x}.size should be (1)
+    sc.listenerBus.listeners.collect {
+      case x: ListenerThatAcceptsSparkConf => x
+    }.size should be (1)
+  }
+
   /**
    * Assert that the given list of numbers has an average that is greater than zero.
    */
@@ -364,14 +378,6 @@ class SparkListenerSuite extends FunSuite  with LocalSparkContext with Matchers
   }
 
   /**
-   * A simple listener that counts the number of jobs observed.
-   */
-  private class BasicJobCounter extends SparkListener {
-    var count = 0
-    override def onJobEnd(job: SparkListenerJobEnd) = count += 1
-  }
-
-  /**
    * A simple listener that saves all task infos and task metrics.
    */
   private class SaveStageAndTaskInfo extends SparkListener {
@@ -423,3 +429,19 @@ class SparkListenerSuite extends FunSuite  with LocalSparkContext with Matchers
   }
 
 }
+
+// These classes can't be declared inside of the SparkListenerSuite class because we don't want
+// their constructors to contain references to SparkListenerSuite:
+
+/**
+ * A simple listener that counts the number of jobs observed.
+ */
+private class BasicJobCounter extends SparkListener {
+  var count = 0
+  override def onJobEnd(job: SparkListenerJobEnd) = count += 1
+}
+
+private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListener {
+  var count = 0
+  override def onJobEnd(job: SparkListenerJobEnd) = count += 1
+}
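
The comment above ("these classes can't be declared inside of the SparkListenerSuite class")
deserves a short illustration: a class nested inside another Scala class compiles to a JVM
constructor whose first parameter is the enclosing instance, so neither of the constructor
shapes that `spark.extraListeners` supports would exist. A hypothetical sketch:

    class Outer {
      class Inner  // compiles to a JVM constructor Inner(Outer)
    }
    // classOf[Outer#Inner].getConstructors.head.getParameterTypes
    //   yields Array(class Outer): there is no zero-argument or
    //   SparkConf-only constructor for spark.extraListeners to call.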

http://git-wip-us.apache.org/repos/asf/spark/blob/47e4d579/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 62d3fca..8b1d759 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -190,6 +190,17 @@ of the most common options to set are:
     Logs the effective SparkConf as INFO when a SparkContext is started.
   </td>
 </tr>
+<tr>
+  <td><code>spark.extraListeners</code></td>
+  <td>(none)</td>
+  <td>
+    A comma-separated list of classes that implement <code>SparkListener</code>; when initializing
+    SparkContext, instances of these classes will be created and registered with Spark's listener
+    bus.  If a class has a single-argument constructor that accepts a SparkConf, that constructor
+    will be called; otherwise, a zero-argument constructor will be called. If no valid constructor
+    can be found, the SparkContext creation will fail with an exception.
+  </td>
+</tr>
 </table>
 
 Apart from these, the following properties are also available, and may be useful in some situations:
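
Setting the option programmatically is equally simple; a hedged sketch reusing the
hypothetical listener class from the top of this message:

    import org.apache.spark.{SparkConf, SparkContext}

    // The listener class must be on the driver's classpath.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("extra-listeners-example")
      .set("spark.extraListeners", "com.example.monitoring.JobEndLogger")
    val sc = new SparkContext(conf)  // listener registered before any job runs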


