spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-4608][Streaming] Reorganize StreamingContext implicit to improve API convenience
Date Fri, 26 Dec 2014 03:46:30 GMT
Repository: spark
Updated Branches:
  refs/heads/master f205fe477 -> f9ed2b664


[SPARK-4608][Streaming] Reorganize StreamingContext implicit to improve API convenience

There is only one implicit function `toPairDStreamFunctions` in `StreamingContext`. This PR
reorganizes it in the same way as [SPARK-4397](https://issues.apache.org/jira/browse/SPARK-4397).

Compiled the following code with Spark Streaming 1.1.0 and ran it with this PR. Everything
works fine.
```Scala
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object StreamingApp {

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("FileWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = ssc.textFileStream("/some/path")
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Author: zsxwing <zsxwing@gmail.com>

Closes #3464 from zsxwing/SPARK-4608 and squashes the following commits:

aa6d44a [zsxwing] Fix a copy-paste error
f74c190 [zsxwing] Merge branch 'master' into SPARK-4608
e6f9cc9 [zsxwing] Update the docs
27833bb [zsxwing] Remove `import StreamingContext._`
c15162c [zsxwing] Reorganize StreamingContext implicit to improve API convenience


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9ed2b66
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9ed2b66
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9ed2b66

Branch: refs/heads/master
Commit: f9ed2b6641b9df39cee4b98a33cd5a3ddda2d146
Parents: f205fe4
Author: zsxwing <zsxwing@gmail.com>
Authored: Thu Dec 25 19:46:05 2014 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Thu Dec 25 19:46:05 2014 -0800

----------------------------------------------------------------------
 docs/streaming-programming-guide.md             |  4 +--
 .../examples/streaming/KafkaWordCount.scala     |  1 -
 .../examples/streaming/CustomReceiver.scala     |  1 -
 .../examples/streaming/HdfsWordCount.scala      |  1 -
 .../examples/streaming/MQTTWordCount.scala      |  1 -
 .../examples/streaming/NetworkWordCount.scala   |  1 -
 .../spark/examples/streaming/QueueStream.scala  |  1 -
 .../streaming/RecoverableNetworkWordCount.scala |  1 -
 .../streaming/StatefulNetworkWordCount.scala    |  1 -
 .../examples/streaming/TwitterAlgebirdCMS.scala |  1 -
 .../examples/streaming/TwitterPopularTags.scala |  1 -
 .../examples/streaming/ZeroMQWordCount.scala    |  1 -
 .../streaming/clickstream/PageViewStream.scala  |  1 -
 .../mllib/clustering/StreamingKMeans.scala      |  1 -
 .../regression/StreamingLinearAlgorithm.scala   |  1 -
 .../spark/streaming/StreamingContext.scala      |  7 ++--
 .../streaming/api/java/JavaPairDStream.scala    |  3 +-
 .../spark/streaming/dstream/DStream.scala       | 22 ++++++++----
 .../dstream/PairDStreamFunctions.scala          |  4 +--
 .../dstream/ReducedWindowedDStream.scala        |  2 --
 .../org/apache/spark/streaming/package.scala    |  2 +-
 .../spark/streaming/BasicOperationsSuite.scala  |  1 -
 .../spark/streaming/CheckpointSuite.scala       |  1 -
 .../spark/streaming/MasterFailureTest.scala     |  1 -
 .../spark/streaming/WindowOperationsSuite.scala |  1 -
 .../spark/streamingtest/ImplicitSuite.scala     | 35 ++++++++++++++++++++
 26 files changed, 60 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 1ac5b9e..01450ef 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -75,7 +75,7 @@ main entry point for all streaming functionality. We create a local StreamingCon
 {% highlight scala %}
 import org.apache.spark._
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.StreamingContext._ // not necessary in Spark 1.3+
 
 // Create a local StreamingContext with two working thread and batch interval of 1 second.
 // The master requires 2 cores to prevent from a starvation scenario.
@@ -107,7 +107,7 @@ each line will be split into multiple words and the stream of words is
represent
 `words` DStream.  Next, we want to count these words.
 
 {% highlight scala %}
-import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.StreamingContext._ // not necessary in Spark 1.3+
 // Count each word in each batch
 val pairs = words.map(word => (word, 1))
 val wordCounts = pairs.reduceByKey(_ + _)

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
index c9e1511..2adc63f 100644
--- a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
+++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
@@ -22,7 +22,6 @@ import java.util.Properties
 import kafka.producer._
 
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.kafka._
 import org.apache.spark.SparkConf
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
index 6bb659f..30269a7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
@@ -23,7 +23,6 @@ import java.net.Socket
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.receiver.Receiver
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
index 6c24bc3..4b4667f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
@@ -19,7 +19,6 @@ package org.apache.spark.examples.streaming
 
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
 
 /**
  * Counts words in new text files created in the given directory

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
index e4283e0..6ff0c47 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
@@ -22,7 +22,6 @@ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.mqtt._
 import org.apache.spark.SparkConf
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
index ae0a08c..2cd8073 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
@@ -19,7 +19,6 @@ package org.apache.spark.examples.streaming
 
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.storage.StorageLevel
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
index 4caa906..13ba9a4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
@@ -22,7 +22,6 @@ import scala.collection.mutable.SynchronizedQueue
 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
 
 object QueueStream {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index 19427e6..c3a05c8 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -25,7 +25,6 @@ import com.google.common.io.Files
 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.util.IntParam
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
index ed186ea..345d0bc 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
@@ -20,7 +20,6 @@ package org.apache.spark.examples.streaming
 import org.apache.spark.SparkConf
 import org.apache.spark.HashPartitioner
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
 
 /**
  * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network
every

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
index 683752a..62f4953 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
@@ -23,7 +23,6 @@ import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.twitter._
 
 // scalastyle:off

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
index f55d23a..f253d75 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.examples.streaming
 
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import StreamingContext._
 import org.apache.spark.SparkContext._
 import org.apache.spark.streaming.twitter._
 import org.apache.spark.SparkConf

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
index 79905af..6510c70 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
@@ -24,7 +24,6 @@ import akka.zeromq.Subscribe
 import akka.util.ByteString
 
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.zeromq._
 
 import scala.language.implicitConversions

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
index 55226c0..fbacaee 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
@@ -19,7 +19,6 @@ package org.apache.spark.examples.streaming.clickstream
 
 import org.apache.spark.SparkContext._
 import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.examples.streaming.StreamingExamples
 // scalastyle:off
 /** Analyses a streaming dataset of web page views. This class demonstrates several types
of

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
index 6189dce..7752c19 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
@@ -24,7 +24,6 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.util.Utils
 import org.apache.spark.util.random.XORShiftRandom

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
index 8db0442..b549b7c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
@@ -22,7 +22,6 @@ import scala.reflect.ClassTag
 import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ecab551..8ef0787 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.Map
 import scala.collection.mutable.Queue
-import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import akka.actor.{Props, SupervisorStrategy}
@@ -523,9 +522,11 @@ object StreamingContext extends Logging {
 
   private[streaming] val DEFAULT_CLEANER_TTL = 3600
 
-  implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
+  @deprecated("Replaced by implicit functions in the DStream companion object. This is "
+
+    "kept here only for backward compatibility.", "1.3.0")
+  def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
       (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
-    new PairDStreamFunctions[K, V](stream)
+    DStream.toPairDStreamFunctions(stream)(kt, vt, ord)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index bb44b90..de124cf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -36,7 +36,6 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2
=> J
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
 
 /**
@@ -815,6 +814,6 @@ object JavaPairDStream {
 
   def scalaToJavaLong[K: ClassTag](dstream: JavaPairDStream[K, Long])
   : JavaPairDStream[K, JLong] = {
-    StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
+    DStream.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index dbf1ebb..7f8651e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -20,8 +20,8 @@ package org.apache.spark.streaming.dstream
 
 import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 
-import scala.deprecated
 import scala.collection.mutable.HashMap
+import scala.language.implicitConversions
 import scala.reflect.ClassTag
 import scala.util.matching.Regex
 
@@ -29,7 +29,7 @@ import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.StreamingContext.rddToFileName
 import org.apache.spark.streaming.scheduler.Job
 import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
 
@@ -48,8 +48,7 @@ import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
  * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
  * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow`
and
  * `join`. These operations are automatically available on any DStream of pairs
- * (e.g., DStream[(Int, Int)] through implicit conversions when
- * `org.apache.spark.streaming.StreamingContext._` is imported.
+ * (e.g., DStream[(Int, Int)]) through implicit conversions.
  *
  * DStreams internally is characterized by a few basic properties:
  *  - A list of other DStreams that the DStream depends on
@@ -802,10 +801,21 @@ abstract class DStream[T: ClassTag] (
   }
 }
 
-private[streaming] object DStream {
+object DStream {
+
+  // `toPairDStreamFunctions` was in StreamingContext before 1.3 and users had to
+  // `import StreamingContext._` to enable it. Now we move it here to make the compiler find
+  // it automatically. However, we still keep the old function in StreamingContext for backward
+  // compatibility and forward to the following function directly.
+
+  implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
+      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
+    PairDStreamFunctions[K, V] = {
+    new PairDStreamFunctions[K, V](stream)
+  }
 
   /** Get the creation site of a DStream from the stack trace of when the DStream is created.
*/
-  def getCreationSite(): CallSite = {
+  private[streaming] def getCreationSite(): CallSite = {
     val SPARK_CLASS_REGEX = """^org\.apache\.spark""".r
     val SPARK_STREAMING_TESTCLASS_REGEX = """^org\.apache\.spark\.streaming\.test""".r
     val SPARK_EXAMPLES_CLASS_REGEX = """^org\.apache\.spark\.examples""".r

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 98539e0..8a58571 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -27,12 +27,10 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 import org.apache.spark.{HashPartitioner, Partitioner, SerializableWritable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Duration, Time}
-import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.StreamingContext.rddToFileName
 
 /**
  * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
- * Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use
- * these functions.
  */
 class PairDStreamFunctions[K, V](self: DStream[(K,V)])
     (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index 1a47089..c0a5af0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.StreamingContext._
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.{CoGroupedRDD, MapPartitionsRDD}
 import org.apache.spark.Partitioner

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/streaming/src/main/scala/org/apache/spark/streaming/package.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/package.scala b/streaming/src/main/scala/org/apache/spark/streaming/package.scala
index 4dd985c..2153ae0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/package.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/package.scala
@@ -26,7 +26,7 @@ package org.apache.spark
  * available only on DStreams
  * of key-value pairs, such as `groupByKey` and `reduceByKey`. These operations are automatically
  * available on any DStream of the right type (e.g. DStream[(Int, Int)] through implicit
- * conversions when you `import org.apache.spark.streaming.StreamingContext._`.
+ * conversions.
  *
  * For the Java API of Spark Streaming, take a look at the
  * [[org.apache.spark.streaming.api.java.JavaStreamingContext]] which serves as the entry
point, and

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 86b9678..199f5e7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -28,7 +28,6 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, WindowedDStream}
 import org.apache.spark.HashPartitioner
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index c97998a..72d055e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.{IntWritable, Text}
 import org.apache.hadoop.mapred.TextOutputFormat
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
 
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 5dbb723..e0f14fd 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming
 import org.apache.spark.Logging
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.util.Utils
-import org.apache.spark.streaming.StreamingContext._
 
 import scala.util.Random
 import scala.collection.mutable.ArrayBuffer

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index 471c99f..a5d2bb2 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.streaming
 
-import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.storage.StorageLevel
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ed2b66/streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala b/streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala
new file mode 100644
index 0000000..d0bf328
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streamingtest
+
+/**
+ * A test suite to make sure all `implicit` functions work correctly.
+ *
+ * As `implicit` is a compiler feature, we don't need to run this class.
+ * What we need to do is to make the compiler happy.
+ */
+class ImplicitSuite {
+
+  // We only want to test if `implicit` works well with the compiler, so we don't need a real
DStream.
+  def mockDStream[T]: org.apache.spark.streaming.dstream.DStream[T] = null
+
+  def testToPairDStreamFunctions(): Unit = {
+    val dstream: org.apache.spark.streaming.dstream.DStream[(Int, Int)] = mockDStream
+    dstream.groupByKey()
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message