spark-commits mailing list archives

From andrewo...@apache.org
Subject spark git commit: [SPARK-11361][STREAMING] Show scopes of RDD operations inside DStream.foreachRDD and DStream.transform in DAG viz
Date Wed, 11 Nov 2015 00:54:20 GMT
Repository: spark
Updated Branches:
  refs/heads/master 900917541 -> 6600786dd


[SPARK-11361][STREAMING] Show scopes of RDD operations inside DStream.foreachRDD and DStream.transform in DAG viz

Currently, when a DStream sets the scope for the RDDs it generates, that scope cannot be
overridden by the RDD operations. So in the case of `DStream.foreachRDD`, all the RDDs generated
inside the foreachRDD get the same scope - `foreachRDD  <time>`, as set by the `ForEachDStream`.
This makes it hard to debug the generated RDDs in the RDD DAG viz in the Spark UI.

This patch allows the RDD operations inside `DStream.transform` and `DStream.foreachRDD` to
append their own scopes to the earlier DStream scope.
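
The nesting behaviour can be modelled without Spark. The following is a minimal sketch, not Spark's implementation: `ScopeSketch`, `Scope`, `withDStreamScope` and `rddOp` are hypothetical names, and two plain variables stand in for the scope and no-override local properties.

```scala
object ScopeSketch {
  // Hypothetical stand-in for an operation scope: a name plus an optional parent.
  final case class Scope(name: String, parent: Option[Scope] = None)

  // Stand-ins for the scope and "no override" local properties (assumption:
  // plain vars instead of SparkContext thread-local properties).
  private var current: Option[Scope] = None
  private var noOverride: Boolean = false

  /** Models a DStream installing its base scope around `body`, then restoring the old state. */
  def withDStreamScope[T](name: String, displayInnerRDDOps: Boolean)(body: => T): T = {
    val (prevScope, prevNoOverride) = (current, noOverride)
    try {
      current = Some(Scope(name))
      // Inner RDD operations may add their own scopes only when asked to display them.
      noOverride = !displayInnerRDDOps
      body
    } finally {
      current = prevScope
      noOverride = prevNoOverride
    }
  }

  /** Models an RDD operation: it nests under the current scope unless overriding is disallowed. */
  def rddOp(name: String): Scope =
    if (noOverride) current.getOrElse(Scope(name))
    else Scope(name, current)

  def main(args: Array[String]): Unit = {
    // Old behaviour: the RDD keeps the DStream's scope only.
    println(withDStreamScope("foreachRDD", displayInnerRDDOps = false)(rddOp("reduceByKey")))
    // New behaviour: the RDD operation's scope is nested under the DStream's scope.
    println(withDStreamScope("foreachRDD", displayInnerRDDOps = true)(rddOp("reduceByKey")))
  }
}
```

With `displayInnerRDDOps = false` the operation ends up with the bare `foreachRDD` scope; with `true` it gets a `reduceByKey` scope whose parent is `foreachRDD`.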

I have also slightly tweaked how callsites are set so that the short callsite reflects the
RDD operation name and line number. This tweak is necessary because callsites are not managed
through scopes (which support nesting and overriding), and I didn't want to add another local
property to control nesting and overriding of callsites.
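
The callsite tweak amounts to a per-field fallback: each form of the callsite comes from its local property if set, and otherwise from the callsite computed from the stack. A minimal sketch, assuming a hypothetical `CallSiteSketch.resolve` helper and a simplified `CallSite` case class (neither is Spark's API):

```scala
object CallSiteSketch {
  // Simplified stand-in for a callsite with a short and a long description.
  final case class CallSite(shortForm: String, longForm: String)

  /**
   * Resolve the effective callsite. A property that is unset (null) defers to the
   * corresponding field of `computed`, independently of the other property.
   */
  def resolve(shortProp: String, longProp: String, computed: CallSite): CallSite =
    CallSite(
      Option(shortProp).getOrElse(computed.shortForm),
      Option(longProp).getOrElse(computed.longForm))

  def main(args: Array[String]): Unit = {
    // Illustrative values only; the file name and line number are made up.
    val computed = CallSite("reduceByKey at App.scala:42", "full stack trace")
    // Both properties unset: the RDD operation's own callsite is used.
    println(resolve(null, null, computed))
    // Short form set by a DStream: it takes precedence over the computed short form.
    println(resolve("foreachRDD at App.scala:40", null, computed))
  }
}
```

Unsetting both properties before running the user's function is what lets each generated RDD report its own operation name and line number.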

## Before:
![image](https://cloud.githubusercontent.com/assets/663212/10808548/fa71c0c4-7da9-11e5-9af0-5737793a146f.png)

## After:
![image](https://cloud.githubusercontent.com/assets/663212/10808659/37bc45b6-7dab-11e5-8041-c20be6a9bc26.png)

The code that was used to generate this is:
```
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.foreachRDD { rdd =>
      val temp = rdd.map { _ -> 1 }.reduceByKey( _ + _)
      val temp2 = temp.map { _ -> 1}.reduceByKey(_ + _)
      val count = temp2.count
      println(count)
    }
```

Note
- The inner scopes of the RDD operations map/reduceByKey inside foreachRDD are visible
- The short callsites of the stages refer to the line numbers of the RDD ops rather than the same
line number of foreachRDD in all three cases.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #9315 from tdas/SPARK-11361.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6600786d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6600786d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6600786d

Branch: refs/heads/master
Commit: 6600786dddc89cb16779ee56b9173f63a3af3f27
Parents: 9009175
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Tue Nov 10 16:54:06 2015 -0800
Committer: Andrew Or <andrew@databricks.com>
Committed: Tue Nov 10 16:54:06 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  9 +--
 .../spark/streaming/TestOutputStream.scala      |  2 +-
 .../spark/streaming/dstream/DStream.scala       | 63 +++++++++++++---
 .../streaming/dstream/ForEachDStream.scala      | 14 +++-
 .../streaming/dstream/TransformedDStream.scala  | 13 ++++
 .../spark/streaming/DStreamScopeSuite.scala     | 75 ++++++++++++++++----
 .../apache/spark/streaming/TestSuiteBase.scala  |  4 +-
 7 files changed, 147 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6600786d/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7421821..67270c3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1787,10 +1787,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * has overridden the call site using `setCallSite()`, this will return the user's version.
    */
   private[spark] def getCallSite(): CallSite = {
-    Option(getLocalProperty(CallSite.SHORT_FORM)).map { case shortCallSite =>
-      val longCallSite = Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse("")
-      CallSite(shortCallSite, longCallSite)
-    }.getOrElse(Utils.getCallSite())
+    val callSite = Utils.getCallSite()
+    CallSite(
+      Option(getLocalProperty(CallSite.SHORT_FORM)).getOrElse(callSite.shortForm),
+      Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse(callSite.longForm)
+    )
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/6600786d/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
index 1a90000..79077e4 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
@@ -37,7 +37,7 @@ class TestOutputStream[T: ClassTag](parent: DStream[T],
   extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.collect()
     output += collected
-  }) {
+  }, false) {
 
   // This is to clear the output buffer every it is read from a checkpoint
   @throws(classOf[IOException])

http://git-wip-us.apache.org/repos/asf/spark/blob/6600786d/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 1da0b0a..1a6edf9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -341,7 +341,7 @@ abstract class DStream[T: ClassTag] (
       // of RDD generation, else generate nothing.
       if (isTimeValid(time)) {
 
-        val rddOption = createRDDWithLocalProperties(time) {
+        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
           // Disable checks for existing output directories in jobs launched by the streaming
           // scheduler, since we may need to write output to an existing directory during checkpoint
           // recovery; see SPARK-4835 for more details. We need to have this call here because
@@ -373,27 +373,52 @@ abstract class DStream[T: ClassTag] (
   /**
    * Wrap a body of code such that the call site and operation scope
    * information are passed to the RDDs created in this body properly.
-   */
-  protected def createRDDWithLocalProperties[U](time: Time)(body: => U): U = {
+   * @param body RDD creation code to execute with certain local properties.
+   * @param time Current batch time that should be embedded in the scope names
+   * @param displayInnerRDDOps Whether the detailed callsites and scopes of the inner RDDs generated
+   *                           by `body` will be displayed in the UI; only the scope and callsite
+   *                           of the DStream operation that generated `this` will be displayed.
+   */
+  protected[streaming] def createRDDWithLocalProperties[U](
+      time: Time,
+      displayInnerRDDOps: Boolean)(body: => U): U = {
     val scopeKey = SparkContext.RDD_SCOPE_KEY
     val scopeNoOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
     // Pass this DStream's operation scope and creation site information to RDDs through
     // thread-local properties in our SparkContext. Since this method may be called from another
     // DStream, we need to temporarily store any old scope and creation site information to
     // restore them later after setting our own.
-    val prevCallSite = ssc.sparkContext.getCallSite()
+    val prevCallSite = CallSite(
+      ssc.sparkContext.getLocalProperty(CallSite.SHORT_FORM),
+      ssc.sparkContext.getLocalProperty(CallSite.LONG_FORM)
+    )
     val prevScope = ssc.sparkContext.getLocalProperty(scopeKey)
     val prevScopeNoOverride = ssc.sparkContext.getLocalProperty(scopeNoOverrideKey)
 
     try {
-      ssc.sparkContext.setCallSite(creationSite)
+      if (displayInnerRDDOps) {
+        // Unset the short form call site, so that generated RDDs get their own
+        ssc.sparkContext.setLocalProperty(CallSite.SHORT_FORM, null)
+        ssc.sparkContext.setLocalProperty(CallSite.LONG_FORM, null)
+      } else {
+        // Set the callsite, so that the generated RDDs get the DStream's call site and
+        // the internal RDD call sites do not get displayed
+        ssc.sparkContext.setCallSite(creationSite)
+      }
+
       // Use the DStream's base scope for this RDD so we can (1) preserve the higher level
       // DStream operation name, and (2) share this scope with other DStreams created in the
       // same operation. Disallow nesting so that low-level Spark primitives do not show up.
       // TODO: merge callsites with scopes so we can just reuse the code there
       makeScope(time).foreach { s =>
         ssc.sparkContext.setLocalProperty(scopeKey, s.toJson)
-        ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true")
+        if (displayInnerRDDOps) {
+          // Allow inner RDDs to add inner scopes
+          ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, null)
+        } else {
+          // Do not allow inner RDDs to override the scope set by DStream
+          ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true")
+        }
       }
 
       body
@@ -628,7 +653,7 @@ abstract class DStream[T: ClassTag] (
    */
   def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
     val cleanedF = context.sparkContext.clean(foreachFunc, false)
-    this.foreachRDD((r: RDD[T], t: Time) => cleanedF(r))
+    foreachRDD((r: RDD[T], t: Time) => cleanedF(r), displayInnerRDDOps = true)
   }
 
   /**
@@ -639,7 +664,23 @@ abstract class DStream[T: ClassTag] (
     // because the DStream is reachable from the outer object here, and because
     // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
-    new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
+    foreachRDD(foreachFunc, displayInnerRDDOps = true)
+  }
+
+  /**
+   * Apply a function to each RDD in this DStream. This is an output operator, so
+   * 'this' DStream will be registered as an output stream and therefore materialized.
+   * @param foreachFunc foreachRDD function
+   * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
+   *                           in the `foreachFunc` to be displayed in the UI. If `false`, then
+   *                           only the scopes and callsites of `foreachRDD` will override those
+   *                           of the RDDs on the display.
+   */
+  private def foreachRDD(
+      foreachFunc: (RDD[T], Time) => Unit,
+      displayInnerRDDOps: Boolean): Unit = {
+    new ForEachDStream(this,
+      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
   }
 
   /**
@@ -730,7 +771,7 @@ abstract class DStream[T: ClassTag] (
         // scalastyle:on println
       }
     }
-    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
+    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
   }
 
   /**
@@ -900,7 +941,7 @@ abstract class DStream[T: ClassTag] (
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsObjectFile(file)
     }
-    this.foreachRDD(saveFunc)
+    this.foreachRDD(saveFunc, displayInnerRDDOps = false)
   }
 
   /**
@@ -913,7 +954,7 @@ abstract class DStream[T: ClassTag] (
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsTextFile(file)
     }
-    this.foreachRDD(saveFunc)
+    this.foreachRDD(saveFunc, displayInnerRDDOps = false)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/6600786d/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index c109cec..4410a99 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -22,10 +22,19 @@ import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.streaming.scheduler.Job
 import scala.reflect.ClassTag
 
+/**
+ * An internal DStream used to represent output operations like DStream.foreachRDD.
+ * @param parent        Parent DStream
+ * @param foreachFunc   Function to apply on each RDD generated by the parent DStream
+ * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
+ *                           by `foreachFunc` will be displayed in the UI; only the scope and
+ *                           callsite of `DStream.foreachRDD` will be displayed.
+ */
 private[streaming]
 class ForEachDStream[T: ClassTag] (
     parent: DStream[T],
-    foreachFunc: (RDD[T], Time) => Unit
+    foreachFunc: (RDD[T], Time) => Unit,
+    displayInnerRDDOps: Boolean
   ) extends DStream[Unit](parent.ssc) {
 
   override def dependencies: List[DStream[_]] = List(parent)
@@ -37,8 +46,7 @@ class ForEachDStream[T: ClassTag] (
   override def generateJob(time: Time): Option[Job] = {
     parent.getOrCompute(time) match {
       case Some(rdd) =>
-        val jobFunc = () => createRDDWithLocalProperties(time) {
-          ssc.sparkContext.setCallSite(creationSite)
+        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
           foreachFunc(rdd, time)
         }
         Some(new Job(time, jobFunc))

http://git-wip-us.apache.org/repos/asf/spark/blob/6600786d/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 5eabdf6..080bc87 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -51,4 +51,17 @@ class TransformedDStream[U: ClassTag] (
     }
     Some(transformedRDD)
   }
+
+  /**
+   * Wrap a body of code such that the call site and operation scope
+   * information are passed to the RDDs created in this body properly.
+   * This has been overriden to make sure that `displayInnerRDDOps` is always `true`, that is,
+   * the inner scopes and callsites of RDDs generated in `DStream.transform` are always
+   * displayed in the UI.
+   */
+  override protected[streaming] def createRDDWithLocalProperties[U](
+      time: Time,
+      displayInnerRDDOps: Boolean)(body: => U): U = {
+    super.createRDDWithLocalProperties(time, displayInnerRDDOps = true)(body)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6600786d/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
index 8844c9d..bc223e6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.streaming
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 
-import org.apache.spark.{SparkContext, SparkFunSuite}
-import org.apache.spark.rdd.RDDOperationScope
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.ui.UIUtils
+import org.apache.spark.util.ManualClock
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 
 /**
  * Tests whether scope information is passed from DStream operations to RDDs correctly.
@@ -32,7 +35,9 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
   private val batchDuration: Duration = Seconds(1)
 
   override def beforeAll(): Unit = {
-    ssc = new StreamingContext(new SparkContext("local", "test"), batchDuration)
+    val conf = new SparkConf().setMaster("local").setAppName("test")
+    conf.set("spark.streaming.clock", classOf[ManualClock].getName())
+    ssc = new StreamingContext(new SparkContext(conf), batchDuration)
   }
 
   override def afterAll(): Unit = {
@@ -103,6 +108,8 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
 
   test("scoping nested operations") {
     val inputStream = new DummyInputDStream(ssc)
+    // countByKeyAndWindow internally uses reduceByKeyAndWindow, but only countByKeyAndWindow
+    // should appear in scope
     val countStream = inputStream.countByWindow(Seconds(10), Seconds(1))
     countStream.initialize(Time(0))
 
@@ -137,6 +144,57 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
     testStream(countStream)
   }
 
+  test("transform should allow RDD operations to be captured in scopes") {
+    val inputStream = new DummyInputDStream(ssc)
+    val transformedStream = inputStream.transform { _.map { _ -> 1}.reduceByKey(_ + _) }
+    transformedStream.initialize(Time(0))
+
+    val transformScopeBase = transformedStream.baseScope.map(RDDOperationScope.fromJson)
+    val transformScope1 = transformedStream.getOrCompute(Time(1000)).get.scope
+    val transformScope2 = transformedStream.getOrCompute(Time(2000)).get.scope
+    val transformScope3 = transformedStream.getOrCompute(Time(3000)).get.scope
+
+    // Assert that all children RDDs inherit the DStream operation name correctly
+    assertDefined(transformScopeBase, transformScope1, transformScope2, transformScope3)
+    assert(transformScopeBase.get.name === "transform")
+    assertNestedScopeCorrect(transformScope1.get, 1000)
+    assertNestedScopeCorrect(transformScope2.get, 2000)
+    assertNestedScopeCorrect(transformScope3.get, 3000)
+
+    def assertNestedScopeCorrect(rddScope: RDDOperationScope, batchTime: Long): Unit = {
+      assert(rddScope.name === "reduceByKey")
+      assert(rddScope.parent.isDefined)
+      assertScopeCorrect(transformScopeBase.get, rddScope.parent.get, batchTime)
+    }
+  }
+
+  test("foreachRDD should allow RDD operations to be captured in scope") {
+    val inputStream = new DummyInputDStream(ssc)
+    val generatedRDDs = new ArrayBuffer[RDD[(Int, Int)]]
+    inputStream.foreachRDD { rdd =>
+      generatedRDDs += rdd.map { _ -> 1}.reduceByKey(_ + _)
+    }
+    val batchCounter = new BatchCounter(ssc)
+    ssc.start()
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    clock.advance(3000)
+    batchCounter.waitUntilBatchesCompleted(3, 10000)
+    assert(generatedRDDs.size === 3)
+
+    val foreachBaseScope =
+      ssc.graph.getOutputStreams().head.baseScope.map(RDDOperationScope.fromJson)
+    assertDefined(foreachBaseScope)
+    assert(foreachBaseScope.get.name === "foreachRDD")
+
+    val rddScopes = generatedRDDs.map { _.scope }
+    assertDefined(rddScopes: _*)
+    rddScopes.zipWithIndex.foreach { case (rddScope, idx) =>
+      assert(rddScope.get.name === "reduceByKey")
+      assert(rddScope.get.parent.isDefined)
+      assertScopeCorrect(foreachBaseScope.get, rddScope.get.parent.get, (idx + 1) * 1000)
+    }
+  }
+
   /** Assert that the RDD operation scope properties are not set in our SparkContext. */
   private def assertPropertiesNotSet(): Unit = {
     assert(ssc != null)
@@ -149,19 +207,12 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
       baseScope: RDDOperationScope,
       rddScope: RDDOperationScope,
       batchTime: Long): Unit = {
-    assertScopeCorrect(baseScope.id, baseScope.name, rddScope, batchTime)
-  }
-
-  /** Assert that the given RDD scope inherits the base name and ID correctly. */
-  private def assertScopeCorrect(
-      baseScopeId: String,
-      baseScopeName: String,
-      rddScope: RDDOperationScope,
-      batchTime: Long): Unit = {
+    val (baseScopeId, baseScopeName) = (baseScope.id, baseScope.name)
     val formattedBatchTime = UIUtils.formatBatchTime(
       batchTime, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
     assert(rddScope.id === s"${baseScopeId}_$batchTime")
     assert(rddScope.name.replaceAll("\\n", " ") === s"$baseScopeName @ $formattedBatchTime")
+    assert(rddScope.parent.isEmpty)  // There should not be any higher scope
   }
 
   /** Assert that all the specified options are defined. */

http://git-wip-us.apache.org/repos/asf/spark/blob/6600786d/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 0d58a7b..a45c92d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -98,7 +98,7 @@ class TestOutputStream[T: ClassTag](
   ) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.collect()
     output += collected
-  }) {
+  }, false) {
 
   // This is to clear the output buffer every it is read from a checkpoint
   @throws(classOf[IOException])
@@ -122,7 +122,7 @@ class TestOutputStreamWithPartitions[T: ClassTag](
   extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.glom().collect().map(_.toSeq)
     output += collected
-  }) {
+  }, false) {
 
   // This is to clear the output buffer every it is read from a checkpoint
   @throws(classOf[IOException])

