spark-commits mailing list archives

From pwend...@apache.org
Subject [3/3] git commit: [SPARK-1332] Improve Spark Streaming's Network Receiver and InputDStream API [WIP]
Date Tue, 22 Apr 2014 02:05:12 GMT
[SPARK-1332] Improve Spark Streaming's Network Receiver and InputDStream API [WIP]

The current Network Receiver API makes it slightly complicated to write a new receiver, as one needs to create an instance of BlockGenerator, as shown in SocketReceiver:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala#L51
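
For context, a minimal sketch of the pattern this change removes (names are illustrative; the BlockGenerator lifecycle calls mirror the lines deleted from the Flume/Kafka/MQTT/Twitter receivers in the diff below):

  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.dstream.NetworkReceiver

  // Old-style receiver: the implementor has to create, start and stop a
  // BlockGenerator and push every record through it by hand.
  class OldStyleReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String] {
    lazy val blockGenerator = new BlockGenerator(storageLevel)

    protected override def onStart() {
      blockGenerator.start()
      // ... open the connection, then for every record received:
      // blockGenerator += record
    }

    protected override def onStop() {
      blockGenerator.stop()
    }
  }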

Exposing the BlockGenerator interface has made it harder to improve the receiving process. The API of NetworkReceiver (which was not a very stable API anyway) needs to be changed if we are to ensure future stability.
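
By contrast, under the refactored API a receiver only extends the new org.apache.spark.streaming.receiver.Receiver class and pushes data with store(), signalling failures with restart(); block generation is handled by the framework. A minimal, illustrative sketch (class, host and port names are hypothetical; isStopped() is assumed from the new Receiver base class):

  import java.io.{BufferedReader, InputStreamReader}
  import java.net.Socket

  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.receiver.Receiver

  class CustomReceiver(host: String, port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

    def onStart() {
      // onStart() must not block, so receive on a separate thread.
      new Thread("Custom Receiver") {
        override def run() { receive() }
      }.start()
    }

    def onStop() {
      // The receiving thread checks isStopped() and exits on its own.
    }

    private def receive() {
      try {
        val socket = new Socket(host, port)
        val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
        var line = reader.readLine()
        while (!isStopped && line != null) {
          store(line)   // replaces the old "blockGenerator += ..." pattern
          line = reader.readLine()
        }
        reader.close()
        socket.close()
      } catch {
        case t: Throwable => restart("Error receiving data", t)
      }
    }
  }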

Additionally, functions like streamingContext.socketStream that create input streams return DStream objects. That makes it hard to expose functionality (say, rate limits) unique to input DStreams. They should return InputDStream or NetworkInputDStream. This is not yet implemented.
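
Once the creation functions return the more specific type, user code can hold on to it directly. A small sketch under that assumption (app name, master, host and port are placeholders; the FlumeUtils signature matches the one changed in this diff):

  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.dstream.ReceiverInputDStream
  import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

  object TypedInputStreamExample {
    def main(args: Array[String]) {
      val conf = new SparkConf().setAppName("TypedInputStreamExample").setMaster("local[2]")
      val ssc = new StreamingContext(conf, Seconds(1))

      // The helper now returns a ReceiverInputDStream rather than a plain DStream,
      // giving receiver-specific functionality (e.g. rate limits) a place to live.
      val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
        FlumeUtils.createStream(ssc, "localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

      flumeStream.count().print()
      ssc.start()
      ssc.awaitTermination()
    }
  }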

This PR is blocked on the graceful shutdown PR #247

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #300 from tdas/network-receiver-api and squashes the following commits:

ea27b38 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3a4777c [Tathagata Das] Renamed NetworkInputDStream to ReceiverInputDStream, and ActorReceiver related stuff.
838dd39 [Tathagata Das] Added more events to the StreamingListener to report errors and stopped receivers.
a75c7a6 [Tathagata Das] Address some PR comments and fixed other issues.
91bfa72 [Tathagata Das] Fixed bugs.
8533094 [Tathagata Das] Scala style fixes.
028bde6 [Tathagata Das] Further refactored receiver to allow restarting of a receiver.
43f5290 [Tathagata Das] Made functions that create input streams return InputDStream and NetworkInputDStream, for both Scala and Java.
2c94579 [Tathagata Das] Fixed graceful shutdown by removing interrupts on receiving thread.
9e37a0b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3223e95 [Tathagata Das] Refactored the code that runs the NetworkReceiver into further classes and traits to make them more testable.
a36cc48 [Tathagata Das] Refactored the NetworkReceiver API for future stability.
(cherry picked from commit 04c37b6f749dc2418cc28c89964cdc687dfcbd51)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94cbe232
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94cbe232
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94cbe232

Branch: refs/heads/branch-1.0
Commit: 94cbe2329021296b660d88f3e8ef3734374020d2
Parents: a34e6fd
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Mon Apr 21 19:04:49 2014 -0700
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Mon Apr 21 19:05:05 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/ui/UIUtils.scala     |   6 +-
 .../streaming/examples/ActorWordCount.scala     |   6 +-
 .../streaming/flume/FlumeInputDStream.scala     |  28 +-
 .../spark/streaming/flume/FlumeUtils.scala      |  10 +-
 .../streaming/flume/JavaFlumeStreamSuite.java   |   6 +-
 .../streaming/flume/FlumeStreamSuite.scala      |   6 +-
 .../streaming/kafka/KafkaInputDStream.scala     |  19 +-
 .../spark/streaming/kafka/KafkaUtils.scala      |  14 +-
 .../streaming/kafka/JavaKafkaStreamSuite.java   |  10 +-
 .../streaming/kafka/KafkaStreamSuite.scala      |  10 +-
 .../spark/streaming/mqtt/MQTTInputDStream.scala |  41 +--
 .../apache/spark/streaming/mqtt/MQTTUtils.scala |  12 +-
 .../streaming/mqtt/JavaMQTTStreamSuite.java     |   6 +-
 .../spark/streaming/mqtt/MQTTStreamSuite.scala  |   6 +-
 .../streaming/twitter/TwitterInputDStream.scala |  23 +-
 .../spark/streaming/twitter/TwitterUtils.scala  |  20 +-
 .../streaming/twitter/TwitterStreamSuite.scala  |  20 +-
 .../spark/streaming/zeromq/ZeroMQReceiver.scala |   7 +-
 .../spark/streaming/zeromq/ZeroMQUtils.scala    |  16 +-
 .../streaming/zeromq/JavaZeroMQStreamSuite.java |   8 +-
 .../streaming/zeromq/ZeroMQStreamSuite.scala    |   9 +-
 project/MimaBuild.scala                         |  29 +-
 .../apache/spark/streaming/DStreamGraph.scala   |   8 +-
 .../spark/streaming/StreamingContext.scala      |  44 ++-
 .../spark/streaming/api/java/JavaDStream.scala  |   4 +
 .../streaming/api/java/JavaInputDStream.scala   |  40 ++
 .../api/java/JavaPairInputDStream.scala         |  41 +++
 .../api/java/JavaPairReceiverInputDStream.scala |  42 +++
 .../api/java/JavaReceiverInputDStream.scala     |  41 +++
 .../api/java/JavaStreamingContext.scala         |  56 ++-
 .../spark/streaming/dstream/InputDStream.scala  |   2 +-
 .../streaming/dstream/NetworkInputDStream.scala | 362 -------------------
 .../dstream/PluggableInputDStream.scala         |   5 +-
 .../streaming/dstream/RawInputDStream.scala     |  16 +-
 .../dstream/ReceiverInputDStream.scala          |  94 +++++
 .../streaming/dstream/SocketInputDStream.scala  |  62 +++-
 .../streaming/receiver/ActorReceiver.scala      | 191 ++++++++++
 .../streaming/receiver/BlockGenerator.scala     | 142 ++++++++
 .../spark/streaming/receiver/Receiver.scala     | 236 ++++++++++++
 .../streaming/receiver/ReceiverMessage.scala    |  23 ++
 .../streaming/receiver/ReceiverSupervisor.scala | 180 +++++++++
 .../receiver/ReceiverSupervisorImpl.scala       | 180 +++++++++
 .../streaming/receivers/ActorReceiver.scala     | 184 ----------
 .../streaming/scheduler/JobGenerator.scala      |  16 +-
 .../streaming/scheduler/JobScheduler.scala      |   8 +-
 .../scheduler/NetworkInputTracker.scala         | 257 -------------
 .../streaming/scheduler/ReceiverTracker.scala   | 278 ++++++++++++++
 .../streaming/scheduler/StreamingListener.scala |  18 +-
 .../scheduler/StreamingListenerBus.scala        |   4 +
 .../ui/StreamingJobProgressListener.scala       |  10 +-
 .../spark/streaming/ui/StreamingPage.scala      |  18 +-
 .../spark/streaming/util/RecurringTimer.scala   |   4 +-
 .../apache/spark/streaming/JavaAPISuite.java    |   9 +-
 .../spark/streaming/InputStreamsSuite.scala     |  19 +-
 .../spark/streaming/NetworkReceiverSuite.scala  | 249 +++++++++++++
 .../spark/streaming/StreamingContextSuite.scala |  42 ++-
 .../streaming/StreamingListenerSuite.scala      |  84 ++++-
 57 files changed, 2193 insertions(+), 1088 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 99770f2..cf987a1 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -121,7 +121,11 @@ private[spark] object UIUtils extends Logging {
         (records, "")
       }
     }
-    "%.1f%s".formatLocal(Locale.US, value, unit)
+    if (unit.isEmpty) {
+      "%d".formatLocal(Locale.US, value)
+    } else {
+      "%.1f%s".formatLocal(Locale.US, value, unit)
+    }
   }
 
   // Yarn has to go through a proxy so the base uri is provided and has to be on all links

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index a22e64c..eb44768 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -26,8 +26,8 @@ import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
 import org.apache.spark.{SparkConf, SecurityManager}
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
-import org.apache.spark.streaming.receivers.Receiver
 import org.apache.spark.util.AkkaUtils
+import org.apache.spark.streaming.receiver.ActorHelper
 
 case class SubscribeReceiver(receiverActor: ActorRef)
 case class UnsubscribeReceiver(receiverActor: ActorRef)
@@ -81,14 +81,14 @@ class FeederActor extends Actor {
  * @see [[org.apache.spark.streaming.examples.FeederActor]]
  */
 class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
-extends Actor with Receiver {
+extends Actor with ActorHelper {
 
   lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
 
   override def preStart = remotePublisher ! SubscribeReceiver(context.self)
 
   def receive = {
-    case msg => pushBlock(msg.asInstanceOf[T])
+    case msg => store(msg.asInstanceOf[T])
   }
 
   override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 34012b8..df7605f 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -34,6 +34,8 @@ import org.apache.spark.util.Utils
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receiver.Receiver
 
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
@@ -41,9 +43,9 @@ class FlumeInputDStream[T: ClassTag](
   host: String,
   port: Int,
   storageLevel: StorageLevel
-) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
+) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
 
-  override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+  override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumeReceiver(host, port, storageLevel)
   }
 }
@@ -115,13 +117,13 @@ private[streaming] object SparkFlumeEvent {
 private[streaming]
 class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
   override def append(event : AvroFlumeEvent) : Status = {
-    receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
+    receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
     Status.OK
   }
 
   override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
     events.foreach (event =>
-      receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
+      receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
     Status.OK
   }
 }
@@ -133,23 +135,21 @@ class FlumeReceiver(
     host: String,
     port: Int,
     storageLevel: StorageLevel
-  ) extends NetworkReceiver[SparkFlumeEvent] {
+  ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
 
-  lazy val blockGenerator = new BlockGenerator(storageLevel)
+  lazy val responder = new SpecificResponder(
+    classOf[AvroSourceProtocol], new FlumeEventServer(this))
+  lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
 
-  protected override def onStart() {
-    val responder = new SpecificResponder(
-      classOf[AvroSourceProtocol], new FlumeEventServer(this))
-    val server = new NettyServer(responder, new InetSocketAddress(host, port))
-    blockGenerator.start()
+  def onStart() {
     server.start()
     logInfo("Flume receiver started")
   }
 
-  protected override def onStop() {
-    blockGenerator.stop()
+  def onStop() {
+    server.close()
     logInfo("Flume receiver stopped")
   }
 
-  override def getLocationPreference = Some(host)
+  override def preferredLocation = Some(host)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 654ba45..499f356 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -19,8 +19,8 @@ package org.apache.spark.streaming.flume
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
 
 object FlumeUtils {
   /**
@@ -35,7 +35,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): DStream[SparkFlumeEvent] = {
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
     val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
     inputStream
   }
@@ -50,7 +50,7 @@ object FlumeUtils {
       jssc: JavaStreamingContext,
       hostname: String,
       port: Int
-    ): JavaDStream[SparkFlumeEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createStream(jssc.ssc, hostname, port)
   }
 
@@ -65,7 +65,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel
-    ): JavaDStream[SparkFlumeEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createStream(jssc.ssc, hostname, port, storageLevel)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
index 733389b..e0ad4f1 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -19,16 +19,16 @@ package org.apache.spark.streaming.flume;
 
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
 
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.junit.Test;
 
 public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
   @Test
   public void testFlumeStream() {
     // tests the API, does not actually test data receiving
-    JavaDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
-    JavaDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
+    JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
+    JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
       StorageLevel.MEMORY_AND_DISK_SER_2());
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 8bc4397..7860320 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -31,6 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
 import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
 
 class FlumeStreamSuite extends TestSuiteBase {
 
@@ -39,10 +40,11 @@ class FlumeStreamSuite extends TestSuiteBase {
   test("flume input stream") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+    val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
       with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
     outputStream.register()
     ssc.start()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index c2d9dcb..21443eb 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -33,6 +33,7 @@ import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
 
 /**
  * Input stream that pulls messages from a Kafka Broker.
@@ -53,11 +54,11 @@ class KafkaInputDStream[
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
     storageLevel: StorageLevel
-  ) extends NetworkInputDStream[(K, V)](ssc_) with Logging {
+  ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
-  def getReceiver(): NetworkReceiver[(K, V)] = {
+  def getReceiver(): Receiver[(K, V)] = {
     new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
-        .asInstanceOf[NetworkReceiver[(K, V)]]
+        .asInstanceOf[Receiver[(K, V)]]
   }
 }
 
@@ -70,21 +71,15 @@ class KafkaReceiver[
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
     storageLevel: StorageLevel
-  ) extends NetworkReceiver[Any] {
+  ) extends Receiver[Any](storageLevel) with Logging {
 
-  // Handles pushing data into the BlockManager
-  lazy protected val blockGenerator = new BlockGenerator(storageLevel)
   // Connection to Kafka
   var consumerConnector : ConsumerConnector = null
 
-  def onStop() {
-    blockGenerator.stop()
-  }
+  def onStop() { }
 
   def onStart() {
 
-    blockGenerator.start()
-
     // In case we are using multiple Threads to handle Kafka Messages
     val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
 
@@ -130,7 +125,7 @@ class KafkaReceiver[
     def run() {
       logInfo("Starting MessageHandler.")
       for (msgAndMetadata <- stream) {
-        blockGenerator += (msgAndMetadata.key, msgAndMetadata.message)
+        store((msgAndMetadata.key, msgAndMetadata.message))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 5472d0c..86bb91f 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -27,8 +27,8 @@ import kafka.serializer.{Decoder, StringDecoder}
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaPairReceiverInputDStream, JavaStreamingContext, JavaPairDStream}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
 
 
 object KafkaUtils {
@@ -48,7 +48,7 @@ object KafkaUtils {
       groupId: String,
       topics: Map[String, Int],
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): DStream[(String, String)] = {
+    ): ReceiverInputDStream[(String, String)] = {
     val kafkaParams = Map[String, String](
       "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
       "zookeeper.connection.timeout.ms" -> "10000")
@@ -70,7 +70,7 @@ object KafkaUtils {
       kafkaParams: Map[String, String],
       topics: Map[String, Int],
       storageLevel: StorageLevel
-    ): DStream[(K, V)] = {
+    ): ReceiverInputDStream[(K, V)] = {
     new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
   }
 
@@ -88,7 +88,7 @@ object KafkaUtils {
       zkQuorum: String,
       groupId: String,
       topics: JMap[String, JInt]
-    ): JavaPairDStream[String, String] = {
+    ): JavaPairReceiverInputDStream[String, String] = {
     implicit val cmt: ClassTag[String] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
@@ -110,7 +110,7 @@ object KafkaUtils {
       groupId: String,
       topics: JMap[String, JInt],
       storageLevel: StorageLevel
-    ): JavaPairDStream[String, String] = {
+    ): JavaPairReceiverInputDStream[String, String] = {
     implicit val cmt: ClassTag[String] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
@@ -139,7 +139,7 @@ object KafkaUtils {
       kafkaParams: JMap[String, String],
       topics: JMap[String, JInt],
       storageLevel: StorageLevel
-    ): JavaPairDStream[K, V] = {
+    ): JavaPairReceiverInputDStream[K, V] = {
     implicit val keyCmt: ClassTag[K] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
     implicit val valueCmt: ClassTag[V] =

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 7b49994..9f8046b 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -18,12 +18,13 @@
 package org.apache.spark.streaming.kafka;
 
 import java.util.HashMap;
+
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
 import org.junit.Test;
 import com.google.common.collect.Maps;
 import kafka.serializer.StringDecoder;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
 
 public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
   @Test
@@ -31,14 +32,15 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
     HashMap<String, Integer> topics = Maps.newHashMap();
 
     // tests the API, does not actually test data receiving
-    JavaPairDStream<String, String> test1 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
-    JavaPairDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
+    JavaPairReceiverInputDStream<String, String> test1 =
+            KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
+    JavaPairReceiverInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
       StorageLevel.MEMORY_AND_DISK_SER_2());
 
     HashMap<String, String> kafkaParams = Maps.newHashMap();
     kafkaParams.put("zookeeper.connect", "localhost:12345");
     kafkaParams.put("group.id","consumer-group");
-    JavaPairDStream<String, String> test3 = KafkaUtils.createStream(ssc,
+      JavaPairReceiverInputDStream<String, String> test3 = KafkaUtils.createStream(ssc,
       String.class, String.class, StringDecoder.class, StringDecoder.class,
       kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index d9809f6..e6f2c4a 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka
 import kafka.serializer.StringDecoder
 import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
 class KafkaStreamSuite extends TestSuiteBase {
 
@@ -28,10 +29,13 @@ class KafkaStreamSuite extends TestSuiteBase {
     val topics = Map("my-topic" -> 1)
 
     // tests the API, does not actually test data receiving
-    val test1 = KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
-    val test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
+    val test1: ReceiverInputDStream[(String, String)] =
+      KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
+    val test2: ReceiverInputDStream[(String, String)] =
+      KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
     val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
-    val test3 = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+    val test3: ReceiverInputDStream[(String, String)] =
+      KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
 
     // TODO: Actually test receiving data

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 1204cfb..0beee8b 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -39,6 +39,7 @@ import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
 
 /**
  * Input stream that subscribe messages from a Mqtt Broker.
@@ -49,38 +50,36 @@ import org.apache.spark.streaming.dstream._
  */
 
 private[streaming]
-class MQTTInputDStream[T: ClassTag](
+class MQTTInputDStream(
     @transient ssc_ : StreamingContext,
     brokerUrl: String,
     topic: String,
     storageLevel: StorageLevel
-  ) extends NetworkInputDStream[T](ssc_) with Logging {
-
-  def getReceiver(): NetworkReceiver[T] = {
-    new MQTTReceiver(brokerUrl, topic, storageLevel).asInstanceOf[NetworkReceiver[T]]
+  ) extends ReceiverInputDStream[String](ssc_) with Logging {
+  
+  def getReceiver(): Receiver[String] = {
+    new MQTTReceiver(brokerUrl, topic, storageLevel)
   }
 }
 
-private[streaming]
-class MQTTReceiver(brokerUrl: String,
-  topic: String,
-  storageLevel: StorageLevel
-  ) extends NetworkReceiver[Any] {
-  lazy protected val blockGenerator = new BlockGenerator(storageLevel)
+private[streaming] 
+class MQTTReceiver(
+    brokerUrl: String,
+    topic: String,
+    storageLevel: StorageLevel
+  ) extends Receiver[String](storageLevel) {
 
   def onStop() {
-    blockGenerator.stop()
-  }
 
+  }
+  
   def onStart() {
 
-    blockGenerator.start()
-
-    // Set up persistence for messages
-    var peristance: MqttClientPersistence = new MemoryPersistence()
+    // Set up persistence for messages 
+    val persistence = new MemoryPersistence()
 
     // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
-    var client: MqttClient = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
+    val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
 
     // Connect to MqttBroker
     client.connect()
@@ -89,18 +88,18 @@ class MQTTReceiver(brokerUrl: String,
     client.subscribe(topic)
 
     // Callback automatically triggers as and when new message arrives on specified topic
-    var callback: MqttCallback = new MqttCallback() {
+    val callback: MqttCallback = new MqttCallback() {
 
       // Handles Mqtt message
       override def messageArrived(arg0: String, arg1: MqttMessage) {
-        blockGenerator += new String(arg1.getPayload())
+        store(new String(arg1.getPayload()))
       }
 
       override def deliveryComplete(arg0: IMqttDeliveryToken) {
       }
 
       override def connectionLost(arg0: Throwable) {
-        logInfo("Connection lost " + arg0)
+        restart("Connection lost ", arg0)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 1b09ee5..c5ffe51 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -19,9 +19,9 @@ package org.apache.spark.streaming.mqtt
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
 import scala.reflect.ClassTag
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
 
 object MQTTUtils {
   /**
@@ -36,8 +36,8 @@ object MQTTUtils {
       brokerUrl: String,
       topic: String,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): DStream[String] = {
-    new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
+    ): ReceiverInputDStream[String] = {
+    new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
   }
 
   /**
@@ -51,7 +51,7 @@ object MQTTUtils {
       jssc: JavaStreamingContext,
       brokerUrl: String,
       topic: String
-    ): JavaDStream[String] = {
+    ): JavaReceiverInputDStream[String] = {
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, brokerUrl, topic)
   }
@@ -68,7 +68,7 @@ object MQTTUtils {
       brokerUrl: String,
       topic: String,
       storageLevel: StorageLevel
-    ): JavaDStream[String] = {
+    ): JavaReceiverInputDStream[String] = {
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, brokerUrl, topic, storageLevel)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index 44743aa..ce5aa1e 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.mqtt;
 
 import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.junit.Test;
 
 import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -30,8 +30,8 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
     String topic = "def";
 
     // tests the API, does not actually test data receiving
-    JavaDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
-    JavaDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
+    JavaReceiverInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
+    JavaReceiverInputDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
       StorageLevel.MEMORY_AND_DISK_SER_2());
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 89c40ad..467fd26 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt
 
 import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
 class MQTTStreamSuite extends TestSuiteBase {
 
@@ -28,8 +29,9 @@ class MQTTStreamSuite extends TestSuiteBase {
     val topic = "def"
 
     // tests the API, does not actually test data receiving
-    val test1 = MQTTUtils.createStream(ssc, brokerUrl, topic)
-    val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+    val test1: ReceiverInputDStream[String] = MQTTUtils.createStream(ssc, brokerUrl, topic)
+    val test2: ReceiverInputDStream[String] =
+      MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
 
     // TODO: Actually test receiving data
     ssc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 843a4a7..7bca140 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -25,6 +25,8 @@ import twitter4j.auth.OAuthAuthorization
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receiver.Receiver
 
 /* A stream of Twitter statuses, potentially filtered by one or more keywords.
 *
@@ -41,7 +43,7 @@ class TwitterInputDStream(
     twitterAuth: Option[Authorization],
     filters: Seq[String],
     storageLevel: StorageLevel
-  ) extends NetworkInputDStream[Status](ssc_)  {
+  ) extends ReceiverInputDStream[Status](ssc_)  {
 
   private def createOAuthAuthorization(): Authorization = {
     new OAuthAuthorization(new ConfigurationBuilder().build())
@@ -49,7 +51,7 @@ class TwitterInputDStream(
 
   private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
 
-  override def getReceiver(): NetworkReceiver[Status] = {
+  override def getReceiver(): Receiver[Status] = {
     new TwitterReceiver(authorization, filters, storageLevel)
   }
 }
@@ -59,27 +61,27 @@ class TwitterReceiver(
     twitterAuth: Authorization,
     filters: Seq[String],
     storageLevel: StorageLevel
-  ) extends NetworkReceiver[Status] {
+  ) extends Receiver[Status](storageLevel) with Logging {
 
   var twitterStream: TwitterStream = _
-  lazy val blockGenerator = new BlockGenerator(storageLevel)
 
-  protected override def onStart() {
-    blockGenerator.start()
+  def onStart() {
     twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
     twitterStream.addListener(new StatusListener {
       def onStatus(status: Status) = {
-        blockGenerator += status
+        store(status)
       }
       // Unimplemented
       def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
       def onTrackLimitationNotice(i: Int) {}
       def onScrubGeo(l: Long, l1: Long) {}
       def onStallWarning(stallWarning: StallWarning) {}
-      def onException(e: Exception) { stopOnError(e) }
+      def onException(e: Exception) {
+        restart("Error receiving tweets", e)
+      }
     })
 
-    val query: FilterQuery = new FilterQuery
+    val query = new FilterQuery
     if (filters.size > 0) {
       query.track(filters.toArray)
       twitterStream.filter(query)
@@ -89,8 +91,7 @@ class TwitterReceiver(
     logInfo("Twitter receiver started")
   }
 
-  protected override def onStop() {
-    blockGenerator.stop()
+  def onStop() {
     twitterStream.shutdown()
     logInfo("Twitter receiver stopped")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index e8433b7..c6a9a2b 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -21,8 +21,8 @@ import twitter4j.Status
 import twitter4j.auth.Authorization
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
 
 object TwitterUtils {
   /**
@@ -40,7 +40,7 @@ object TwitterUtils {
       twitterAuth: Option[Authorization],
       filters: Seq[String] = Nil,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): DStream[Status] = {
+    ): ReceiverInputDStream[Status] = {
     new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
   }
 
@@ -52,7 +52,7 @@ object TwitterUtils {
    * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
    * @param jssc   JavaStreamingContext object
    */
-  def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = {
+  def createStream(jssc: JavaStreamingContext): JavaReceiverInputDStream[Status] = {
     createStream(jssc.ssc, None)
   }
 
@@ -65,7 +65,8 @@ object TwitterUtils {
    * @param jssc    JavaStreamingContext object
    * @param filters Set of filter strings to get only those tweets that match them
    */
-  def createStream(jssc: JavaStreamingContext, filters: Array[String]): JavaDStream[Status] = {
+  def createStream(jssc: JavaStreamingContext, filters: Array[String]
+      ): JavaReceiverInputDStream[Status] = {
     createStream(jssc.ssc, None, filters)
   }
 
@@ -82,7 +83,7 @@ object TwitterUtils {
       jssc: JavaStreamingContext,
       filters: Array[String],
       storageLevel: StorageLevel
-    ): JavaDStream[Status] = {
+    ): JavaReceiverInputDStream[Status] = {
     createStream(jssc.ssc, None, filters, storageLevel)
   }
 
@@ -92,7 +93,8 @@ object TwitterUtils {
    * @param jssc        JavaStreamingContext object
    * @param twitterAuth Twitter4J Authorization
    */
-  def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization): JavaDStream[Status] = {
+  def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization
+    ): JavaReceiverInputDStream[Status] = {
     createStream(jssc.ssc, Some(twitterAuth))
   }
 
@@ -107,7 +109,7 @@ object TwitterUtils {
       jssc: JavaStreamingContext,
       twitterAuth: Authorization,
       filters: Array[String]
-    ): JavaDStream[Status] = {
+    ): JavaReceiverInputDStream[Status] = {
     createStream(jssc.ssc, Some(twitterAuth), filters)
   }
 
@@ -123,7 +125,7 @@ object TwitterUtils {
       twitterAuth: Authorization,
       filters: Array[String],
       storageLevel: StorageLevel
-    ): JavaDStream[Status] = {
+    ): JavaReceiverInputDStream[Status] = {
     createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index 06ab0cd..93741e0 100644
--- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.streaming.twitter
 import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
 import org.apache.spark.storage.StorageLevel
 import twitter4j.auth.{NullAuthorization, Authorization}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import twitter4j.Status
 
 class TwitterStreamSuite extends TestSuiteBase {
 
@@ -29,13 +31,17 @@ class TwitterStreamSuite extends TestSuiteBase {
     val authorization: Authorization = NullAuthorization.getInstance()
 
     // tests the API, does not actually test data receiving
-    val test1 = TwitterUtils.createStream(ssc, None)
-    val test2 = TwitterUtils.createStream(ssc, None, filters)
-    val test3 = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
-    val test4 = TwitterUtils.createStream(ssc, Some(authorization))
-    val test5 = TwitterUtils.createStream(ssc, Some(authorization), filters)
-    val test6 = TwitterUtils.createStream(ssc, Some(authorization), filters,
-      StorageLevel.MEMORY_AND_DISK_SER_2)
+    val test1: ReceiverInputDStream[Status] = TwitterUtils.createStream(ssc, None)
+    val test2: ReceiverInputDStream[Status] =
+      TwitterUtils.createStream(ssc, None, filters)
+    val test3: ReceiverInputDStream[Status] =
+      TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+    val test4: ReceiverInputDStream[Status] =
+      TwitterUtils.createStream(ssc, Some(authorization))
+    val test5: ReceiverInputDStream[Status] =
+      TwitterUtils.createStream(ssc, Some(authorization), filters)
+    val test6: ReceiverInputDStream[Status] = TwitterUtils.createStream(
+      ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
 
     // Note that actually testing the data receiving is hard as authentication keys are
     // necessary for accessing Twitter live stream

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index a538c38..5547058 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -24,7 +24,7 @@ import akka.util.ByteString
 import akka.zeromq._
 
 import org.apache.spark.Logging
-import org.apache.spark.streaming.receivers._
+import org.apache.spark.streaming.receiver.ActorHelper
 
 /**
  * A receiver to subscribe to ZeroMQ stream.
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.receivers._
 private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
   subscribe: Subscribe,
   bytesToObjects: Seq[ByteString] => Iterator[T])
-  extends Actor with Receiver with Logging {
+  extends Actor with ActorHelper with Logging {
 
   override def preStart() = ZeroMQExtension(context.system)
     .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
@@ -46,9 +46,8 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
 
       // We ignore first frame for processing as it is the topic
       val bytes = m.frames.tail
-      pushBlock(bytesToObjects(bytes))
+      store(bytesToObjects(bytes))
 
     case Closed => logInfo("received closed ")
-
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index b254e00..0469d0a 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -24,10 +24,10 @@ import akka.util.ByteString
 import akka.zeromq.Subscribe
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream}
+import org.apache.spark.streaming.receiver.ActorSupervisorStrategy
 
 object ZeroMQUtils {
   /**
@@ -48,8 +48,8 @@ object ZeroMQUtils {
       subscribe: Subscribe,
       bytesToObjects: Seq[ByteString] => Iterator[T],
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
-      supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
-    ): DStream[T] = {
+      supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
+    ): ReceiverInputDStream[T] = {
     ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
       "ZeroMQReceiver", storageLevel, supervisorStrategy)
   }
@@ -72,7 +72,7 @@ object ZeroMQUtils {
       bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
       storageLevel: StorageLevel,
       supervisorStrategy: SupervisorStrategy
-    ): JavaDStream[T] = {
+    ): JavaReceiverInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
@@ -96,7 +96,7 @@ object ZeroMQUtils {
       subscribe: Subscribe,
       bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
       storageLevel: StorageLevel
-    ): JavaDStream[T] = {
+    ): JavaReceiverInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator
@@ -119,7 +119,7 @@ object ZeroMQUtils {
       publisherUrl: String,
       subscribe: Subscribe,
       bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
-    ): JavaDStream[T] = {
+    ): JavaReceiverInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index d2361e1..417b91e 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming.zeromq;
 
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.junit.Test;
 import akka.actor.SupervisorStrategy;
 import akka.util.ByteString;
@@ -24,7 +25,6 @@ import akka.zeromq.Subscribe;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
 
 public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
 
@@ -39,11 +39,11 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
       }
     };
 
-    JavaDStream<String> test1 = ZeroMQUtils.<String>createStream(
+    JavaReceiverInputDStream<String> test1 = ZeroMQUtils.<String>createStream(
       ssc, publishUrl, subscribe, bytesToObjects);
-    JavaDStream<String> test2 = ZeroMQUtils.<String>createStream(
+    JavaReceiverInputDStream<String> test2 = ZeroMQUtils.<String>createStream(
       ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
-    JavaDStream<String> test3 = ZeroMQUtils.<String>createStream(
+    JavaReceiverInputDStream<String> test3 = ZeroMQUtils.<String>createStream(
       ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
       SupervisorStrategy.defaultStrategy());
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 92d55a7..cc10ff6 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -23,6 +23,7 @@ import akka.zeromq.Subscribe
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
 class ZeroMQStreamSuite extends TestSuiteBase {
 
@@ -33,10 +34,12 @@ class ZeroMQStreamSuite extends TestSuiteBase {
     val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]]
 
     // tests the API, does not actually test data receiving
-    val test1 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
-    val test2 = ZeroMQUtils.createStream(
+    val test1: ReceiverInputDStream[String] =
+      ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+    val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
       ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
-    val test3 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects,
+    val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+      ssc, publishUrl, subscribe, bytesToObjects,
       StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
 
     // TODO: Actually test data receiving

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/project/MimaBuild.scala
----------------------------------------------------------------------
diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala
index 9cb31d7..d540dc0 100644
--- a/project/MimaBuild.scala
+++ b/project/MimaBuild.scala
@@ -38,6 +38,7 @@ object MimaBuild {
         IO.read(excludeFile).split("\n")
       }
 
+    // Exclude a single class and its corresponding object
     def excludeClass(className: String) = {
       Seq(
         excludePackage(className), 
@@ -48,7 +49,16 @@ object MimaBuild {
         ProblemFilters.exclude[MissingTypesProblem](className + "$")
       )
     }
-    def excludeSparkClass(className: String) = excludeClass("org.apache.spark." + className)
+
+    // Exclude a Spark class, that is in the package org.apache.spark
+    def excludeSparkClass(className: String) = {
+      excludeClass("org.apache.spark." + className)
+    }
+
+    // Exclude a Spark package, that is in the package org.apache.spark
+    def excludeSparkPackage(packageName: String) = {
+      excludePackage("org.apache.spark." + packageName)
+    }
 
     val packagePrivateExcludes = packagePrivateList.flatMap(excludeClass)
 
@@ -58,10 +68,9 @@ object MimaBuild {
       SparkBuild.SPARK_VERSION match {
         case v if v.startsWith("1.0") =>
           Seq(
-            excludePackage("org.apache.spark.api.java"),
-            excludePackage("org.apache.spark.streaming.api.java"),
-            excludePackage("org.apache.spark.streaming.scheduler"),
-            excludePackage("org.apache.spark.mllib")
+            excludeSparkPackage("api.java"),
+            excludeSparkPackage("mllib"),
+            excludeSparkPackage("streaming")
           ) ++
           excludeSparkClass("rdd.ClassTags") ++
           excludeSparkClass("util.XORShiftRandom") ++
@@ -69,14 +78,7 @@ object MimaBuild {
           excludeSparkClass("mllib.optimization.SquaredGradient") ++
           excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
           excludeSparkClass("mllib.regression.LassoWithSGD") ++
-          excludeSparkClass("mllib.regression.LinearRegressionWithSGD") ++
-          excludeSparkClass("streaming.dstream.NetworkReceiver") ++
-          excludeSparkClass("streaming.dstream.NetworkReceiver#NetworkReceiverActor") ++
-          excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator") ++
-          excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator#Block") ++
-          excludeSparkClass("streaming.dstream.ReportError") ++
-          excludeSparkClass("streaming.dstream.ReportBlock") ++
-          excludeSparkClass("streaming.dstream.DStream")
+          excludeSparkClass("mllib.regression.LinearRegressionWithSGD")
         case _ => Seq()
       }
 
@@ -87,5 +89,4 @@ object MimaBuild {
     previousArtifact := None,
     binaryIssueFilters ++= ignoredABIProblems(sparkHome)
   )
-
 }
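
For illustration, a hedged sketch of how the new helpers are meant to be used; the class and package names below are hypothetical, and the fragment assumes it sits inside ignoredABIProblems, where excludeSparkClass and excludeSparkPackage are in scope as defined above.

    // Hypothetical extra exclusions written against the helpers defined above.
    val extraExcludes =
      Seq(excludeSparkPackage("streaming.receiver")) ++        // a whole package under org.apache.spark
      excludeSparkClass("streaming.dstream.SomeMovedClass")    // the class plus its companion object ("$")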

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index d333906..b4adf0e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
 import org.apache.spark.Logging
 import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.streaming.dstream.{DStream, NetworkInputDStream, InputDStream}
+import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream, InputDStream}
 
 final private[streaming] class DStreamGraph extends Serializable with Logging {
 
@@ -103,9 +103,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
 
   def getOutputStreams() = this.synchronized { outputStreams.toArray }
 
-  def getNetworkInputStreams() = this.synchronized {
-    inputStreams.filter(_.isInstanceOf[NetworkInputDStream[_]])
-      .map(_.asInstanceOf[NetworkInputDStream[_]])
+  def getReceiverInputStreams() = this.synchronized {
+    inputStreams.filter(_.isInstanceOf[ReceiverInputDStream[_]])
+      .map(_.asInstanceOf[ReceiverInputDStream[_]])
       .toArray
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index e9a4f7b..daa5c69 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -31,12 +31,11 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receivers._
+import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver}
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.ui.StreamingTab
 import org.apache.spark.util.MetadataCleaner
@@ -139,7 +138,7 @@ class StreamingContext private[streaming] (
     }
   }
 
-  private val nextNetworkInputStreamId = new AtomicInteger(0)
+  private val nextReceiverInputStreamId = new AtomicInteger(0)
 
   private[streaming] var checkpointDir: String = {
     if (isCheckpointPresent) {
@@ -208,15 +207,26 @@ class StreamingContext private[streaming] (
     if (isCheckpointPresent) cp_ else null
   }
 
-  private[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
+  private[streaming] def getNewReceiverStreamId() = nextReceiverInputStreamId.getAndIncrement()
 
   /**
-   * Create an input stream with any arbitrary user implemented network receiver.
+   * Create an input stream with an arbitrary user-implemented receiver.
    * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
-   * @param receiver Custom implementation of NetworkReceiver
+   * @param receiver Custom implementation of Receiver
    */
+  @deprecated("Use receiverStream", "1.0.0")
   def networkStream[T: ClassTag](
-    receiver: NetworkReceiver[T]): DStream[T] = {
+    receiver: Receiver[T]): ReceiverInputDStream[T] = {
+    receiverStream(receiver)
+  }
+
+  /**
+   * Create an input stream with an arbitrary user-implemented receiver.
+   * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+   * @param receiver Custom implementation of Receiver
+   */
+  def receiverStream[T: ClassTag](
+    receiver: Receiver[T]): ReceiverInputDStream[T] = {
     new PluggableInputDStream[T](this, receiver)
   }
 
@@ -236,9 +246,9 @@ class StreamingContext private[streaming] (
       props: Props,
       name: String,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
-      supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
-    ): DStream[T] = {
-    networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
+      supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
+    ): ReceiverInputDStream[T] = {
+    receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
   }
 
   /**
@@ -254,7 +264,7 @@ class StreamingContext private[streaming] (
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): DStream[String] = {
+    ): ReceiverInputDStream[String] = {
     socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
   }
 
@@ -273,7 +283,7 @@ class StreamingContext private[streaming] (
       port: Int,
       converter: (InputStream) => Iterator[T],
       storageLevel: StorageLevel
-    ): DStream[T] = {
+    ): ReceiverInputDStream[T] = {
     new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
   }
 
@@ -292,7 +302,7 @@ class StreamingContext private[streaming] (
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): DStream[T] = {
+    ): ReceiverInputDStream[T] = {
     new RawInputDStream[T](this, hostname, port, storageLevel)
   }
 
@@ -310,7 +320,7 @@ class StreamingContext private[streaming] (
     K: ClassTag,
     V: ClassTag,
     F <: NewInputFormat[K, V]: ClassTag
-  ] (directory: String): DStream[(K, V)] = {
+  ] (directory: String): InputDStream[(K, V)] = {
     new FileInputDStream[K, V, F](this, directory)
   }
 
@@ -330,7 +340,7 @@ class StreamingContext private[streaming] (
     K: ClassTag,
     V: ClassTag,
     F <: NewInputFormat[K, V]: ClassTag
-  ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = {
+  ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
     new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
   }
 
@@ -356,7 +366,7 @@ class StreamingContext private[streaming] (
   def queueStream[T: ClassTag](
       queue: Queue[RDD[T]],
       oneAtATime: Boolean = true
-    ): DStream[T] = {
+    ): InputDStream[T] = {
     queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))
   }
 
@@ -373,7 +383,7 @@ class StreamingContext private[streaming] (
       queue: Queue[RDD[T]],
       oneAtATime: Boolean,
       defaultRDD: RDD[T]
-    ): DStream[T] = {
+    ): InputDStream[T] = {
     new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
   }
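
To show the new entry point end to end, a hedged sketch of a custom receiver plugged in through StreamingContext.receiverStream; the LineReceiver class, host, and port are made up for illustration, and the body uses only the Receiver API introduced by this change (onStart, onStop, store, restart).

    import java.net.Socket

    import scala.io.Source

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    // Hypothetical receiver: connect to a socket and push each received line into Spark.
    class LineReceiver(host: String, port: Int)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

      def onStart() {
        // Receiving must not block onStart(), so run it on a separate thread.
        new Thread("LineReceiver") {
          override def run() {
            val socket = new Socket(host, port)
            try {
              Source.fromInputStream(socket.getInputStream).getLines().foreach(line => store(line))
            } finally {
              socket.close()
            }
            // Ask the receiver supervisor to restart the receiver once the connection ends.
            restart("Socket closed by remote end")
          }
        }.start()
      }

      def onStop() { }  // the receiving thread exits on its own once the receiver is stopped
    }

    // Usage sketch (in a driver program with an existing StreamingContext `ssc`); note that
    // the result is now typed as ReceiverInputDStream[String] rather than DStream[String]:
    //   val lines = ssc.receiverStream(new LineReceiver("localhost", 9999))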
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index 13e2bac..505e443 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -97,6 +97,10 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T]
 }
 
 object JavaDStream {
+  /**
+   * Convert a scala [[org.apache.spark.streaming.dstream.DStream]] to a Java-friendly
+   * [[org.apache.spark.streaming.api.java.JavaDStream]].
+   */
   implicit def fromDStream[T: ClassTag](dstream: DStream[T]): JavaDStream[T] =
     new JavaDStream[T](dstream)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaInputDStream.scala
new file mode 100644
index 0000000..91f8d34
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaInputDStream.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.streaming.dstream.InputDStream
+
+/**
+ * A Java-friendly interface to [[org.apache.spark.streaming.dstream.InputDStream]].
+ */
+class JavaInputDStream[T](val inputDStream: InputDStream[T])
+  (implicit override val classTag: ClassTag[T]) extends JavaDStream[T](inputDStream) {
+}
+
+object JavaInputDStream {
+  /**
+   * Convert a scala [[org.apache.spark.streaming.dstream.InputDStream]] to a Java-friendly
+   * [[org.apache.spark.streaming.api.java.JavaInputDStream]].
+   */
+  implicit def fromInputDStream[T: ClassTag](
+      inputDStream: InputDStream[T]): JavaInputDStream[T] = {
+    new JavaInputDStream[T](inputDStream)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala
new file mode 100644
index 0000000..add8585
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairInputDStream.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import org.apache.spark.streaming.dstream.InputDStream
+import scala.reflect.ClassTag
+
+/**
+ * A Java-friendly interface to [[org.apache.spark.streaming.dstream.InputDStream]] of
+ * key-value pairs.
+ */
+class JavaPairInputDStream[K, V](val inputDStream: InputDStream[(K, V)])(
+    implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V]
+  ) extends JavaPairDStream[K, V](inputDStream) {
+}
+
+object JavaPairInputDStream {
+  /**
+   * Convert a scala [[org.apache.spark.streaming.dstream.InputDStream]] of pairs to a
+   * Java-friendly [[org.apache.spark.streaming.api.java.JavaPairInputDStream]].
+   */
+  implicit def fromInputDStream[K: ClassTag, V: ClassTag](
+       inputDStream: InputDStream[(K, V)]): JavaPairInputDStream[K, V] = {
+    new JavaPairInputDStream[K, V](inputDStream)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.scala
new file mode 100644
index 0000000..974b3e4
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+/**
+ * A Java-friendly interface to [[org.apache.spark.streaming.dstream.ReceiverInputDStream]], the
+ * abstract class for defining any input stream that receives data over the network.
+ */
+class JavaPairReceiverInputDStream[K, V](val receiverInputDStream: ReceiverInputDStream[(K, V)])
+    (implicit override val kClassTag: ClassTag[K], override implicit val vClassTag: ClassTag[V])
+  extends JavaPairInputDStream[K, V](receiverInputDStream) {
+}
+
+object JavaPairReceiverInputDStream {
+  /**
+   * Convert a scala [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] of pairs to a
+   * Java-friendly [[org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream]].
+   */
+  implicit def fromReceiverInputDStream[K: ClassTag, V: ClassTag](
+      receiverInputDStream: ReceiverInputDStream[(K, V)]): JavaPairReceiverInputDStream[K, V] = {
+    new JavaPairReceiverInputDStream[K, V](receiverInputDStream)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.scala
new file mode 100644
index 0000000..340ef97
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaReceiverInputDStream.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+/**
+ * A Java-friendly interface to [[org.apache.spark.streaming.dstream.ReceiverInputDStream]], the
+ * abstract class for defining any input stream that receives data over the network.
+ */
+class JavaReceiverInputDStream[T](val receiverInputDStream: ReceiverInputDStream[T])
+  (implicit override val classTag: ClassTag[T]) extends JavaInputDStream[T](receiverInputDStream) {
+}
+
+object JavaReceiverInputDStream {
+  /**
+   * Convert a scala [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] to a Java-friendly
+   * [[org.apache.spark.streaming.api.java.JavaReceiverInputDStream]].
+   */
+  implicit def fromReceiverInputDStream[T: ClassTag](
+      receiverInputDStream: ReceiverInputDStream[T]): JavaReceiverInputDStream[T] = {
+    new JavaReceiverInputDStream[T](receiverInputDStream)
+  }
+}
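
A small hedged sketch (the JavaApiSketch object and asJava helper are hypothetical) of the pattern these wrappers enable: a Scala-implemented method on the Java API surface declares the Java-friendly type and obtains it from the underlying ReceiverInputDStream via the conversion above, which is how the JavaStreamingContext changes below use it.

    import scala.reflect.ClassTag

    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
    import org.apache.spark.streaming.dstream.ReceiverInputDStream

    object JavaApiSketch {
      // Wrap a Scala receiver stream in its Java-friendly counterpart.
      def asJava[T: ClassTag](stream: ReceiverInputDStream[T]): JavaReceiverInputDStream[T] =
        JavaReceiverInputDStream.fromReceiverInputDStream(stream)
    }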

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index c800602..fbb2e9f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -35,7 +35,8 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.scheduler.StreamingListener
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.dstream.{PluggableInputDStream, ReceiverInputDStream, DStream}
+import org.apache.spark.streaming.receiver.Receiver
 
 /**
  * A Java-friendly version of [[org.apache.spark.streaming.StreamingContext]] which is the main
@@ -155,8 +156,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param port          Port to connect to for receiving data
    * @param storageLevel  Storage level to use for storing the received objects
    */
-  def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
-  : JavaDStream[String] = {
+  def socketTextStream(
+      hostname: String, port: Int,
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[String] = {
     ssc.socketTextStream(hostname, port, storageLevel)
   }
 
@@ -167,7 +170,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param hostname      Hostname to connect to for receiving data
    * @param port          Port to connect to for receiving data
    */
-  def socketTextStream(hostname: String, port: Int): JavaDStream[String] = {
+  def socketTextStream(hostname: String, port: Int): JavaReceiverInputDStream[String] = {
     ssc.socketTextStream(hostname, port)
   }
 
@@ -186,7 +189,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
       port: Int,
       converter: JFunction[InputStream, java.lang.Iterable[T]],
       storageLevel: StorageLevel)
-  : JavaDStream[T] = {
+  : JavaReceiverInputDStream[T] = {
     def fn = (x: InputStream) => converter.call(x).toIterator
     implicit val cmt: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
@@ -218,10 +221,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   def rawSocketStream[T](
       hostname: String,
       port: Int,
-      storageLevel: StorageLevel): JavaDStream[T] = {
+      storageLevel: StorageLevel): JavaReceiverInputDStream[T] = {
     implicit val cmt: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port, storageLevel))
+    JavaReceiverInputDStream.fromReceiverInputDStream(
+      ssc.rawSocketStream(hostname, port, storageLevel))
   }
 
   /**
@@ -233,10 +237,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param port          Port to connect to for receiving data
    * @tparam T            Type of the objects in the received blocks
    */
-  def rawSocketStream[T](hostname: String, port: Int): JavaDStream[T] = {
+  def rawSocketStream[T](hostname: String, port: Int): JavaReceiverInputDStream[T] = {
     implicit val cmt: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port))
+    JavaReceiverInputDStream.fromReceiverInputDStream(
+      ssc.rawSocketStream(hostname, port))
   }
 
   /**
@@ -249,7 +254,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @tparam V Value type for reading HDFS file
    * @tparam F Input format for reading HDFS file
    */
-  def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = {
+  def fileStream[K, V, F <: NewInputFormat[K, V]](
+      directory: String): JavaPairInputDStream[K, V] = {
     implicit val cmk: ClassTag[K] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
     implicit val cmv: ClassTag[V] =
@@ -275,7 +281,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
       name: String,
       storageLevel: StorageLevel,
       supervisorStrategy: SupervisorStrategy
-    ): JavaDStream[T] = {
+    ): JavaReceiverInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     ssc.actorStream[T](props, name, storageLevel, supervisorStrategy)
@@ -296,7 +302,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
       props: Props,
       name: String,
       storageLevel: StorageLevel
-  ): JavaDStream[T] = {
+    ): JavaReceiverInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     ssc.actorStream[T](props, name, storageLevel)
@@ -316,14 +322,14 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   def actorStream[T](
       props: Props,
       name: String
-    ): JavaDStream[T] = {
+    ): JavaReceiverInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     ssc.actorStream[T](props, name)
   }
 
   /**
-   * Creates an input stream from an queue of RDDs. In each batch,
+   * Create an input stream from a queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    *
    * NOTE: changes to the queue after the stream is created will not be recognized.
@@ -339,7 +345,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Creates an input stream from an queue of RDDs. In each batch,
+   * Create an input stream from a queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    *
    * NOTE: changes to the queue after the stream is created will not be recognized.
@@ -347,7 +353,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
    * @tparam T         Type of objects in the RDD
    */
-  def queueStream[T](queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean): JavaDStream[T] = {
+  def queueStream[T](
+      queue: java.util.Queue[JavaRDD[T]],
+      oneAtATime: Boolean
+    ): JavaInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     val sQueue = new scala.collection.mutable.Queue[RDD[T]]
@@ -356,7 +365,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
-   * Creates an input stream from an queue of RDDs. In each batch,
+   * Create an input stream from a queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
    *
    * NOTE: changes to the queue after the stream is created will not be recognized.
@@ -368,7 +377,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   def queueStream[T](
       queue: java.util.Queue[JavaRDD[T]],
       oneAtATime: Boolean,
-      defaultRDD: JavaRDD[T]): JavaDStream[T] = {
+      defaultRDD: JavaRDD[T]): JavaInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     val sQueue = new scala.collection.mutable.Queue[RDD[T]]
@@ -377,6 +386,17 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   }
 
   /**
+   * Create an input stream with an arbitrary user-implemented receiver.
+   * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+   * @param receiver Custom implementation of Receiver
+   */
+  def receiverStream[T](receiver: Receiver[T]): ReceiverInputDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    ssc.receiverStream(receiver)
+  }
+
+  /**
    * Create a unified DStream from multiple DStreams of the same type and same slide duration.
    */
   def union[T](first: JavaDStream[T], rest: JList[JavaDStream[T]]): JavaDStream[T] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/94cbe232/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 226844c..aa1993f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -30,7 +30,7 @@ import scala.reflect.ClassTag
  * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for
  * new files and generates RDDs with the new files. For implementing input streams
  * that requires running a receiver on the worker nodes, use
- * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] as the parent class.
+ * [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.
  *
  * @param ssc_ Streaming context that will execute this input stream
  */
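
To illustrate the distinction the updated scaladoc draws, a hedged sketch of a driver-side input stream; the ConstantRDDInputDStream class is hypothetical, and because it needs no receiver on the workers it extends InputDStream directly rather than ReceiverInputDStream.

    import scala.reflect.ClassTag

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{StreamingContext, Time}
    import org.apache.spark.streaming.dstream.InputDStream

    // Hypothetical driver-side input stream: it returns the same RDD for every batch
    // interval, so all work happens on the driver and no receiver is required.
    class ConstantRDDInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
      extends InputDStream[T](ssc_) {

      override def start() { }   // nothing to set up on the driver
      override def stop() { }    // nothing to tear down
      override def compute(validTime: Time): Option[RDD[T]] = Some(rdd)
    }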

