spark-commits mailing list archives

From andrewo...@apache.org
Subject [1/3] spark git commit: [SPARK-7501] [STREAMING] DAG visualization: show DStream operations
Date Mon, 18 May 2015 21:33:49 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 ba502ab20 -> a475cbc97


http://git-wip-us.apache.org/repos/asf/spark/blob/a475cbc9/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index af276e7..f78fbaf 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -678,7 +678,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    *
    * Note: Return statements are NOT allowed in the given body.
    */
-  private def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)
+  private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)
 
   // Methods for creating RDDs
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a475cbc9/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
index 2725826..6b09dfa 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{Logging, SparkContext}
 
 /**
  * A general, named code block representing an operation that instantiates RDDs.
@@ -43,9 +43,8 @@ import org.apache.spark.SparkContext
 @JsonPropertyOrder(Array("id", "name", "parent"))
 private[spark] class RDDOperationScope(
     val name: String,
-    val parent: Option[RDDOperationScope] = None) {
-
-  val id: Int = RDDOperationScope.nextScopeId()
+    val parent: Option[RDDOperationScope] = None,
+    val id: String = RDDOperationScope.nextScopeId().toString) {
 
   def toJson: String = {
     RDDOperationScope.jsonMapper.writeValueAsString(this)
@@ -75,7 +74,7 @@ private[spark] class RDDOperationScope(
  * A collection of utility methods to construct a hierarchical representation of RDD scopes.
  * An RDD scope tracks the series of operations that created a given RDD.
  */
-private[spark] object RDDOperationScope {
+private[spark] object RDDOperationScope extends Logging {
   private val jsonMapper = new ObjectMapper().registerModule(DefaultScalaModule)
   private val scopeCounter = new AtomicInteger(0)
 
@@ -88,14 +87,25 @@ private[spark] object RDDOperationScope {
 
   /**
    * Execute the given body such that all RDDs created in this body will have the same scope.
-   * The name of the scope will be the name of the method that immediately encloses this one.
+   * The name of the scope will be the first method name in the stack trace that is not the
+   * same as this method's.
    *
    * Note: Return statements are NOT allowed in body.
    */
   private[spark] def withScope[T](
       sc: SparkContext,
       allowNesting: Boolean = false)(body: => T): T = {
-    val callerMethodName = Thread.currentThread.getStackTrace()(3).getMethodName
+    val stackTrace = Thread.currentThread.getStackTrace().tail // ignore "Thread#getStackTrace"
+    val ourMethodName = stackTrace(1).getMethodName // i.e. withScope
+    // Climb upwards to find the first method that's called something different
+    val callerMethodName = stackTrace
+      .find(_.getMethodName != ourMethodName)
+      .map(_.getMethodName)
+      .getOrElse {
+        // Log a warning just in case, but this should almost certainly never happen
+        logWarning("No valid method name for this RDD operation scope!")
+        "N/A"
+      }
     withScope[T](sc, callerMethodName, allowNesting, ignoreParent = false)(body)
   }
 

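The caller-name lookup above can be tried outside of Spark. Below is a minimal, self-contained sketch of the same idea; the object and method names are illustrative only, with two same-named "withScope" layers standing in for SparkContext.withScope and RDDOperationScope.withScope:

object CallerNameDemo {
  object Scopes {
    // Mirrors RDDOperationScope.withScope: walk up the stack until we find the
    // first method whose name differs from this method's own name.
    def withScope[T](body: => T): (String, T) = {
      val stackTrace = Thread.currentThread.getStackTrace.tail // ignore "Thread#getStackTrace"
      val ourMethodName = stackTrace(1).getMethodName          // i.e. "withScope"
      val callerMethodName = stackTrace
        .find(_.getMethodName != ourMethodName)
        .map(_.getMethodName)
        .getOrElse("N/A")
      (callerMethodName, body)
    }
  }

  // Mirrors SparkContext.withScope: a thin forwarder with the same method name,
  // which is why the stack walk has to skip over every "withScope" frame.
  def withScope[T](body: => T): (String, T) = Scopes.withScope(body)

  // Mirrors an RDD-creating operation such as SparkContext.textFile.
  def makeSomething(): (String, Int) = withScope { 42 }

  def main(args: Array[String]): Unit = {
    println(makeSomething()._1) // prints "makeSomething"
  }
}
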
http://git-wip-us.apache.org/repos/asf/spark/blob/a475cbc9/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 33a7303..d6a5085 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -116,8 +116,8 @@ private[ui] object RDDOperationGraph extends Logging {
         // which may be nested inside of other clusters
         val rddScopes = rdd.scope.map { scope => scope.getAllScopes }.getOrElse(Seq.empty)
         val rddClusters = rddScopes.map { scope =>
-          val clusterId = scope.name + "_" + scope.id
-          val clusterName = scope.name
+          val clusterId = scope.id
+          val clusterName = scope.name.replaceAll("\\n", "\\\\n")
           clusters.getOrElseUpdate(clusterId, new RDDOperationCluster(clusterId, clusterName))
         }
         // Build the cluster hierarchy for this RDD
@@ -177,7 +177,7 @@ private[ui] object RDDOperationGraph extends Logging {
 
   /** Return the dot representation of a node in an RDDOperationGraph. */
   private def makeDotNode(node: RDDOperationNode): String = {
-    s"""${node.id} [label="${node.name} (${node.id})"]"""
+    s"""${node.id} [label="${node.name} [${node.id}]"]"""
   }
 
   /** Return the dot representation of a subgraph in an RDDOperationGraph. */

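For reference, the two changed label lines can be exercised directly; the scope name and node values below are made up:

object DotLabelDemo {
  def main(args: Array[String]): Unit = {
    // Cluster name: escape literal newlines so Graphviz receives the two-character
    // sequence "\n" (a line break inside the label) instead of a raw newline.
    val scopeName = "socket text stream\n@ 10:05:30"
    println(scopeName.replaceAll("\\n", "\\\\n")) // socket text stream\n@ 10:05:30

    // Node label: "name [id]" instead of the previous "name (id)".
    val nodeId = 3
    val nodeName = "MapPartitionsRDD"
    println(s"""$nodeId [label="$nodeName [$nodeId]"]""") // 3 [label="MapPartitionsRDD [3]"]
  }
}
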
http://git-wip-us.apache.org/repos/asf/spark/blob/a475cbc9/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
index db465a6..4434ed8 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
@@ -22,13 +22,13 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.apache.spark.{TaskContext, Partition, SparkContext}
 
 /**
- *
+ * Tests whether scopes are passed from RDD operations to the RDDs correctly.
  */
 class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
   private var sc: SparkContext = null
   private val scope1 = new RDDOperationScope("scope1")
-  private val scope2 = new RDDOperationScope("scope2", parent = Some(scope1))
-  private val scope3 = new RDDOperationScope("scope3", parent = Some(scope2))
+  private val scope2 = new RDDOperationScope("scope2", Some(scope1))
+  private val scope3 = new RDDOperationScope("scope3", Some(scope2))
 
   before {
     sc = new SparkContext("local", "test")
@@ -48,9 +48,9 @@ class RDDOperationScopeSuite extends FunSuite with BeforeAndAfter {
     val scope1Json = scope1.toJson
     val scope2Json = scope2.toJson
     val scope3Json = scope3.toJson
-    assert(scope1Json === s"""{"id":${scope1.id},"name":"scope1"}""")
-    assert(scope2Json === s"""{"id":${scope2.id},"name":"scope2","parent":$scope1Json}""")
-    assert(scope3Json === s"""{"id":${scope3.id},"name":"scope3","parent":$scope2Json}""")
+    assert(scope1Json === s"""{"id":"${scope1.id}","name":"scope1"}""")
+    assert(scope2Json === s"""{"id":"${scope2.id}","name":"scope2","parent":$scope1Json}""")
+    assert(scope3Json === s"""{"id":"${scope3.id}","name":"scope3","parent":$scope2Json}""")
     assert(RDDOperationScope.fromJson(scope1Json) === scope1)
     assert(RDDOperationScope.fromJson(scope2Json) === scope2)
     assert(RDDOperationScope.fromJson(scope3Json) === scope3)

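The JSON shape asserted above can be reproduced outside Spark with a small stand-in class (DemoScope below is made up, since RDDOperationScope itself is private[spark]); this sketch assumes jackson-module-scala is on the classpath:

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.annotation.{JsonInclude, JsonPropertyOrder}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Stand-in for RDDOperationScope with the new string-typed id field.
@JsonInclude(Include.NON_NULL)
@JsonPropertyOrder(Array("id", "name", "parent"))
class DemoScope(
    val name: String,
    val parent: Option[DemoScope] = None,
    val id: String = "0")

object ScopeJsonDemo {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val scope1 = new DemoScope("scope1", id = "1")
    val scope2 = new DemoScope("scope2", Some(scope1), id = "2")
    println(mapper.writeValueAsString(scope1))
    // {"id":"1","name":"scope1"}            <- None parent omitted, as in the test above
    println(mapper.writeValueAsString(scope2))
    // {"id":"2","name":"scope2","parent":{"id":"1","name":"scope1"}}
  }
}
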
http://git-wip-us.apache.org/repos/asf/spark/blob/a475cbc9/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 6715aed..060c2f2 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -65,6 +65,9 @@ class DirectKafkaInputDStream[
   val maxRetries = context.sparkContext.getConf.getInt(
     "spark.streaming.kafka.maxRetries", 1)
 
+  // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
+  private[streaming] override def name: String = s"Kafka direct stream [$id]"
+
   protected[streaming] override val checkpointData =
     new DirectKafkaInputDStreamCheckpointData
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a475cbc9/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index d7cf500..8be2707 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -189,7 +189,7 @@ object KafkaUtils {
       sc: SparkContext,
       kafkaParams: Map[String, String],
       offsetRanges: Array[OffsetRange]
-    ): RDD[(K, V)] = {
+    ): RDD[(K, V)] = sc.withScope {
     val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
     val leaders = leadersForRanges(kafkaParams, offsetRanges)
     new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
@@ -224,7 +224,7 @@ object KafkaUtils {
       offsetRanges: Array[OffsetRange],
       leaders: Map[TopicAndPartition, Broker],
       messageHandler: MessageAndMetadata[K, V] => R
-    ): RDD[R] = {
+    ): RDD[R] = sc.withScope {
     val leaderMap = if (leaders.isEmpty) {
       leadersForRanges(kafkaParams, offsetRanges)
     } else {
@@ -233,7 +233,8 @@ object KafkaUtils {
         case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port))
       }.toMap
     }
-    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+    val cleanedHandler = sc.clean(messageHandler)
+    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, cleanedHandler)
   }
 
   /**
@@ -256,7 +257,7 @@ object KafkaUtils {
       valueDecoderClass: Class[VD],
       kafkaParams: JMap[String, String],
       offsetRanges: Array[OffsetRange]
-    ): JavaPairRDD[K, V] = {
+    ): JavaPairRDD[K, V] = jsc.sc.withScope {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -294,7 +295,7 @@ object KafkaUtils {
       offsetRanges: Array[OffsetRange],
       leaders: JMap[TopicAndPartition, Broker],
       messageHandler: JFunction[MessageAndMetadata[K, V], R]
-    ): JavaRDD[R] = {
+    ): JavaRDD[R] = jsc.sc.withScope {
     implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
     implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -348,8 +349,9 @@ object KafkaUtils {
       fromOffsets: Map[TopicAndPartition, Long],
       messageHandler: MessageAndMetadata[K, V] => R
   ): InputDStream[R] = {
+    val cleanedHandler = ssc.sc.clean(messageHandler)
     new DirectKafkaInputDStream[K, V, KD, VD, R](
-      ssc, kafkaParams, fromOffsets, messageHandler)
+      ssc, kafkaParams, fromOffsets, cleanedHandler)
   }
 
   /**
@@ -469,11 +471,12 @@ object KafkaUtils {
     implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
     implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
     implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+    val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
     createDirectStream[K, V, KD, VD, R](
       jssc.ssc,
       Map(kafkaParams.toSeq: _*),
       Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
-      messageHandler.call _
+      cleanedHandler
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a475cbc9/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 3c0ef94..40f5f18 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -35,7 +35,6 @@ import org.eclipse.paho.client.mqttv3.MqttMessage
 import org.eclipse.paho.client.mqttv3.MqttTopic
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
 
-import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
@@ -57,6 +56,8 @@ class MQTTInputDStream(
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[String](ssc_) {
 
+  private[streaming] override def name: String = s"MQTT stream [$id]"
+
   def getReceiver(): Receiver[String] = {
     new MQTTReceiver(brokerUrl, topic, storageLevel)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a475cbc9/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 1d2ecdd..7f181bc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.spark._
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.input.FixedLengthBinaryInputFormat
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContextState._
 import org.apache.spark.streaming.dstream._
@@ -242,14 +242,33 @@ class StreamingContext private[streaming] (
   private[streaming] def getNewInputStreamId() = nextInputStreamId.getAndIncrement()
 
   /**
+   * Execute a block of code in a scope such that all new DStreams created in this body will
+   * be part of the same scope. For more detail, see the comments in `doCompute`.
+   *
+   * Note: Return statements are NOT allowed in the given body.
+   */
+  private[streaming] def withScope[U](body: => U): U = sparkContext.withScope(body)
+
+  /**
+   * Execute a block of code in a scope such that all new DStreams created in this body will
+   * be part of the same scope. For more detail, see the comments in `doCompute`.
+   *
+   * Note: Return statements are NOT allowed in the given body.
+   */
+  private[streaming] def withNamedScope[U](name: String)(body: => U): U = {
+    RDDOperationScope.withScope(sc, name, allowNesting = false, ignoreParent = false)(body)
+  }
+
+  /**
    * Create an input stream with any arbitrary user implemented receiver.
    * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
    * @param receiver Custom implementation of Receiver
    */
   @deprecated("Use receiverStream", "1.0.0")
-  def networkStream[T: ClassTag](
-    receiver: Receiver[T]): ReceiverInputDStream[T] = {
-    receiverStream(receiver)
+  def networkStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+    withNamedScope("network stream") {
+      receiverStream(receiver)
+    }
   }
 
   /**
@@ -257,9 +276,10 @@ class StreamingContext private[streaming] (
    * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
    * @param receiver Custom implementation of Receiver
    */
-  def receiverStream[T: ClassTag](
-    receiver: Receiver[T]): ReceiverInputDStream[T] = {
-    new PluggableInputDStream[T](this, receiver)
+  def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+    withNamedScope("receiver stream") {
+      new PluggableInputDStream[T](this, receiver)
+    }
   }
 
   /**
@@ -279,7 +299,7 @@ class StreamingContext private[streaming] (
       name: String,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
       supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
-    ): ReceiverInputDStream[T] = {
+    ): ReceiverInputDStream[T] = withNamedScope("actor stream") {
     receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
   }
 
@@ -296,7 +316,7 @@ class StreamingContext private[streaming] (
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[String] = {
+    ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
     socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
   }
 
@@ -334,7 +354,7 @@ class StreamingContext private[streaming] (
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[T] = {
+    ): ReceiverInputDStream[T] = withNamedScope("raw socket stream") {
     new RawInputDStream[T](this, hostname, port, storageLevel)
   }
 
@@ -408,7 +428,7 @@ class StreamingContext private[streaming] (
    * file system. File names starting with . are ignored.
    * @param directory HDFS directory to monitor for new file
    */
-  def textFileStream(directory: String): DStream[String] = {
+  def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
     fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
   }
 
@@ -430,7 +450,7 @@ class StreamingContext private[streaming] (
   @Experimental
   def binaryRecordsStream(
       directory: String,
-      recordLength: Int): DStream[Array[Byte]] = {
+      recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
     val conf = sc_.hadoopConfiguration
     conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
     val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
@@ -477,7 +497,7 @@ class StreamingContext private[streaming] (
   /**
    * Create a unified DStream from multiple DStreams of the same type and same slide duration.
    */
-  def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = {
+  def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = withScope {
     new UnionDStream[T](streams.toArray)
   }
 
@@ -488,7 +508,7 @@ class StreamingContext private[streaming] (
   def transform[T: ClassTag](
       dstreams: Seq[DStream[_]],
       transformFunc: (Seq[RDD[_]], Time) => RDD[T]
-    ): DStream[T] = {
+    ): DStream[T] = withScope {
     new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a475cbc9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 64de752..5977481 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -25,12 +25,13 @@ import scala.language.implicitConversions
 import scala.reflect.ClassTag
 import scala.util.matching.Regex
 
-import org.apache.spark.{Logging, SparkException}
-import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD}
+import org.apache.spark.{Logging, SparkContext, SparkException}
+import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD, RDDOperationScope}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext.rddToFileName
 import org.apache.spark.streaming.scheduler.Job
+import org.apache.spark.streaming.ui.UIUtils
 import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
 
 /**
@@ -73,7 +74,7 @@ abstract class DStream[T: ClassTag] (
   def dependencies: List[DStream[_]]
 
   /** Method that generates an RDD for the given time */
-  def compute (validTime: Time): Option[RDD[T]]
+  def compute(validTime: Time): Option[RDD[T]]
 
   // =======================================================================
   // Methods and fields available on all DStreams
@@ -111,6 +112,44 @@ abstract class DStream[T: ClassTag] (
   /* Set the creation call site */
   private[streaming] val creationSite = DStream.getCreationSite()
 
+  /**
+   * The base scope associated with the operation that created this DStream.
+   *
+   * This is the medium through which we pass the DStream operation name (e.g. updateStateByKey)
+   * to the RDDs created by this DStream. Note that we never use this scope directly in RDDs.
+   * Instead, we instantiate a new scope during each call to `compute` based on this one.
+   *
+   * This is not defined if the DStream is created outside of one of the public DStream operations.
+   */
+  protected[streaming] val baseScope: Option[String] = {
+    Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY))
+  }
+
+  /**
+   * Make a scope that groups RDDs created in the same DStream operation in the same batch.
+   *
+   * Each DStream produces many scopes and each scope may be shared by other DStreams created
+   * in the same operation. Separate calls to the same DStream operation create separate scopes.
+   * For instance, `dstream.map(...).map(...)` creates two separate scopes per batch.
+   */
+  private def makeScope(time: Time): Option[RDDOperationScope] = {
+    baseScope.map { bsJson =>
+      val formattedBatchTime = UIUtils.formatBatchTime(
+        time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
+      val bs = RDDOperationScope.fromJson(bsJson)
+      val baseName = bs.name // e.g. countByWindow, "kafka stream [0]"
+      val scopeName =
+        if (baseName.length > 10) {
+          // If the operation name is too long, wrap the line
+          s"$baseName\n@ $formattedBatchTime"
+        } else {
+          s"$baseName @ $formattedBatchTime"
+        }
+      val scopeId = s"${bs.id}_${time.milliseconds}"
+      new RDDOperationScope(scopeName, id = scopeId)
+    }
+  }
+
   /** Persist the RDDs of this DStream with the given storage level */
   def persist(level: StorageLevel): DStream[T] = {
     if (this.isInitialized) {
@@ -295,28 +334,23 @@ abstract class DStream[T: ClassTag] (
    * Get the RDD corresponding to the given time; either retrieve it from cache
    * or compute-and-cache it.
    */
-  private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
+  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
     // If RDD was already generated, then retrieve it from HashMap,
     // or else compute the RDD
     generatedRDDs.get(time).orElse {
       // Compute the RDD if time is valid (e.g. correct time in a sliding window)
       // of RDD generation, else generate nothing.
       if (isTimeValid(time)) {
-        // Set the thread-local property for call sites to this DStream's creation site
-        // such that RDDs generated by compute gets that as their creation site.
-        // Note that this `getOrCompute` may get called from another DStream which may have
-        // set its own call site. So we store its call site in a temporary variable,
-        // set this DStream's creation site, generate RDDs and then restore the previous call site.
-        val prevCallSite = ssc.sparkContext.getCallSite()
-        ssc.sparkContext.setCallSite(creationSite)
-        // Disable checks for existing output directories in jobs launched by the streaming
-        // scheduler, since we may need to write output to an existing directory during checkpoint
-        // recovery; see SPARK-4835 for more details. We need to have this call here because
-        // compute() might cause Spark jobs to be launched.
-        val rddOption = PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
-          compute(time)
+
+        val rddOption = createRDDWithLocalProperties(time) {
+          // Disable checks for existing output directories in jobs launched by the streaming
+          // scheduler, since we may need to write output to an existing directory during checkpoint
+          // recovery; see SPARK-4835 for more details. We need to have this call here because
+          // compute() might cause Spark jobs to be launched.
+          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+            compute(time)
+          }
         }
-        ssc.sparkContext.setCallSite(prevCallSite)
 
         rddOption.foreach { case newRDD =>
           // Register the generated RDD for caching and checkpointing
@@ -338,6 +372,41 @@ abstract class DStream[T: ClassTag] (
   }
 
   /**
+   * Wrap a body of code such that the call site and operation scope
+   * information are passed to the RDDs created in this body properly.
+   */
+  protected def createRDDWithLocalProperties[U](time: Time)(body: => U): U = {
+    val scopeKey = SparkContext.RDD_SCOPE_KEY
+    val scopeNoOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
+    // Pass this DStream's operation scope and creation site information to RDDs through
+    // thread-local properties in our SparkContext. Since this method may be called from another
+    // DStream, we need to temporarily store any old scope and creation site information to
+    // restore them later after setting our own.
+    val prevCallSite = ssc.sparkContext.getCallSite()
+    val prevScope = ssc.sparkContext.getLocalProperty(scopeKey)
+    val prevScopeNoOverride = ssc.sparkContext.getLocalProperty(scopeNoOverrideKey)
+
+    try {
+      ssc.sparkContext.setCallSite(creationSite)
+      // Use the DStream's base scope for this RDD so we can (1) preserve the higher level
+      // DStream operation name, and (2) share this scope with other DStreams created in the
+      // same operation. Disallow nesting so that low-level Spark primitives do not show up.
+      // TODO: merge callsites with scopes so we can just reuse the code there
+      makeScope(time).foreach { s =>
+        ssc.sparkContext.setLocalProperty(scopeKey, s.toJson)
+        ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true")
+      }
+
+      body
+    } finally {
+      // Restore any state that was modified before returning
+      ssc.sparkContext.setCallSite(prevCallSite)
+      ssc.sparkContext.setLocalProperty(scopeKey, prevScope)
+      ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, prevScopeNoOverride)
+    }
+  }
+
+  /**
    * Generate a SparkStreaming job for the given time. This is an internal method that
    * should not be called directly. This default implementation creates a job
    * that materializes the corresponding RDD. Subclasses of DStream may override this
@@ -456,7 +525,7 @@ abstract class DStream[T: ClassTag] (
   // =======================================================================
 
   /** Return a new DStream by applying a function to all elements of this DStream. */
-  def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
+  def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
     new MappedDStream(this, context.sparkContext.clean(mapFunc))
   }
 
@@ -464,26 +533,31 @@ abstract class DStream[T: ClassTag] (
    * Return a new DStream by applying a function to all elements of this DStream,
    * and then flattening the results
    */
-  def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = {
+  def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope {
     new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
   }
 
   /** Return a new DStream containing only the elements that satisfy a predicate. */
-  def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)
+  def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope {
+    new FilteredDStream(this, filterFunc)
+  }
 
   /**
    * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
    * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
    * an array.
    */
-  def glom(): DStream[Array[T]] = new GlommedDStream(this)
-
+  def glom(): DStream[Array[T]] = ssc.withScope {
+    new GlommedDStream(this)
+  }
 
   /**
    * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
    * returned DStream has exactly numPartitions partitions.
    */
-  def repartition(numPartitions: Int): DStream[T] = this.transform(_.repartition(numPartitions))
+  def repartition(numPartitions: Int): DStream[T] = ssc.withScope {
+    this.transform(_.repartition(numPartitions))
+  }
 
   /**
    * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs
@@ -493,7 +567,7 @@ abstract class DStream[T: ClassTag] (
   def mapPartitions[U: ClassTag](
       mapPartFunc: Iterator[T] => Iterator[U],
       preservePartitioning: Boolean = false
-    ): DStream[U] = {
+    ): DStream[U] = ssc.withScope {
     new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning)
   }
 
@@ -501,14 +575,15 @@ abstract class DStream[T: ClassTag] (
    * Return a new DStream in which each RDD has a single element generated by reducing each RDD
    * of this DStream.
    */
-  def reduce(reduceFunc: (T, T) => T): DStream[T] =
+  def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {
     this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
+  }
 
   /**
    * Return a new DStream in which each RDD has a single element generated by counting each RDD
    * of this DStream.
    */
-  def count(): DStream[Long] = {
+  def count(): DStream[Long] = ssc.withScope {
     this.map(_ => (null, 1L))
         .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
         .reduceByKey(_ + _)
@@ -522,15 +597,16 @@ abstract class DStream[T: ClassTag] (
    * `numPartitions` not specified).
    */
   def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
-      : DStream[(T, Long)] =
+      : DStream[(T, Long)] = ssc.withScope {
     this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
+  }
 
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so
    * 'this' DStream will be registered as an output stream and therefore materialized.
    */
   @deprecated("use foreachRDD", "0.9.0")
-  def foreach(foreachFunc: RDD[T] => Unit): Unit = {
+  def foreach(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
     this.foreachRDD(foreachFunc)
   }
 
@@ -539,7 +615,7 @@ abstract class DStream[T: ClassTag] (
    * 'this' DStream will be registered as an output stream and therefore materialized.
    */
   @deprecated("use foreachRDD", "0.9.0")
-  def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = {
+  def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
     this.foreachRDD(foreachFunc)
   }
 
@@ -547,7 +623,7 @@ abstract class DStream[T: ClassTag] (
    * Apply a function to each RDD in this DStream. This is an output operator, so
    * 'this' DStream will be registered as an output stream and therefore materialized.
    */
-  def foreachRDD(foreachFunc: RDD[T] => Unit) {
+  def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
     this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
   }
 
@@ -555,7 +631,7 @@ abstract class DStream[T: ClassTag] (
    * Apply a function to each RDD in this DStream. This is an output operator, so
    * 'this' DStream will be registered as an output stream and therefore materialized.
    */
-  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
+  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
     // because the DStream is reachable from the outer object here, and because 
     // DStreams can't be serialized with closures, we can't proactively check 
     // it for serializability and so we pass the optional false to SparkContext.clean
@@ -566,7 +642,7 @@ abstract class DStream[T: ClassTag] (
    * Return a new DStream in which each RDD is generated by applying a function
    * on each RDD of 'this' DStream.
    */
-  def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
+  def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
     // because the DStream is reachable from the outer object here, and because 
     // DStreams can't be serialized with closures, we can't proactively check 
     // it for serializability and so we pass the optional false to SparkContext.clean
@@ -578,7 +654,7 @@ abstract class DStream[T: ClassTag] (
    * Return a new DStream in which each RDD is generated by applying a function
    * on each RDD of 'this' DStream.
    */
-  def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
+  def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope {
     // because the DStream is reachable from the outer object here, and because 
     // DStreams can't be serialized with closures, we can't proactively check 
     // it for serializability and so we pass the optional false to SparkContext.clean
@@ -596,7 +672,7 @@ abstract class DStream[T: ClassTag] (
    */
   def transformWith[U: ClassTag, V: ClassTag](
       other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
-    ): DStream[V] = {
+    ): DStream[V] = ssc.withScope {
     // because the DStream is reachable from the outer object here, and because 
     // DStreams can't be serialized with closures, we can't proactively check 
     // it for serializability and so we pass the optional false to SparkContext.clean
@@ -610,7 +686,7 @@ abstract class DStream[T: ClassTag] (
    */
   def transformWith[U: ClassTag, V: ClassTag](
       other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
-    ): DStream[V] = {
+    ): DStream[V] = ssc.withScope {
     // because the DStream is reachable from the outer object here, and because 
     // DStreams can't be serialized with closures, we can't proactively check 
     // it for serializability and so we pass the optional false to SparkContext.clean
@@ -628,7 +704,7 @@ abstract class DStream[T: ClassTag] (
    * Print the first ten elements of each RDD generated in this DStream. This is an output
    * operator, so this DStream will be registered as an output stream and therefore materialized.
    */
-  def print() {
+  def print(): Unit = ssc.withScope {
     print(10)
   }
 
@@ -636,7 +712,7 @@ abstract class DStream[T: ClassTag] (
    * Print the first num elements of each RDD generated in this DStream. This is an output
    * operator, so this DStream will be registered as an output stream and therefore materialized.
    */
-  def print(num: Int) {
+  def print(num: Int): Unit = ssc.withScope {
     def foreachFunc: (RDD[T], Time) => Unit = {
       (rdd: RDD[T], time: Time) => {
         val firstNum = rdd.take(num + 1)
@@ -668,7 +744,7 @@ abstract class DStream[T: ClassTag] (
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
    */
-  def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
+  def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = ssc.withScope {
     new WindowedDStream(this, windowDuration, slideDuration)
   }
 
@@ -686,7 +762,7 @@ abstract class DStream[T: ClassTag] (
       reduceFunc: (T, T) => T,
       windowDuration: Duration,
       slideDuration: Duration
-    ): DStream[T] = {
+    ): DStream[T] = ssc.withScope {
     this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc)
   }
 
@@ -711,7 +787,7 @@ abstract class DStream[T: ClassTag] (
       invReduceFunc: (T, T) => T,
       windowDuration: Duration,
       slideDuration: Duration
-    ): DStream[T] = {
+    ): DStream[T] = ssc.withScope {
       this.map(x => (1, x))
           .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
           .map(_._2)
@@ -727,7 +803,9 @@ abstract class DStream[T: ClassTag] (
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
    */
-  def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = {
+  def countByWindow(
+      windowDuration: Duration,
+      slideDuration: Duration): DStream[Long] = ssc.withScope {
     this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
   }
 
@@ -748,8 +826,7 @@ abstract class DStream[T: ClassTag] (
       slideDuration: Duration,
       numPartitions: Int = ssc.sc.defaultParallelism)
       (implicit ord: Ordering[T] = null)
-      : DStream[(T, Long)] =
-  {
+      : DStream[(T, Long)] = ssc.withScope {
     this.map(x => (x, 1L)).reduceByKeyAndWindow(
       (x: Long, y: Long) => x + y,
       (x: Long, y: Long) => x - y,
@@ -764,19 +841,21 @@ abstract class DStream[T: ClassTag] (
    * Return a new DStream by unifying data of another DStream with this DStream.
    * @param that Another DStream having the same slideDuration as this DStream.
    */
-  def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
+  def union(that: DStream[T]): DStream[T] = ssc.withScope {
+    new UnionDStream[T](Array(this, that))
+  }
 
   /**
    * Return all the RDDs defined by the Interval object (both end times included)
    */
-  def slice(interval: Interval): Seq[RDD[T]] = {
+  def slice(interval: Interval): Seq[RDD[T]] = ssc.withScope {
     slice(interval.beginTime, interval.endTime)
   }
 
   /**
    * Return all the RDDs between 'fromTime' to 'toTime' (both included)
    */
-  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
+  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = ssc.withScope {
     if (!isInitialized) {
       throw new SparkException(this + " has not been initialized")
     }
@@ -810,7 +889,7 @@ abstract class DStream[T: ClassTag] (
    * The file name at each batch interval is generated based on `prefix` and
    * `suffix`: "prefix-TIME_IN_MS.suffix".
    */
-  def saveAsObjectFiles(prefix: String, suffix: String = "") {
+  def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
     val saveFunc = (rdd: RDD[T], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsObjectFile(file)
@@ -823,7 +902,7 @@ abstract class DStream[T: ClassTag] (
    * of elements. The file name at each batch interval is generated based on
    * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
    */
-  def saveAsTextFiles(prefix: String, suffix: String = "") {
+  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
     val saveFunc = (rdd: RDD[T], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsTextFile(file)

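The per-batch scope naming done in makeScope is plain string manipulation and can be sketched on its own; the time formatting below uses SimpleDateFormat as a stand-in for the streaming UIUtils.formatBatchTime helper, and all inputs are made up:

object BatchScopeNameDemo {
  def scopeNameAndId(baseName: String, baseId: String, batchTimeMs: Long): (String, String) = {
    val formattedBatchTime =
      new java.text.SimpleDateFormat("HH:mm:ss").format(new java.util.Date(batchTimeMs))
    val scopeName =
      if (baseName.length > 10) {
        s"$baseName\n@ $formattedBatchTime" // long operation names are wrapped onto a second line
      } else {
        s"$baseName @ $formattedBatchTime"
      }
    val scopeId = s"${baseId}_$batchTimeMs" // one scope per (base scope, batch time) pair
    (scopeName, scopeId)
  }

  def main(args: Array[String]): Unit = {
    println(scopeNameAndId("map", "4", 1431984829000L))           // short name stays on one line
    println(scopeNameAndId("countByWindow", "7", 1431984829000L)) // "countByWindow".length > 10, so it wraps
  }
}
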
http://git-wip-us.apache.org/repos/asf/spark/blob/a475cbc9/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 685a32e..c109cec 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -37,7 +37,7 @@ class ForEachDStream[T: ClassTag] (
   override def generateJob(time: Time): Option[Job] = {
     parent.getOrCompute(time) match {
       case Some(rdd) =>
-        val jobFunc = () => {
+        val jobFunc = () => createRDDWithLocalProperties(time) {
           ssc.sparkContext.setCallSite(creationSite)
           foreachFunc(rdd, time)
         }

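The save/restore discipline in createRDDWithLocalProperties (reused here for the foreachRDD job function) can be sketched without Spark; the property key and helper names below are illustrative only:

object LocalPropertyDemo {
  private val props = new ThreadLocal[Map[String, String]] {
    override def initialValue(): Map[String, String] = Map.empty
  }

  def setLocalProperty(key: String, value: String): Unit =
    props.set(if (value == null) props.get() - key else props.get() + (key -> value))

  def getLocalProperty(key: String): String = props.get().getOrElse(key, null)

  // Stash the previous value, install our own for the duration of the body, and
  // restore the old one in a finally block so callers further up the DStream
  // chain see their own value again even if the body throws.
  def withProperty[U](key: String, value: String)(body: => U): U = {
    val prev = getLocalProperty(key) // may be null
    setLocalProperty(key, value)
    try body finally setLocalProperty(key, prev)
  }

  def main(args: Array[String]): Unit = {
    withProperty("rdd.scope", "outer") {
      println(getLocalProperty("rdd.scope"))   // outer
      withProperty("rdd.scope", "inner") {
        println(getLocalProperty("rdd.scope")) // inner
      }
      println(getLocalProperty("rdd.scope"))   // outer again
    }
    println(getLocalProperty("rdd.scope"))     // null
  }
}
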
http://git-wip-us.apache.org/repos/asf/spark/blob/a475cbc9/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 9716adb..d58c99a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.{Time, Duration, StreamingContext}
-
 import scala.reflect.ClassTag
 
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDDOperationScope
+import org.apache.spark.streaming.{Time, Duration, StreamingContext}
+import org.apache.spark.util.Utils
+
 /**
  * This is the abstract base class for all input streams. This class provides methods
  * start() and stop() which are called by the Spark Streaming system to start and stop receiving data.
@@ -44,10 +47,31 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
   /** This is a unique identifier for the input stream. */
   val id = ssc.getNewInputStreamId()
 
+  /** A human-readable name of this InputDStream */
+  private[streaming] def name: String = {
+    // e.g. FlumePollingDStream -> "Flume polling stream"
+    val newName = Utils.getFormattedClassName(this)
+      .replaceAll("InputDStream", "Stream")
+      .split("(?=[A-Z])")
+      .filter(_.nonEmpty)
+      .mkString(" ")
+      .toLowerCase
+      .capitalize
+    s"$newName [$id]"
+  }
+
   /**
-   * The name of this InputDStream. By default, it's the class name with its id.
+   * The base scope associated with the operation that created this DStream.
+   *
+   * For InputDStreams, we use the name of this DStream as the scope name.
+   * If an outer scope is given, we assume that it includes an alternative name for this stream.
    */
-  private[streaming] def name: String = s"${getClass.getSimpleName}-$id"
+  protected[streaming] override val baseScope: Option[String] = {
+    val scopeName = Option(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY))
+      .map { json => RDDOperationScope.fromJson(json).name + s" [$id]" }
+      .getOrElse(name.toLowerCase)
+    Some(new RDDOperationScope(scopeName).toJson)
+  }
 
   /**
    * Checks whether the 'time' is valid wrt slideDuration for generating RDD.

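The default name derivation above, and the reason DirectKafkaInputDStream and MQTTInputDStream override it with hand-written names, can be seen with a small sketch; the class names are passed in as plain strings here rather than via Utils.getFormattedClassName:

object InputStreamNameDemo {
  def humanReadableName(className: String, id: Int): String = {
    val newName = className
      .replaceAll("InputDStream", "Stream") // FlumePollingInputDStream -> FlumePollingStream
      .split("(?=[A-Z])")                   // split on camel-case boundaries
      .filter(_.nonEmpty)
      .mkString(" ")
      .toLowerCase
      .capitalize
    s"$newName [$id]"
  }

  def main(args: Array[String]): Unit = {
    println(humanReadableName("FlumePollingInputDStream", 2)) // Flume polling stream [2]
    println(humanReadableName("DirectKafkaInputDStream", 0))  // Direct kafka stream [0],
                                                              // hence the explicit "Kafka direct stream" override
    println(humanReadableName("MQTTInputDStream", 1))         // M q t t stream [1],
                                                              // hence the explicit "MQTT stream" override
  }
}
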
http://git-wip-us.apache.org/repos/asf/spark/blob/a475cbc9/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 8a58571..884a8e8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -46,7 +46,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
    * generate the RDDs with Spark's default number of partitions.
    */
-  def groupByKey(): DStream[(K, Iterable[V])] = {
+  def groupByKey(): DStream[(K, Iterable[V])] = ssc.withScope {
     groupByKey(defaultPartitioner())
   }
 
@@ -54,7 +54,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
    * generate the RDDs with `numPartitions` partitions.
    */
-  def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = {
+  def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = ssc.withScope {
     groupByKey(defaultPartitioner(numPartitions))
   }
 
@@ -62,7 +62,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * Return a new DStream by applying `groupByKey` on each RDD. The supplied
    * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    */
-  def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = {
+  def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = ssc.withScope {
     val createCombiner = (v: V) => ArrayBuffer[V](v)
     val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
     val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
@@ -75,7 +75,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
    * with Spark's default number of partitions.
    */
-  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
+  def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {
     reduceByKey(reduceFunc, defaultPartitioner())
   }
 
@@ -84,7 +84,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
    * with `numPartitions` partitions.
    */
-  def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
+  def reduceByKey(
+      reduceFunc: (V, V) => V,
+      numPartitions: Int): DStream[(K, V)] = ssc.withScope {
     reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
   }
 
@@ -93,7 +95,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control
    * the partitioning of each RDD.
    */
-  def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
+  def reduceByKey(
+      reduceFunc: (V, V) => V,
+      partitioner: Partitioner): DStream[(K, V)] = ssc.withScope {
     val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
     combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
   }
@@ -104,11 +108,11 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * org.apache.spark.rdd.PairRDDFunctions in the Spark core documentation for more information.
    */
   def combineByKey[C: ClassTag](
-    createCombiner: V => C,
-    mergeValue: (C, V) => C,
-    mergeCombiner: (C, C) => C,
-    partitioner: Partitioner,
-    mapSideCombine: Boolean = true): DStream[(K, C)] = {
+      createCombiner: V => C,
+      mergeValue: (C, V) => C,
+      mergeCombiner: (C, C) => C,
+      partitioner: Partitioner,
+      mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
     new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner,
       mapSideCombine)
   }
@@ -121,7 +125,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    */
-  def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = {
+  def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = ssc.withScope {
     groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner())
   }
 
@@ -136,8 +140,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    *                       DStream's batching interval
    */
   def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
-      : DStream[(K, Iterable[V])] =
-  {
+      : DStream[(K, Iterable[V])] = ssc.withScope {
     groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
   }
 
@@ -157,7 +160,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       windowDuration: Duration,
       slideDuration: Duration,
       numPartitions: Int
-    ): DStream[(K, Iterable[V])] = {
+    ): DStream[(K, Iterable[V])] = ssc.withScope {
     groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
   }
 
@@ -176,7 +179,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       windowDuration: Duration,
       slideDuration: Duration,
       partitioner: Partitioner
-    ): DStream[(K, Iterable[V])] = {
+    ): DStream[(K, Iterable[V])] = ssc.withScope {
     val createCombiner = (v: Iterable[V]) => new ArrayBuffer[V] ++= v
     val mergeValue = (buf: ArrayBuffer[V], v: Iterable[V]) => buf ++= v
     val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
@@ -198,7 +201,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def reduceByKeyAndWindow(
       reduceFunc: (V, V) => V,
       windowDuration: Duration
-    ): DStream[(K, V)] = {
+    ): DStream[(K, V)] = ssc.withScope {
     reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
   }
 
@@ -217,7 +220,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       reduceFunc: (V, V) => V,
       windowDuration: Duration,
       slideDuration: Duration
-    ): DStream[(K, V)] = {
+    ): DStream[(K, V)] = ssc.withScope {
     reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
   }
 
@@ -238,7 +241,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       windowDuration: Duration,
       slideDuration: Duration,
       numPartitions: Int
-    ): DStream[(K, V)] = {
+    ): DStream[(K, V)] = ssc.withScope {
     reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration,
       defaultPartitioner(numPartitions))
   }
@@ -260,7 +263,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       windowDuration: Duration,
       slideDuration: Duration,
       partitioner: Partitioner
-    ): DStream[(K, V)] = {
+    ): DStream[(K, V)] = ssc.withScope {
     val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
     self.reduceByKey(cleanedReduceFunc, partitioner)
         .window(windowDuration, slideDuration)
@@ -294,8 +297,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       slideDuration: Duration = self.slideDuration,
       numPartitions: Int = ssc.sc.defaultParallelism,
       filterFunc: ((K, V)) => Boolean = null
-    ): DStream[(K, V)] = {
-
+    ): DStream[(K, V)] = ssc.withScope {
     reduceByKeyAndWindow(
       reduceFunc, invReduceFunc, windowDuration,
       slideDuration, defaultPartitioner(numPartitions), filterFunc
@@ -328,7 +330,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       slideDuration: Duration,
       partitioner: Partitioner,
       filterFunc: ((K, V)) => Boolean
-    ): DStream[(K, V)] = {
+    ): DStream[(K, V)] = ssc.withScope {
 
     val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
     val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
@@ -349,7 +351,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    */
   def updateStateByKey[S: ClassTag](
       updateFunc: (Seq[V], Option[S]) => Option[S]
-    ): DStream[(K, S)] = {
+    ): DStream[(K, S)] = ssc.withScope {
     updateStateByKey(updateFunc, defaultPartitioner())
   }
 
@@ -365,7 +367,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def updateStateByKey[S: ClassTag](
       updateFunc: (Seq[V], Option[S]) => Option[S],
       numPartitions: Int
-    ): DStream[(K, S)] = {
+    ): DStream[(K, S)] = ssc.withScope {
     updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
   }
 
@@ -382,7 +384,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def updateStateByKey[S: ClassTag](
       updateFunc: (Seq[V], Option[S]) => Option[S],
       partitioner: Partitioner
-    ): DStream[(K, S)] = {
+    ): DStream[(K, S)] = ssc.withScope {
     val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
       iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
     }
@@ -406,7 +408,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
       partitioner: Partitioner,
       rememberPartitioner: Boolean
-    ): DStream[(K, S)] = {
+    ): DStream[(K, S)] = ssc.withScope {
      new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)
   }
 
@@ -425,7 +427,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       updateFunc: (Seq[V], Option[S]) => Option[S],
       partitioner: Partitioner,
       initialRDD: RDD[(K, S)]
-    ): DStream[(K, S)] = {
+    ): DStream[(K, S)] = ssc.withScope {
     val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
       iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
     }
@@ -451,7 +453,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       partitioner: Partitioner,
       rememberPartitioner: Boolean,
       initialRDD: RDD[(K, S)]
-    ): DStream[(K, S)] = {
+    ): DStream[(K, S)] = ssc.withScope {
      new StateDStream(self, ssc.sc.clean(updateFunc), partitioner,
        rememberPartitioner, Some(initialRDD))
   }
@@ -460,7 +462,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * Return a new DStream by applying a map function to the value of each key-value pairs in
    * 'this' DStream without changing the key.
    */
-  def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = {
+  def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = ssc.withScope {
     new MapValuedDStream[K, V, U](self, mapValuesFunc)
   }
 
@@ -470,7 +472,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    */
   def flatMapValues[U: ClassTag](
       flatMapValuesFunc: V => TraversableOnce[U]
-    ): DStream[(K, U)] = {
+    ): DStream[(K, U)] = ssc.withScope {
     new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
   }
 
@@ -479,7 +481,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * Hash partitioning is used to generate the RDDs with Spark's default number
    * of partitions.
    */
-  def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = {
+  def cogroup[W: ClassTag](
+      other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope {
     cogroup(other, defaultPartitioner())
   }
 
@@ -487,8 +490,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
    * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
    */
-  def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int)
-  : DStream[(K, (Iterable[V], Iterable[W]))] = {
+  def cogroup[W: ClassTag](
+      other: DStream[(K, W)],
+      numPartitions: Int): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope {
     cogroup(other, defaultPartitioner(numPartitions))
   }
 
@@ -499,7 +503,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def cogroup[W: ClassTag](
       other: DStream[(K, W)],
       partitioner: Partitioner
-    ): DStream[(K, (Iterable[V], Iterable[W]))] = {
+    ): DStream[(K, (Iterable[V], Iterable[W]))] = ssc.withScope {
     self.transformWith(
       other,
       (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner)
@@ -510,7 +514,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
    * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
    */
-  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
+  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = ssc.withScope {
     join[W](other, defaultPartitioner())
   }
 
@@ -518,7 +522,9 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
    * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
    */
-  def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
+  def join[W: ClassTag](
+      other: DStream[(K, W)],
+      numPartitions: Int): DStream[(K, (V, W))] = ssc.withScope {
     join[W](other, defaultPartitioner(numPartitions))
   }
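
For a sense of what these join and cogroup changes mean in practice, here is a hedged usage sketch. It assumes a Spark 1.4 build containing this patch on the classpath; the object name JoinScopeDemo and the sample data are made up for illustration. Because join is now wrapped in ssc.withScope, the RDDs it produces each batch carry a scope named after the operation, which the streaming UI should be able to use to group them.

    // Hedged usage sketch; queueStream avoids receivers so it runs on a small local master.
    import scala.collection.mutable

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object JoinScopeDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("join-scope-demo")
        val ssc = new StreamingContext(conf, Seconds(1))
        val sc = ssc.sparkContext

        val leftQueue = mutable.Queue(sc.parallelize(Seq("a" -> 1, "b" -> 2)))
        val rightQueue = mutable.Queue(sc.parallelize(Seq("a" -> "x", "c" -> "y")))

        // join delegates to transformWith; after this patch its body runs inside
        // ssc.withScope, so each batch's RDDs carry a "join" scope.
        val joined = ssc.queueStream(leftQueue).join(ssc.queueStream(rightQueue))
        joined.print()

        ssc.start()
        ssc.awaitTerminationOrTimeout(5000)
        ssc.stop(stopSparkContext = true)
      }
    }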
 
@@ -529,7 +535,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def join[W: ClassTag](
       other: DStream[(K, W)],
       partitioner: Partitioner
-    ): DStream[(K, (V, W))] = {
+    ): DStream[(K, (V, W))] = ssc.withScope {
     self.transformWith(
       other,
       (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
@@ -541,7 +547,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
    * number of partitions.
    */
-  def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
+  def leftOuterJoin[W: ClassTag](
+      other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = ssc.withScope {
     leftOuterJoin[W](other, defaultPartitioner())
   }
 
@@ -553,7 +560,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def leftOuterJoin[W: ClassTag](
       other: DStream[(K, W)],
       numPartitions: Int
-    ): DStream[(K, (V, Option[W]))] = {
+    ): DStream[(K, (V, Option[W]))] = ssc.withScope {
     leftOuterJoin[W](other, defaultPartitioner(numPartitions))
   }
 
@@ -565,7 +572,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def leftOuterJoin[W: ClassTag](
       other: DStream[(K, W)],
       partitioner: Partitioner
-    ): DStream[(K, (V, Option[W]))] = {
+    ): DStream[(K, (V, Option[W]))] = ssc.withScope {
     self.transformWith(
       other,
       (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner)
@@ -577,7 +584,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
    * number of partitions.
    */
-  def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
+  def rightOuterJoin[W: ClassTag](
+      other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = ssc.withScope {
     rightOuterJoin[W](other, defaultPartitioner())
   }
 
@@ -589,7 +597,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def rightOuterJoin[W: ClassTag](
       other: DStream[(K, W)],
       numPartitions: Int
-    ): DStream[(K, (Option[V], W))] = {
+    ): DStream[(K, (Option[V], W))] = ssc.withScope {
     rightOuterJoin[W](other, defaultPartitioner(numPartitions))
   }
 
@@ -601,7 +609,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def rightOuterJoin[W: ClassTag](
       other: DStream[(K, W)],
       partitioner: Partitioner
-    ): DStream[(K, (Option[V], W))] = {
+    ): DStream[(K, (Option[V], W))] = ssc.withScope {
     self.transformWith(
       other,
       (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner)
@@ -613,7 +621,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
    * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
    * number of partitions.
    */
-  def fullOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = {
+  def fullOuterJoin[W: ClassTag](
+      other: DStream[(K, W)]): DStream[(K, (Option[V], Option[W]))] = ssc.withScope {
     fullOuterJoin[W](other, defaultPartitioner())
   }
 
@@ -625,7 +634,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def fullOuterJoin[W: ClassTag](
       other: DStream[(K, W)],
       numPartitions: Int
-    ): DStream[(K, (Option[V], Option[W]))] = {
+    ): DStream[(K, (Option[V], Option[W]))] = ssc.withScope {
     fullOuterJoin[W](other, defaultPartitioner(numPartitions))
   }
 
@@ -637,7 +646,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def fullOuterJoin[W: ClassTag](
       other: DStream[(K, W)],
       partitioner: Partitioner
-    ): DStream[(K, (Option[V], Option[W]))] = {
+    ): DStream[(K, (Option[V], Option[W]))] = ssc.withScope {
     self.transformWith(
       other,
       (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.fullOuterJoin(rdd2, partitioner)
@@ -651,7 +660,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def saveAsHadoopFiles[F <: OutputFormat[K, V]](
       prefix: String,
       suffix: String
-    )(implicit fm: ClassTag[F]) {
+    )(implicit fm: ClassTag[F]): Unit = ssc.withScope {
     saveAsHadoopFiles(prefix, suffix, keyClass, valueClass,
       fm.runtimeClass.asInstanceOf[Class[F]])
   }
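
The save* methods above previously used Scala's procedure syntax (no result type, no equals sign). Switching them to an explicit ": Unit =" lets the whole body be written as the single wrapped expression ssc.withScope { ... }. A small stand-alone sketch of that syntax shape; withScope, save, and doSave here are hypothetical stand-ins, not Spark's:

    // Minimal sketch of the syntax change only.
    object UnitSyntaxSketch {
      def withScope[T](body: => T): T = body  // stand-in for ssc.withScope

      // Before (procedure syntax):  def save(prefix: String) { doSave(prefix) }
      // After: an explicit result type and '=' make the body one wrapped expression.
      def save(prefix: String): Unit = withScope {
        doSave(prefix)
      }

      private def doSave(prefix: String): Unit = println(s"saving to $prefix-<time>.out")

      def main(args: Array[String]): Unit = save("events")
    }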
@@ -667,7 +676,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
       conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
-    ) {
+    ): Unit = ssc.withScope {
     // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
     val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
@@ -684,7 +693,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
   def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
       prefix: String,
       suffix: String
-    )(implicit fm: ClassTag[F])  {
+    )(implicit fm: ClassTag[F]): Unit = ssc.withScope {
     saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass,
       fm.runtimeClass.asInstanceOf[Class[F]])
   }
@@ -700,7 +709,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
       conf: Configuration = ssc.sparkContext.hadoopConfiguration
-    ) {
+    ): Unit = ssc.withScope {
     // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
     val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {

http://git-wip-us.apache.org/repos/asf/spark/blob/a475cbc9/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
new file mode 100644
index 0000000..3929331
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
+import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.streaming.ui.UIUtils
+
+/**
+ * Tests whether scope information is passed from DStream operations to RDDs correctly.
+ */
+class DStreamScopeSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
+  private var ssc: StreamingContext = null
+  private val batchDuration: Duration = Seconds(1)
+
+  override def beforeAll(): Unit = {
+    ssc = new StreamingContext(new SparkContext("local", "test"), batchDuration)
+  }
+
+  override def afterAll(): Unit = {
+    ssc.stop(stopSparkContext = true)
+  }
+
+  before { assertPropertiesNotSet() }
+  after { assertPropertiesNotSet() }
+
+  test("dstream without scope") {
+    val dummyStream = new DummyDStream(ssc)
+    dummyStream.initialize(Time(0))
+
+    // This DStream is not instantiated in any scope, so all RDDs
+    // created by this stream should similarly not have a scope
+    assert(dummyStream.baseScope === None)
+    assert(dummyStream.getOrCompute(Time(1000)).get.scope === None)
+    assert(dummyStream.getOrCompute(Time(2000)).get.scope === None)
+    assert(dummyStream.getOrCompute(Time(3000)).get.scope === None)
+  }
+
+  test("input dstream without scope") {
+    val inputStream = new DummyInputDStream(ssc)
+    inputStream.initialize(Time(0))
+
+    val baseScope = inputStream.baseScope.map(RDDOperationScope.fromJson)
+    val scope1 = inputStream.getOrCompute(Time(1000)).get.scope
+    val scope2 = inputStream.getOrCompute(Time(2000)).get.scope
+    val scope3 = inputStream.getOrCompute(Time(3000)).get.scope
+
+    // Input streams always get a base scope named after the stream; their RDDs should inherit it
+    assertDefined(baseScope, scope1, scope2, scope3)
+    assert(baseScope.get.name.startsWith("dummy stream"))
+    assertScopeCorrect(baseScope.get, scope1.get, 1000)
+    assertScopeCorrect(baseScope.get, scope2.get, 2000)
+    assertScopeCorrect(baseScope.get, scope3.get, 3000)
+  }
+
+  test("scoping simple operations") {
+    val inputStream = new DummyInputDStream(ssc)
+    val mappedStream = inputStream.map { i => i + 1 }
+    val filteredStream = mappedStream.filter { i => i % 2 == 0 }
+    filteredStream.initialize(Time(0))
+
+    val mappedScopeBase = mappedStream.baseScope.map(RDDOperationScope.fromJson)
+    val mappedScope1 = mappedStream.getOrCompute(Time(1000)).get.scope
+    val mappedScope2 = mappedStream.getOrCompute(Time(2000)).get.scope
+    val mappedScope3 = mappedStream.getOrCompute(Time(3000)).get.scope
+    val filteredScopeBase = filteredStream.baseScope.map(RDDOperationScope.fromJson)
+    val filteredScope1 = filteredStream.getOrCompute(Time(1000)).get.scope
+    val filteredScope2 = filteredStream.getOrCompute(Time(2000)).get.scope
+    val filteredScope3 = filteredStream.getOrCompute(Time(3000)).get.scope
+
+    // These streams are defined in their respective scopes "map" and "filter", so all
+    // RDDs created by these streams should inherit the IDs and names of their parent
+    // DStream's base scopes
+    assertDefined(mappedScopeBase, mappedScope1, mappedScope2, mappedScope3)
+    assertDefined(filteredScopeBase, filteredScope1, filteredScope2, filteredScope3)
+    assert(mappedScopeBase.get.name === "map")
+    assert(filteredScopeBase.get.name === "filter")
+    assertScopeCorrect(mappedScopeBase.get, mappedScope1.get, 1000)
+    assertScopeCorrect(mappedScopeBase.get, mappedScope2.get, 2000)
+    assertScopeCorrect(mappedScopeBase.get, mappedScope3.get, 3000)
+    assertScopeCorrect(filteredScopeBase.get, filteredScope1.get, 1000)
+    assertScopeCorrect(filteredScopeBase.get, filteredScope2.get, 2000)
+    assertScopeCorrect(filteredScopeBase.get, filteredScope3.get, 3000)
+  }
+
+  test("scoping nested operations") {
+    val inputStream = new DummyInputDStream(ssc)
+    val countStream = inputStream.countByWindow(Seconds(10), Seconds(1))
+    countStream.initialize(Time(0))
+
+    val countScopeBase = countStream.baseScope.map(RDDOperationScope.fromJson)
+    val countScope1 = countStream.getOrCompute(Time(1000)).get.scope
+    val countScope2 = countStream.getOrCompute(Time(2000)).get.scope
+    val countScope3 = countStream.getOrCompute(Time(3000)).get.scope
+
+    // Assert that all children RDDs inherit the DStream operation name correctly
+    assertDefined(countScopeBase, countScope1, countScope2, countScope3)
+    assert(countScopeBase.get.name === "countByWindow")
+    assertScopeCorrect(countScopeBase.get, countScope1.get, 1000)
+    assertScopeCorrect(countScopeBase.get, countScope2.get, 2000)
+    assertScopeCorrect(countScopeBase.get, countScope3.get, 3000)
+
+    // All streams except the input stream should share the same scopes as `countStream`
+    def testStream(stream: DStream[_]): Unit = {
+      if (stream != inputStream) {
+        val myScopeBase = stream.baseScope.map(RDDOperationScope.fromJson)
+        val myScope1 = stream.getOrCompute(Time(1000)).get.scope
+        val myScope2 = stream.getOrCompute(Time(2000)).get.scope
+        val myScope3 = stream.getOrCompute(Time(3000)).get.scope
+        assertDefined(myScopeBase, myScope1, myScope2, myScope3)
+        assert(myScopeBase === countScopeBase)
+        assert(myScope1 === countScope1)
+        assert(myScope2 === countScope2)
+        assert(myScope3 === countScope3)
+        // Climb upwards to test the parent streams
+        stream.dependencies.foreach(testStream)
+      }
+    }
+    testStream(countStream)
+  }
+
+  /** Assert that the RDD operation scope properties are not set in our SparkContext. */
+  private def assertPropertiesNotSet(): Unit = {
+    assert(ssc != null)
+    assert(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY) == null)
+    assert(ssc.sc.getLocalProperty(SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY) == null)
+  }
+
+  /** Assert that the given RDD scope inherits the name and ID of the base scope correctly. */
+  private def assertScopeCorrect(
+      baseScope: RDDOperationScope,
+      rddScope: RDDOperationScope,
+      batchTime: Long): Unit = {
+    assertScopeCorrect(baseScope.id, baseScope.name, rddScope, batchTime)
+  }
+
+  /** Assert that the given RDD scope inherits the base name and ID correctly. */
+  private def assertScopeCorrect(
+      baseScopeId: String,
+      baseScopeName: String,
+      rddScope: RDDOperationScope,
+      batchTime: Long): Unit = {
+    val formattedBatchTime = UIUtils.formatBatchTime(
+      batchTime, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
+    assert(rddScope.id === s"${baseScopeId}_$batchTime")
+    assert(rddScope.name.replaceAll("\\n", " ") === s"$baseScopeName @ $formattedBatchTime")
+  }
+
+  /** Assert that all the specified options are defined. */
+  private def assertDefined[T](options: Option[T]*): Unit = {
+    options.zipWithIndex.foreach { case (o, i) => assert(o.isDefined, s"Option $i was empty!") }
+  }
+
+}
+
+/**
+ * A dummy stream that does absolutely nothing.
+ */
+private class DummyDStream(ssc: StreamingContext) extends DStream[Int](ssc) {
+  override def dependencies: List[DStream[Int]] = List.empty
+  override def slideDuration: Duration = Seconds(1)
+  override def compute(time: Time): Option[RDD[Int]] = Some(ssc.sc.emptyRDD[Int])
+}
+
+/**
+ * A dummy input stream that does absolutely nothing.
+ */
+private class DummyInputDStream(ssc: StreamingContext) extends InputDStream[Int](ssc) {
+  override def start(): Unit = { }
+  override def stop(): Unit = { }
+  override def compute(time: Time): Option[RDD[Int]] = Some(ssc.sc.emptyRDD[Int])
+}
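
To make the assertScopeCorrect expectations above concrete: each per-batch RDD scope should take the base scope's id suffixed with the batch time in milliseconds, and the base scope's name suffixed with a formatted batch time. The sketch below only illustrates that convention; ScopeNamingSketch is hypothetical and its formatBatchTime is a simplified stand-in for streaming.ui.UIUtils.formatBatchTime, not the real implementation.

    // Sketch of the id/name convention asserted by the suite.
    object ScopeNamingSketch {
      def formatBatchTime(batchTimeMs: Long): String =
        new java.text.SimpleDateFormat("HH:mm:ss").format(new java.util.Date(batchTimeMs))

      def main(args: Array[String]): Unit = {
        val baseScopeId = "0"
        val baseScopeName = "countByWindow"
        val batchTime = 1000L
        val rddScopeId = s"${baseScopeId}_$batchTime"                        // e.g. "0_1000"
        val rddScopeName = s"$baseScopeName @ ${formatBatchTime(batchTime)}"
        println(s"$rddScopeId -> $rddScopeName")
      }
    }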

