spark-commits mailing list archives

From r...@apache.org
Subject [3/5] spark git commit: [SPARK-15085][STREAMING][KAFKA] Rename streaming-kafka artifact
Date Wed, 11 May 2016 19:15:47 GMT
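
For downstream builds, the practical effect of this rename is a new dependency coordinate: the old spark-streaming-kafka module (its pom.xml is deleted below) moves under external/kafka-0-8. A minimal sbt sketch, assuming the new artifact id follows the new module path (spark-streaming-kafka-0-8 is an inference from the new directory name, not something this diff states):

    // old coordinate, from the deleted external/kafka/pom.xml
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "2.0.0-SNAPSHOT"
    // assumed new coordinate, matching the new external/kafka-0-8 module path
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0-SNAPSHOT"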
http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
new file mode 100644
index 0000000..cb782d2
--- /dev/null
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.io.File
+import java.util.Arrays
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.StringDecoder
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+import org.apache.spark.util.Utils
+
+class DirectKafkaStreamSuite
+  extends SparkFunSuite
+  with BeforeAndAfter
+  with BeforeAndAfterAll
+  with Eventually
+  with Logging {
+  val sparkConf = new SparkConf()
+    .setMaster("local[4]")
+    .setAppName(this.getClass.getSimpleName)
+
+  private var sc: SparkContext = _
+  private var ssc: StreamingContext = _
+  private var testDir: File = _
+
+  private var kafkaTestUtils: KafkaTestUtils = _
+
+  override def beforeAll {
+    kafkaTestUtils = new KafkaTestUtils
+    kafkaTestUtils.setup()
+  }
+
+  override def afterAll {
+    if (kafkaTestUtils != null) {
+      kafkaTestUtils.teardown()
+      kafkaTestUtils = null
+    }
+  }
+
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      sc = null
+    }
+    if (sc != null) {
+      sc.stop()
+    }
+    if (testDir != null) {
+      Utils.deleteRecursively(testDir)
+    }
+  }
+
+
+  test("basic stream receiving with multiple topics and smallest starting offset") {
+    val topics = Set("basic1", "basic2", "basic3")
+    val data = Map("a" -> 7, "b" -> 9)
+    topics.foreach { t =>
+      kafkaTestUtils.createTopic(t)
+      kafkaTestUtils.sendMessages(t, data)
+    }
+    val totalSent = data.values.sum * topics.size
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(200))
+    val stream = withClue("Error creating direct stream") {
+      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+        ssc, kafkaParams, topics)
+    }
+
+    val allReceived = new ConcurrentLinkedQueue[(String, String)]()
+
+    // hold a reference to the current offset ranges, so it can be used downstream
+    var offsetRanges = Array[OffsetRange]()
+
+    stream.transform { rdd =>
+      // Get the offset ranges in the RDD
+      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+      rdd
+    }.foreachRDD { rdd =>
+      for (o <- offsetRanges) {
+        logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
+      }
+      val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
+        // For each partition, get the size of the offset range in the partition
+        // and the number of items actually in the partition
+        val off = offsetRanges(i)
+        val all = iter.toSeq
+        val partSize = all.size
+        val rangeSize = off.untilOffset - off.fromOffset
+        Iterator((partSize, rangeSize))
+      }.collect
+
+      // Verify that the number of elements in each partition
+      // matches the size of the corresponding offset range
+      collected.foreach { case (partSize, rangeSize) =>
+        assert(partSize === rangeSize, "offset ranges are wrong")
+      }
+    }
+    stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
+    ssc.start()
+    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+      assert(allReceived.size === totalSent,
+        "didn't get expected number of messages, messages:\n" +
+          allReceived.asScala.mkString("\n"))
+    }
+    ssc.stop()
+  }
+
+  test("receiving from largest starting offset") {
+    val topic = "largest"
+    val topicPartition = TopicAndPartition(topic, 0)
+    val data = Map("a" -> 10)
+    kafkaTestUtils.createTopic(topic)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "largest"
+    )
+    val kc = new KafkaCluster(kafkaParams)
+    def getLatestOffset(): Long = {
+      kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
+    }
+
+    // Send some initial messages before starting context
+    kafkaTestUtils.sendMessages(topic, data)
+    eventually(timeout(10 seconds), interval(20 milliseconds)) {
+      assert(getLatestOffset() > 3)
+    }
+    val offsetBeforeStart = getLatestOffset()
+
+    // Setup context and kafka stream with largest offset
+    ssc = new StreamingContext(sparkConf, Milliseconds(200))
+    val stream = withClue("Error creating direct stream") {
+      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+        ssc, kafkaParams, Set(topic))
+    }
+    assert(
+      stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
+        .fromOffsets(topicPartition) >= offsetBeforeStart,
+      "Start offset not from latest"
+    )
+
+    val collectedData = new ConcurrentLinkedQueue[String]()
+    stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
+    ssc.start()
+    val newData = Map("b" -> 10)
+    kafkaTestUtils.sendMessages(topic, newData)
+    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      assert(collectedData.contains("b"))
+    }
+    assert(!collectedData.contains("a"))
+  }
+
+
+  test("creating stream by offset") {
+    val topic = "offset"
+    val topicPartition = TopicAndPartition(topic, 0)
+    val data = Map("a" -> 10)
+    kafkaTestUtils.createTopic(topic)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "largest"
+    )
+    val kc = new KafkaCluster(kafkaParams)
+    def getLatestOffset(): Long = {
+      kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
+    }
+
+    // Send some initial messages before starting context
+    kafkaTestUtils.sendMessages(topic, data)
+    eventually(timeout(10 seconds), interval(20 milliseconds)) {
+      assert(getLatestOffset() >= 10)
+    }
+    val offsetBeforeStart = getLatestOffset()
+
+    // Setup context and kafka stream with largest offset
+    ssc = new StreamingContext(sparkConf, Milliseconds(200))
+    val stream = withClue("Error creating direct stream") {
+      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
+        ssc, kafkaParams, Map(topicPartition -> 11L),
+        (m: MessageAndMetadata[String, String]) => m.message())
+    }
+    assert(
+      stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
+        .fromOffsets(topicPartition) >= offsetBeforeStart,
+      "Start offset not from latest"
+    )
+
+    val collectedData = new ConcurrentLinkedQueue[String]()
+    stream.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
+    ssc.start()
+    val newData = Map("b" -> 10)
+    kafkaTestUtils.sendMessages(topic, newData)
+    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      assert(collectedData.contains("b"))
+    }
+    assert(!collectedData.contains("a"))
+  }
+
+  // Test to verify the offset ranges can be recovered from the checkpoints
+  test("offset recovery") {
+    val topic = "recovery"
+    kafkaTestUtils.createTopic(topic)
+    testDir = Utils.createTempDir()
+
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    // Send data to Kafka and wait for it to be received
+    def sendDataAndWaitForReceive(data: Seq[Int]) {
+      val strings = data.map { _.toString}
+      kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains })
+      }
+    }
+
+    // Setup the streaming context
+    ssc = new StreamingContext(sparkConf, Milliseconds(100))
+    val kafkaStream = withClue("Error creating direct stream") {
+      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+        ssc, kafkaParams, Set(topic))
+    }
+    val keyedStream = kafkaStream.map { v => "key" -> v._2.toInt }
+    val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
+      Some(values.sum + state.getOrElse(0))
+    }
+    ssc.checkpoint(testDir.getAbsolutePath)
+
+    // This is to collect the raw data received from Kafka
+    kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
+      val data = rdd.map { _._2 }.collect()
+      DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
+    }
+
+    // This is to ensure that all the data is eventually received exactly once
+    stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
+      rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 }
+    }
+    ssc.start()
+
+    // Send some data and wait for them to be received
+    for (i <- (1 to 10).grouped(4)) {
+      sendDataAndWaitForReceive(i)
+    }
+
+    ssc.stop()
+
+    // Verify that offset ranges were generated
+    // Since "offsetRangesAfterStop" will be used to compare with "recoveredOffsetRanges", we should
+    // collect offset ranges after stopping. Otherwise, because new RDDs keep being generated before
+    // stopping, we may not be able to get the latest RDDs, then "recoveredOffsetRanges" will
+    // contain something not in "offsetRangesAfterStop".
+    val offsetRangesAfterStop = getOffsetRanges(kafkaStream)
+    assert(offsetRangesAfterStop.size >= 1, "No offset ranges generated")
+    assert(
+      offsetRangesAfterStop.head._2.forall { _.fromOffset === 0 },
+      "starting offset not zero"
+    )
+
+    logInfo("====== RESTARTING ========")
+
+    // Recover context from checkpoints
+    ssc = new StreamingContext(testDir.getAbsolutePath)
+    val recoveredStream = ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]
+
+    // Verify offset ranges have been recovered
+    val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
+    assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
+    val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
+    assert(
+      recoveredOffsetRanges.forall { or =>
+        earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
+      },
+      "Recovered ranges are not the same as the ones generated\n" +
+        s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
+        s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
+    )
+    // Restart the context, send more data and verify the total at the end.
+    // If the total is right, that means each record has been received exactly once.
+    ssc.start()
+    sendDataAndWaitForReceive(11 to 20)
+    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      assert(DirectKafkaStreamSuite.total === (1 to 20).sum)
+    }
+    ssc.stop()
+  }
+
+  test("Direct Kafka stream report input information") {
+    val topic = "report-test"
+    val data = Map("a" -> 7, "b" -> 9)
+    kafkaTestUtils.createTopic(topic)
+    kafkaTestUtils.sendMessages(topic, data)
+
+    val totalSent = data.values.sum
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    import DirectKafkaStreamSuite._
+    ssc = new StreamingContext(sparkConf, Milliseconds(200))
+    val collector = new InputInfoCollector
+    ssc.addStreamingListener(collector)
+
+    val stream = withClue("Error creating direct stream") {
+      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+        ssc, kafkaParams, Set(topic))
+    }
+
+    val allReceived = new ConcurrentLinkedQueue[(String, String)]
+
+    stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
+    ssc.start()
+    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+      assert(allReceived.size === totalSent,
+        "didn't get expected number of messages, messages:\n" +
+          allReceived.asScala.mkString("\n"))
+
+      // Verify the record counts collected by the StreamingListener.
+      assert(collector.numRecordsSubmitted.get() === totalSent)
+      assert(collector.numRecordsStarted.get() === totalSent)
+      assert(collector.numRecordsCompleted.get() === totalSent)
+    }
+    ssc.stop()
+  }
+
+  test("maxMessagesPerPartition with backpressure disabled") {
+    val topic = "maxMessagesPerPartition"
+    val kafkaStream = getDirectKafkaStream(topic, None)
+
+    val input = Map(TopicAndPartition(topic, 0) -> 50L, TopicAndPartition(topic, 1) -> 50L)
+    assert(kafkaStream.maxMessagesPerPartition(input).get ==
+      Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
+  }
+
+  test("maxMessagesPerPartition with no lag") {
+    val topic = "maxMessagesPerPartition"
+    val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 100))
+    val kafkaStream = getDirectKafkaStream(topic, rateController)
+
+    val input = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L)
+    assert(kafkaStream.maxMessagesPerPartition(input).isEmpty)
+  }
+
+  test("maxMessagesPerPartition respects max rate") {
+    val topic = "maxMessagesPerPartition"
+    val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 1000))
+    val kafkaStream = getDirectKafkaStream(topic, rateController)
+
+    val input = Map(TopicAndPartition(topic, 0) -> 1000L, TopicAndPartition(topic, 1) -> 1000L)
+    assert(kafkaStream.maxMessagesPerPartition(input).get ==
+      Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
+  }
+
+  test("using rate controller") {
+    val topic = "backpressure"
+    val topicPartitions = Set(TopicAndPartition(topic, 0), TopicAndPartition(topic, 1))
+    kafkaTestUtils.createTopic(topic, 2)
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    val batchIntervalMilliseconds = 100
+    val estimator = new ConstantEstimator(100)
+    val messages = Map("foo" -> 200)
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    val sparkConf = new SparkConf()
+      // A single core is safe here, even for streaming, because the direct API uses no receiver.
+      // Using 1 core also makes the test more predictable.
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+    // Setup the streaming context
+    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+
+    val kafkaStream = withClue("Error creating direct stream") {
+      val kc = new KafkaCluster(kafkaParams)
+      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+      val m = kc.getEarliestLeaderOffsets(topicPartitions)
+        .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+      new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+          ssc, kafkaParams, m, messageHandler) {
+        override protected[streaming] val rateController =
+          Some(new DirectKafkaRateController(id, estimator))
+      }
+    }
+
+    val collectedData = new ConcurrentLinkedQueue[Array[String]]()
+
+    // Used for assertion failure messages.
+    def dataToString: String =
+      collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
+
+    // This is to collect the raw data received from Kafka
+    kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
+      val data = rdd.map { _._2 }.collect()
+      collectedData.add(data)
+    }
+
+    ssc.start()
+
+    // Try different rate limits.
+    // Wait for arrays of data to appear matching the rate.
+    Seq(100, 50, 20).foreach { rate =>
+      collectedData.clear()       // Empty this buffer on each pass.
+      estimator.updateRate(rate)  // Set a new rate.
+      // Expect blocks of data equal to "rate", scaled by the interval length in secs.
+      val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
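+      // e.g. a rate of 50 msgs/sec with a 100 ms batch interval gives 50 * 0.1 = 5 records per batch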
+      eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
+        // Assert that rate estimator values are used to determine maxMessagesPerPartition.
+        // Funky "-" in message makes the complete assertion message read better.
+        assert(collectedData.asScala.exists(_.size == expectedSize),
+          s" - No arrays of size $expectedSize for rate $rate found in $dataToString")
+      }
+    }
+
+    ssc.stop()
+  }
+
+  /** Get the generated offset ranges from the DirectKafkaStream */
+  private def getOffsetRanges[K, V](
+      kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {
+    kafkaStream.generatedRDDs.mapValues { rdd =>
+      rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges
+    }.toSeq.sortBy { _._1 }
+  }
+
+  private def getDirectKafkaStream(topic: String, mockRateController: Option[RateController]) = {
+    val batchIntervalMilliseconds = 100
+
+    val sparkConf = new SparkConf()
+      .setMaster("local[1]")
+      .setAppName(this.getClass.getSimpleName)
+      .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+    // Setup the streaming context
+    ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+
+    val earliestOffsets = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L)
+    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+    new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+      ssc, Map[String, String](), earliestOffsets, messageHandler) {
+      override protected[streaming] val rateController = mockRateController
+    }
+  }
+}
+
+object DirectKafkaStreamSuite {
+  val collectedData = new ConcurrentLinkedQueue[String]()
+  @volatile var total = -1L
+
+  class InputInfoCollector extends StreamingListener {
+    val numRecordsSubmitted = new AtomicLong(0L)
+    val numRecordsStarted = new AtomicLong(0L)
+    val numRecordsCompleted = new AtomicLong(0L)
+
+    override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
+      numRecordsSubmitted.addAndGet(batchSubmitted.batchInfo.numRecords)
+    }
+
+    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
+      numRecordsStarted.addAndGet(batchStarted.batchInfo.numRecords)
+    }
+
+    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+      numRecordsCompleted.addAndGet(batchCompleted.batchInfo.numRecords)
+    }
+  }
+}
+
+private[streaming] class ConstantEstimator(@volatile private var rate: Long)
+  extends RateEstimator {
+
+  def updateRate(newRate: Long): Unit = {
+    rate = newRate
+  }
+
+  def compute(
+      time: Long,
+      elements: Long,
+      processingDelay: Long,
+      schedulingDelay: Long): Option[Double] = Some(rate)
+}
+
+private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long)
+  extends RateController(id, estimator) {
+  override def publish(rate: Long): Unit = ()
+  override def getLatestRate(): Long = rate
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
new file mode 100644
index 0000000..d66830c
--- /dev/null
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import kafka.common.TopicAndPartition
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.SparkFunSuite
+
+class KafkaClusterSuite extends SparkFunSuite with BeforeAndAfterAll {
+  private val topic = "kcsuitetopic" + Random.nextInt(10000)
+  private val topicAndPartition = TopicAndPartition(topic, 0)
+  private var kc: KafkaCluster = null
+
+  private var kafkaTestUtils: KafkaTestUtils = _
+
+  override def beforeAll() {
+    kafkaTestUtils = new KafkaTestUtils
+    kafkaTestUtils.setup()
+
+    kafkaTestUtils.createTopic(topic)
+    kafkaTestUtils.sendMessages(topic, Map("a" -> 1))
+    kc = new KafkaCluster(Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress))
+  }
+
+  override def afterAll() {
+    if (kafkaTestUtils != null) {
+      kafkaTestUtils.teardown()
+      kafkaTestUtils = null
+    }
+  }
+
+  test("metadata apis") {
+    val leader = kc.findLeaders(Set(topicAndPartition)).right.get(topicAndPartition)
+    val leaderAddress = s"${leader._1}:${leader._2}"
+    assert(leaderAddress === kafkaTestUtils.brokerAddress, "didn't get leader")
+
+    val parts = kc.getPartitions(Set(topic)).right.get
+    assert(parts(topicAndPartition), "didn't get partitions")
+
+    val err = kc.getPartitions(Set(topic + "BAD"))
+    assert(err.isLeft, "getPartitions for a nonexistent topic should be an error")
+  }
+
+  test("leader offset apis") {
+    val earliest = kc.getEarliestLeaderOffsets(Set(topicAndPartition)).right.get
+    assert(earliest(topicAndPartition).offset === 0, "didn't get earliest")
+
+    val latest = kc.getLatestLeaderOffsets(Set(topicAndPartition)).right.get
+    assert(latest(topicAndPartition).offset === 1, "didn't get latest")
+  }
+
+  test("consumer offset apis") {
+    val group = "kcsuitegroup" + Random.nextInt(10000)
+
+    val offset = Random.nextInt(10000)
+
+    val set = kc.setConsumerOffsets(group, Map(topicAndPartition -> offset))
+    assert(set.isRight, "didn't set consumer offsets")
+
+    val get = kc.getConsumerOffsets(group, Set(topicAndPartition)).right.get
+    assert(get(topicAndPartition) === offset, "didn't get consumer offsets")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
new file mode 100644
index 0000000..5e539c1
--- /dev/null
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.util.Random
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.StringDecoder
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark._
+
+class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
+
+  private var kafkaTestUtils: KafkaTestUtils = _
+
+  private val sparkConf = new SparkConf().setMaster("local[4]")
+    .setAppName(this.getClass.getSimpleName)
+  private var sc: SparkContext = _
+
+  override def beforeAll {
+    sc = new SparkContext(sparkConf)
+    kafkaTestUtils = new KafkaTestUtils
+    kafkaTestUtils.setup()
+  }
+
+  override def afterAll {
+    if (sc != null) {
+      sc.stop
+      sc = null
+    }
+
+    if (kafkaTestUtils != null) {
+      kafkaTestUtils.teardown()
+      kafkaTestUtils = null
+    }
+  }
+
+  test("basic usage") {
+    val topic = s"topicbasic-${Random.nextInt}"
+    kafkaTestUtils.createTopic(topic)
+    val messages = Array("the", "quick", "brown", "fox")
+    kafkaTestUtils.sendMessages(topic, messages)
+
+    val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "group.id" -> s"test-consumer-${Random.nextInt}")
+
+    val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
+
+    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
+      sc, kafkaParams, offsetRanges)
+
+    val received = rdd.map(_._2).collect.toSet
+    assert(received === messages.toSet)
+
+    // size-related method optimizations return sane results
+    assert(rdd.count === messages.size)
+    assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
+    assert(!rdd.isEmpty)
+    assert(rdd.take(1).size === 1)
+    assert(rdd.take(1).head._2 === messages.head)
+    assert(rdd.take(messages.size + 10).size === messages.size)
+
+    val emptyRdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
+      sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)))
+
+    assert(emptyRdd.isEmpty)
+
+    // invalid offset ranges throw exceptions
+    val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1))
+    intercept[SparkException] {
+      KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
+        sc, kafkaParams, badRanges)
+    }
+  }
+
+  test("iterator boundary conditions") {
+    // The idea is to find e.g. off-by-one errors between what Kafka has available and the RDD
+    val topic = s"topicboundary-${Random.nextInt}"
+    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+    kafkaTestUtils.createTopic(topic)
+
+    val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "group.id" -> s"test-consumer-${Random.nextInt}")
+
+    val kc = new KafkaCluster(kafkaParams)
+
+    // this is the "lots of messages" case
+    kafkaTestUtils.sendMessages(topic, sent)
+    val sentCount = sent.values.sum
+
+    // RDD defined from leader offsets after sending messages; it should get the number sent
+    val rdd = getRdd(kc, Set(topic))
+
+    assert(rdd.isDefined)
+
+    val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges
+    val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum
+
+    assert(rangeCount === sentCount, "offset range didn't include all sent messages")
+    assert(rdd.get.count === sentCount, "didn't get all sent messages")
+
+    val rangesMap = ranges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap
+
+    // make sure consumer offsets are committed before the next getRdd call
+    kc.setConsumerOffsets(kafkaParams("group.id"), rangesMap).fold(
+      err => throw new Exception(err.mkString("\n")),
+      _ => ()
+    )
+
+    // this is the "0 messages" case
+    val rdd2 = getRdd(kc, Set(topic))
+    // shouldn't get anything, since message is sent after rdd was defined
+    val sentOnlyOne = Map("d" -> 1)
+
+    kafkaTestUtils.sendMessages(topic, sentOnlyOne)
+
+    assert(rdd2.isDefined)
+    assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
+
+    // this is the "exactly 1 message" case, namely the single message from sentOnlyOne above
+    val rdd3 = getRdd(kc, Set(topic))
+    // send lots of messages after rdd was defined, they shouldn't show up
+    kafkaTestUtils.sendMessages(topic, Map("extra" -> 22))
+
+    assert(rdd3.isDefined)
+    assert(rdd3.get.count === sentOnlyOne.values.sum, "didn't get exactly one message")
+
+  }
+
+  // Get an RDD covering the range from the committed consumer offsets to the latest leader offsets.
+  private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
+    val groupId = kc.kafkaParams("group.id")
+    def consumerOffsets(topicPartitions: Set[TopicAndPartition]) = {
+      kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
+        kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs =>
+          offs.map(kv => kv._1 -> kv._2.offset)
+        }
+      )
+    }
+    kc.getPartitions(topics).right.toOption.flatMap { topicPartitions =>
+      consumerOffsets(topicPartitions).flatMap { from =>
+        kc.getLatestLeaderOffsets(topicPartitions).right.toOption.map { until =>
+          val offsetRanges = from.map { case (tp: TopicAndPartition, fromOffset: Long) =>
+              OffsetRange(tp.topic, tp.partition, fromOffset, until(tp).offset)
+          }.toArray
+
+          val leaders = until.map { case (tp: TopicAndPartition, lo: KafkaCluster.LeaderOffset) =>
+              tp -> Broker(lo.host, lo.port)
+          }.toMap
+
+          KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
+            sc, kc.kafkaParams, offsetRanges, leaders,
+            (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} ${mmd.message}")
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
new file mode 100644
index 0000000..6a35ac1
--- /dev/null
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
+
+import kafka.serializer.StringDecoder
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
+class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterAll {
+  private var ssc: StreamingContext = _
+  private var kafkaTestUtils: KafkaTestUtils = _
+
+  override def beforeAll(): Unit = {
+    kafkaTestUtils = new KafkaTestUtils
+    kafkaTestUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+
+    if (kafkaTestUtils != null) {
+      kafkaTestUtils.teardown()
+      kafkaTestUtils = null
+    }
+  }
+
+  test("Kafka input stream") {
+    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+    val topic = "topic1"
+    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+    kafkaTestUtils.createTopic(topic)
+    kafkaTestUtils.sendMessages(topic, sent)
+
+    val kafkaParams = Map("zookeeper.connect" -> kafkaTestUtils.zkAddress,
+      "group.id" -> s"test-consumer-${Random.nextInt(10000)}",
+      "auto.offset.reset" -> "smallest")
+
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+      ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
+    val result = new mutable.HashMap[String, Long]()
+    stream.map(_._2).countByValue().foreachRDD { r =>
+      r.collect().foreach { kv =>
+        result.synchronized {
+          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+          result.put(kv._1, count)
+        }
+      }
+    }
+
+    ssc.start()
+
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      assert(result.synchronized { sent === result })
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
new file mode 100644
index 0000000..7b9aee3
--- /dev/null
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
+
+import kafka.serializer.StringDecoder
+import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+import org.apache.spark.util.Utils
+
+class ReliableKafkaStreamSuite extends SparkFunSuite
+    with BeforeAndAfterAll with BeforeAndAfter with Eventually {
+
+  private val sparkConf = new SparkConf()
+    .setMaster("local[4]")
+    .setAppName(this.getClass.getSimpleName)
+    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
+  private val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
+
+  private var kafkaTestUtils: KafkaTestUtils = _
+
+  private var groupId: String = _
+  private var kafkaParams: Map[String, String] = _
+  private var ssc: StreamingContext = _
+  private var tempDirectory: File = null
+
+  override def beforeAll(): Unit = {
+    kafkaTestUtils = new KafkaTestUtils
+    kafkaTestUtils.setup()
+
+    groupId = s"test-consumer-${Random.nextInt(10000)}"
+    kafkaParams = Map(
+      "zookeeper.connect" -> kafkaTestUtils.zkAddress,
+      "group.id" -> groupId,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    tempDirectory = Utils.createTempDir()
+  }
+
+  override def afterAll(): Unit = {
+    Utils.deleteRecursively(tempDirectory)
+
+    if (kafkaTestUtils != null) {
+      kafkaTestUtils.teardown()
+      kafkaTestUtils = null
+    }
+  }
+
+  before {
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+    ssc.checkpoint(tempDirectory.getAbsolutePath)
+  }
+
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+  }
+
+  test("Reliable Kafka input stream with single topic") {
+    val topic = "test-topic"
+    kafkaTestUtils.createTopic(topic)
+    kafkaTestUtils.sendMessages(topic, data)
+
+    // Verify that no offset has been committed for this group/topic/partition before starting.
+    assert(getCommitOffset(groupId, topic, 0) === None)
+
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+      ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
+    val result = new mutable.HashMap[String, Long]()
+    stream.map { case (k, v) => v }.foreachRDD { r =>
+        val ret = r.collect()
+        ret.foreach { v =>
+          val count = result.getOrElseUpdate(v, 0) + 1
+          result.put(v, count)
+        }
+      }
+    ssc.start()
+
+    eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
+      // A basic verification of ReliableKafkaReceiver's behavior.
+      // Verify that the number of received keys equals the number of keys sent.
+      assert(data.size === result.size)
+      // Verify that the received count for each key matches the count that was sent.
+      data.keys.foreach { k => assert(data(k) === result(k).toInt) }
+      // Verify that the committed offset has advanced to cover all of the sent messages.
+      assert(getCommitOffset(groupId, topic, 0) === Some(29L))
+    }
+  }
+
+  test("Reliable Kafka input stream with multiple topics") {
+    val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
+    topics.foreach { case (t, _) =>
+      kafkaTestUtils.createTopic(t)
+      kafkaTestUtils.sendMessages(t, data)
+    }
+
+    // Before starting, verify that no offsets have been committed for any group/topic/partition.
+    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === None) }
+
+    // Consume all the data sent to the broker, which will potentially commit the offsets internally.
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+      ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY)
+    stream.foreachRDD(_ => ())
+    ssc.start()
+
+    eventually(timeout(20000 milliseconds), interval(100 milliseconds)) {
+      // Verify that the committed offset for each group/topic equals the expected value.
+      topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === Some(29L)) }
+    }
+  }
+
+
+  /** Get the committed partition offset from ZooKeeper. */
+  private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = {
+    val topicDirs = new ZKGroupTopicDirs(groupId, topic)
+    val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
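+    // e.g. "/consumers/<groupId>/offsets/<topic>/<partition>" in the Kafka 0.8 ZooKeeper layout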
+    ZkUtils.readDataMaybeNull(kafkaTestUtils.zookeeperClient, zkPath)._1.map(_.toLong)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml
deleted file mode 100644
index e1b5a7e..0000000
--- a/external/kafka-assembly/pom.xml
+++ /dev/null
@@ -1,176 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <groupId>org.apache.spark</groupId>
-  <artifactId>spark-streaming-kafka-assembly_2.11</artifactId>
-  <packaging>jar</packaging>
-  <name>Spark Project External Kafka Assembly</name>
-  <url>http://spark.apache.org/</url>
-
-  <properties>
-    <sbt.project.name>streaming-kafka-assembly</sbt.project.name>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <!--
-      Demote already included in the Spark assembly.
-    -->
-    <dependency>
-      <groupId>commons-codec</groupId>
-      <artifactId>commons-codec</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>net.jpountz.lz4</groupId>
-      <artifactId>lz4</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-mapred</artifactId>
-      <classifier>${avro.mapred.classifier}</classifier>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.curator</groupId>
-      <artifactId>curator-recipes</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>net.java.dev.jets3t</groupId>
-      <artifactId>jets3t</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.xerial.snappy</groupId>
-      <artifactId>snappy-java</artifactId>
-      <scope>provided</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-  <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
-  <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-  <plugins>
-    <plugin>
-      <groupId>org.apache.maven.plugins</groupId>
-      <artifactId>maven-shade-plugin</artifactId>
-      <configuration>
-        <shadedArtifactAttached>false</shadedArtifactAttached>
-        <artifactSet>
-          <includes>
-            <include>*:*</include>
-          </includes>
-        </artifactSet>
-        <filters>
-          <filter>
-            <artifact>*:*</artifact>
-            <excludes>
-              <exclude>META-INF/*.SF</exclude>
-              <exclude>META-INF/*.DSA</exclude>
-              <exclude>META-INF/*.RSA</exclude>
-            </excludes>
-          </filter>
-        </filters>
-      </configuration>
-      <executions>
-        <execution>
-          <phase>package</phase>
-          <goals>
-            <goal>shade</goal>
-          </goals>
-          <configuration>
-            <transformers>
-              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
-              <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
-                <resource>reference.conf</resource>
-              </transformer>
-              <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
-                <resource>log4j.properties</resource>
-              </transformer>
-              <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
-              <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
-            </transformers>
-          </configuration>
-        </execution>
-      </executions>
-    </plugin>
-  </plugins>
-</build>
-</project>
-

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
deleted file mode 100644
index 68d52e9..0000000
--- a/external/kafka/pom.xml
+++ /dev/null
@@ -1,98 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <groupId>org.apache.spark</groupId>
-  <artifactId>spark-streaming-kafka_2.11</artifactId>
-  <properties>
-    <sbt.project.name>streaming-kafka</sbt.project.name>
-  </properties>
-  <packaging>jar</packaging>
-  <name>Spark Project External Kafka</name>
-  <url>http://spark.apache.org/</url>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>0.8.2.1</version>
-      <exclusions>
-        <exclusion>
-          <groupId>com.sun.jmx</groupId>
-          <artifactId>jmxri</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.sun.jdmk</groupId>
-          <artifactId>jmxtools</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>net.sf.jopt-simple</groupId>
-          <artifactId>jopt-simple</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-simple</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.zookeeper</groupId>
-          <artifactId>zookeeper</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>net.sf.jopt-simple</groupId>
-      <artifactId>jopt-simple</artifactId>
-      <version>3.2</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.scalacheck</groupId>
-      <artifactId>scalacheck_${scala.binary.version}</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
-    </dependency>
-  </dependencies>
-  <build>
-    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
-    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-  </build>
-</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
deleted file mode 100644
index 9159051..0000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import org.apache.spark.annotation.Experimental
-
-/**
- * Represents the host and port info for a Kafka broker.
- * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID.
- */
-final class Broker private(
-    /** Broker's hostname */
-    val host: String,
-    /** Broker's port */
-    val port: Int) extends Serializable {
-  override def equals(obj: Any): Boolean = obj match {
-    case that: Broker =>
-      this.host == that.host &&
-      this.port == that.port
-    case _ => false
-  }
-
-  override def hashCode: Int = {
-    41 * (41 + host.hashCode) + port
-  }
-
-  override def toString(): String = {
-    s"Broker($host, $port)"
-  }
-}
-
-/**
- * :: Experimental ::
- * Companion object that provides methods to create instances of [[Broker]].
- */
-@Experimental
-object Broker {
-  def create(host: String, port: Int): Broker =
-    new Broker(host, port)
-
-  def apply(host: String, port: Int): Broker =
-    new Broker(host, port)
-
-  def unapply(broker: Broker): Option[(String, Int)] = {
-    if (broker == null) {
-      None
-    } else {
-      Some((broker.host, broker.port))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
deleted file mode 100644
index fb58ed7..0000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.annotation.tailrec
-import scala.collection.mutable
-import scala.reflect.ClassTag
-
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
-import kafka.serializer.Decoder
-
-import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
-import org.apache.spark.streaming.{StreamingContext, Time}
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
-import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
-import org.apache.spark.streaming.scheduler.rate.RateEstimator
-
-/**
- * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
- * each given Kafka topic/partition corresponds to an RDD partition.
- * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
- * of messages per second that each '''partition''' will accept.
- * Starting offsets are specified in advance,
- * and this DStream is not responsible for committing offsets,
- * so that you can control exactly-once semantics.
- * For an easy interface to Kafka-managed offsets,
- * see {@link org.apache.spark.streaming.kafka.KafkaCluster}.
- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
- *  starting point of the stream
- * @param messageHandler function for translating each message into the desired type
- */
-private[streaming]
-class DirectKafkaInputDStream[
-  K: ClassTag,
-  V: ClassTag,
-  U <: Decoder[K]: ClassTag,
-  T <: Decoder[V]: ClassTag,
-  R: ClassTag](
-    _ssc: StreamingContext,
-    val kafkaParams: Map[String, String],
-    val fromOffsets: Map[TopicAndPartition, Long],
-    messageHandler: MessageAndMetadata[K, V] => R
-  ) extends InputDStream[R](_ssc) with Logging {
-  val maxRetries = context.sparkContext.getConf.getInt(
-    "spark.streaming.kafka.maxRetries", 1)
-
-  // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
-  private[streaming] override def name: String = s"Kafka direct stream [$id]"
-
-  protected[streaming] override val checkpointData =
-    new DirectKafkaInputDStreamCheckpointData
-
-
-  /**
-   * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
-   */
-  override protected[streaming] val rateController: Option[RateController] = {
-    if (RateController.isBackPressureEnabled(ssc.conf)) {
-      Some(new DirectKafkaRateController(id,
-        RateEstimator.create(ssc.conf, context.graph.batchDuration)))
-    } else {
-      None
-    }
-  }
-
-  protected val kc = new KafkaCluster(kafkaParams)
-
-  private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
-      "spark.streaming.kafka.maxRatePerPartition", 0)
-
-  protected[streaming] def maxMessagesPerPartition(
-      offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
-
-    // calculate a per-partition rate limit based on current lag
-    val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
-      case Some(rate) =>
-        val lagPerPartition = offsets.map { case (tp, offset) =>
-          tp -> Math.max(offset - currentOffsets(tp), 0)
-        }
-        val totalLag = lagPerPartition.values.sum
-
-        lagPerPartition.map { case (tp, lag) =>
-          val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
-          tp -> (if (maxRateLimitPerPartition > 0) {
-            Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
-        }
-      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
-    }
-
-    if (effectiveRateLimitPerPartition.values.sum > 0) {
-      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
-      Some(effectiveRateLimitPerPartition.map {
-        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
-      })
-    } else {
-      None
-    }
-  }
-
-  protected var currentOffsets = fromOffsets
-
-  @tailrec
-  protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
-    val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
-    // Either.fold would confuse @tailrec, do it manually
-    if (o.isLeft) {
-      val err = o.left.get.toString
-      if (retries <= 0) {
-        throw new SparkException(err)
-      } else {
-        log.error(err)
-        Thread.sleep(kc.config.refreshLeaderBackoffMs)
-        latestLeaderOffsets(retries - 1)
-      }
-    } else {
-      o.right.get
-    }
-  }
-
-  // limits the maximum number of messages per partition
-  protected def clamp(
-    leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
-    val offsets = leaderOffsets.mapValues(lo => lo.offset)
-
-    maxMessagesPerPartition(offsets).map { mmp =>
-      mmp.map { case (tp, messages) =>
-        val lo = leaderOffsets(tp)
-        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, lo.offset))
-      }
-    }.getOrElse(leaderOffsets)
-  }
-
-  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
-    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
-    val rdd = KafkaRDD[K, V, U, T, R](
-      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
-
-    // Report the record number and metadata of this batch interval to InputInfoTracker.
-    val offsetRanges = currentOffsets.map { case (tp, fo) =>
-      val uo = untilOffsets(tp)
-      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
-    }
-    val description = offsetRanges.filter { offsetRange =>
-      // Don't display empty ranges.
-      offsetRange.fromOffset != offsetRange.untilOffset
-    }.map { offsetRange =>
-      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
-        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
-    }.mkString("\n")
-    // Copy offsetRanges to immutable.List to prevent from being modified by the user
-    val metadata = Map(
-      "offsets" -> offsetRanges.toList,
-      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
-    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
-    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
-
-    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
-    Some(rdd)
-  }
-
-  override def start(): Unit = {
-  }
-
-  def stop(): Unit = {
-  }
-
-  private[streaming]
-  class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
-    def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
-      data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
-    }
-
-    override def update(time: Time) {
-      batchForTime.clear()
-      generatedRDDs.foreach { kv =>
-        val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
-        batchForTime += kv._1 -> a
-      }
-    }
-
-    override def cleanup(time: Time) { }
-
-    override def restore() {
-      // this is assuming that the topics don't change during execution, which is true currently
-      val topics = fromOffsets.keySet
-      val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
-
-      batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
-         logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
-         generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
-           context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
-      }
-    }
-  }
-
-  /**
-   * A RateController to retrieve the rate from RateEstimator.
-   */
-  private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator)
-    extends RateController(id, estimator) {
-    override def publish(rate: Long): Unit = ()
-  }
-}
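
DirectKafkaInputDStream is private[streaming]; applications reach it through KafkaUtils.createDirectStream elsewhere in this module. A hedged sketch of that usage, assuming that entry point and using placeholder broker, topic, and offset values:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("DirectKafkaSketch")
      // Optional per-partition cap; clamp() above combines it with the backpressure estimate.
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Offsets are supplied up front and never committed by the stream itself,
    // which leaves exactly-once bookkeeping under the caller's control.
    val kafkaParams = Map("metadata.broker.list" -> "broker-1:9092,broker-2:9092")
    val fromOffsets = Map(TopicAndPartition("events", 0) -> 0L)

    val stream = KafkaUtils.createDirectStream[
        String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

    stream.count().print()
    ssc.start()
    ssc.awaitTermination()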

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
deleted file mode 100644
index 726b5d8..0000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
+++ /dev/null
@@ -1,425 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.util.Properties
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Random
-import scala.util.control.NonFatal
-
-import kafka.api._
-import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
-import kafka.consumer.{ConsumerConfig, SimpleConsumer}
-
-import org.apache.spark.SparkException
-import org.apache.spark.annotation.DeveloperApi
-
-/**
- * :: DeveloperApi ::
- * Convenience methods for interacting with a Kafka cluster.
- * See <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">
- * A Guide To The Kafka Protocol</a> for more details on individual api calls.
- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
- */
-@DeveloperApi
-class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
-  import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}
-
-  // ConsumerConfig isn't serializable
-  @transient private var _config: SimpleConsumerConfig = null
-
-  def config: SimpleConsumerConfig = this.synchronized {
-    if (_config == null) {
-      _config = SimpleConsumerConfig(kafkaParams)
-    }
-    _config
-  }
-
-  def connect(host: String, port: Int): SimpleConsumer =
-    new SimpleConsumer(host, port, config.socketTimeoutMs,
-      config.socketReceiveBufferBytes, config.clientId)
-
-  def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
-    findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
-
-  // Metadata api
-  // scalastyle:off
-  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
-  // scalastyle:on
-
-  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
-    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
-      0, config.clientId, Seq(topic))
-    val errs = new Err
-    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
-      val resp: TopicMetadataResponse = consumer.send(req)
-      resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
-        tm.partitionsMetadata.find(_.partitionId == partition)
-      }.foreach { pm: PartitionMetadata =>
-        pm.leader.foreach { leader =>
-          return Right((leader.host, leader.port))
-        }
-      }
-    }
-    Left(errs)
-  }
-
-  def findLeaders(
-      topicAndPartitions: Set[TopicAndPartition]
-    ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
-    val topics = topicAndPartitions.map(_.topic)
-    val response = getPartitionMetadata(topics).right
-    val answer = response.flatMap { tms: Set[TopicMetadata] =>
-      val leaderMap = tms.flatMap { tm: TopicMetadata =>
-        tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
-          val tp = TopicAndPartition(tm.topic, pm.partitionId)
-          if (topicAndPartitions(tp)) {
-            pm.leader.map { l =>
-              tp -> (l.host -> l.port)
-            }
-          } else {
-            None
-          }
-        }
-      }.toMap
-
-      if (leaderMap.keys.size == topicAndPartitions.size) {
-        Right(leaderMap)
-      } else {
-        val missing = topicAndPartitions.diff(leaderMap.keySet)
-        val err = new Err
-        err.append(new SparkException(s"Couldn't find leaders for ${missing}"))
-        Left(err)
-      }
-    }
-    answer
-  }
-
-  def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] = {
-    getPartitionMetadata(topics).right.map { r =>
-      r.flatMap { tm: TopicMetadata =>
-        tm.partitionsMetadata.map { pm: PartitionMetadata =>
-          TopicAndPartition(tm.topic, pm.partitionId)
-        }
-      }
-    }
-  }
-
-  def getPartitionMetadata(topics: Set[String]): Either[Err, Set[TopicMetadata]] = {
-    val req = TopicMetadataRequest(
-      TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq)
-    val errs = new Err
-    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
-      val resp: TopicMetadataResponse = consumer.send(req)
-      val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError)
-
-      if (respErrs.isEmpty) {
-        return Right(resp.topicsMetadata.toSet)
-      } else {
-        respErrs.foreach { m =>
-          val cause = ErrorMapping.exceptionFor(m.errorCode)
-          val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?"
-          errs.append(new SparkException(msg, cause))
-        }
-      }
-    }
-    Left(errs)
-  }
-
-  // Leader offset api
-  // scalastyle:off
-  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
-  // scalastyle:on
-
-  def getLatestLeaderOffsets(
-      topicAndPartitions: Set[TopicAndPartition]
-    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
-    getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)
-
-  def getEarliestLeaderOffsets(
-      topicAndPartitions: Set[TopicAndPartition]
-    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
-    getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime)
-
-  def getLeaderOffsets(
-      topicAndPartitions: Set[TopicAndPartition],
-      before: Long
-    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
-    getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
-      r.map { kv =>
-        // mapValues isn't serializable, see SI-7005
-        kv._1 -> kv._2.head
-      }
-    }
-  }
-
-  private def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
-    m.groupBy(_._2).map { kv =>
-      kv._1 -> kv._2.keys.toSeq
-    }
-
-  def getLeaderOffsets(
-      topicAndPartitions: Set[TopicAndPartition],
-      before: Long,
-      maxNumOffsets: Int
-    ): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
-    findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>
-      val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
-      val leaders = leaderToTp.keys
-      var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
-      val errs = new Err
-      withBrokers(leaders, errs) { consumer =>
-        val partitionsToGetOffsets: Seq[TopicAndPartition] =
-          leaderToTp((consumer.host, consumer.port))
-        val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
-          tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
-        }.toMap
-        val req = OffsetRequest(reqMap)
-        val resp = consumer.getOffsetsBefore(req)
-        val respMap = resp.partitionErrorAndOffsets
-        partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
-          respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
-            if (por.error == ErrorMapping.NoError) {
-              if (por.offsets.nonEmpty) {
-                result += tp -> por.offsets.map { off =>
-                  LeaderOffset(consumer.host, consumer.port, off)
-                }
-              } else {
-                errs.append(new SparkException(
-                  s"Empty offsets for ${tp}, is ${before} before log beginning?"))
-              }
-            } else {
-              errs.append(ErrorMapping.exceptionFor(por.error))
-            }
-          }
-        }
-        if (result.keys.size == topicAndPartitions.size) {
-          return Right(result)
-        }
-      }
-      val missing = topicAndPartitions.diff(result.keySet)
-      errs.append(new SparkException(s"Couldn't find leader offsets for ${missing}"))
-      Left(errs)
-    }
-  }
-
-  // Consumer offset api
-  // scalastyle:off
-  // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
-  // scalastyle:on
-
-  // This 0 indicates the API version; in this case, the original ZooKeeper-backed API.
-  private def defaultConsumerApiVersion: Short = 0
-
-  /** Requires Kafka >= 0.8.1.1.  Defaults to the original ZooKeeper backed api version. */
-  def getConsumerOffsets(
-      groupId: String,
-      topicAndPartitions: Set[TopicAndPartition]
-    ): Either[Err, Map[TopicAndPartition, Long]] =
-    getConsumerOffsets(groupId, topicAndPartitions, defaultConsumerApiVersion)
-
-  def getConsumerOffsets(
-      groupId: String,
-      topicAndPartitions: Set[TopicAndPartition],
-      consumerApiVersion: Short
-    ): Either[Err, Map[TopicAndPartition, Long]] = {
-    getConsumerOffsetMetadata(groupId, topicAndPartitions, consumerApiVersion).right.map { r =>
-      r.map { kv =>
-        kv._1 -> kv._2.offset
-      }
-    }
-  }
-
-  /** Requires Kafka >= 0.8.1.1.  Defaults to the original ZooKeeper backed api version. */
-  def getConsumerOffsetMetadata(
-      groupId: String,
-      topicAndPartitions: Set[TopicAndPartition]
-    ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] =
-    getConsumerOffsetMetadata(groupId, topicAndPartitions, defaultConsumerApiVersion)
-
-  def getConsumerOffsetMetadata(
-      groupId: String,
-      topicAndPartitions: Set[TopicAndPartition],
-      consumerApiVersion: Short
-    ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
-    var result = Map[TopicAndPartition, OffsetMetadataAndError]()
-    val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, consumerApiVersion)
-    val errs = new Err
-    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
-      val resp = consumer.fetchOffsets(req)
-      val respMap = resp.requestInfo
-      val needed = topicAndPartitions.diff(result.keySet)
-      needed.foreach { tp: TopicAndPartition =>
-        respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
-          if (ome.error == ErrorMapping.NoError) {
-            result += tp -> ome
-          } else {
-            errs.append(ErrorMapping.exceptionFor(ome.error))
-          }
-        }
-      }
-      if (result.keys.size == topicAndPartitions.size) {
-        return Right(result)
-      }
-    }
-    val missing = topicAndPartitions.diff(result.keySet)
-    errs.append(new SparkException(s"Couldn't find consumer offsets for ${missing}"))
-    Left(errs)
-  }
-
-  /** Requires Kafka >= 0.8.1.1.  Defaults to the original ZooKeeper backed api version. */
-  def setConsumerOffsets(
-      groupId: String,
-      offsets: Map[TopicAndPartition, Long]
-    ): Either[Err, Map[TopicAndPartition, Short]] =
-    setConsumerOffsets(groupId, offsets, defaultConsumerApiVersion)
-
-  def setConsumerOffsets(
-      groupId: String,
-      offsets: Map[TopicAndPartition, Long],
-      consumerApiVersion: Short
-    ): Either[Err, Map[TopicAndPartition, Short]] = {
-    val meta = offsets.map { kv =>
-      kv._1 -> OffsetAndMetadata(kv._2)
-    }
-    setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
-  }
-
-  /** Requires Kafka >= 0.8.1.1.  Defaults to the original ZooKeeper backed api version. */
-  def setConsumerOffsetMetadata(
-      groupId: String,
-      metadata: Map[TopicAndPartition, OffsetAndMetadata]
-    ): Either[Err, Map[TopicAndPartition, Short]] =
-    setConsumerOffsetMetadata(groupId, metadata, defaultConsumerApiVersion)
-
-  def setConsumerOffsetMetadata(
-      groupId: String,
-      metadata: Map[TopicAndPartition, OffsetAndMetadata],
-      consumerApiVersion: Short
-    ): Either[Err, Map[TopicAndPartition, Short]] = {
-    var result = Map[TopicAndPartition, Short]()
-    val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
-    val errs = new Err
-    val topicAndPartitions = metadata.keySet
-    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
-      val resp = consumer.commitOffsets(req)
-      val respMap = resp.commitStatus
-      val needed = topicAndPartitions.diff(result.keySet)
-      needed.foreach { tp: TopicAndPartition =>
-        respMap.get(tp).foreach { err: Short =>
-          if (err == ErrorMapping.NoError) {
-            result += tp -> err
-          } else {
-            errs.append(ErrorMapping.exceptionFor(err))
-          }
-        }
-      }
-      if (result.keys.size == topicAndPartitions.size) {
-        return Right(result)
-      }
-    }
-    val missing = topicAndPartitions.diff(result.keySet)
-    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
-    Left(errs)
-  }
-
-  // Try a call against potentially multiple brokers, accumulating errors
-  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
-    (fn: SimpleConsumer => Any): Unit = {
-    brokers.foreach { hp =>
-      var consumer: SimpleConsumer = null
-      try {
-        consumer = connect(hp._1, hp._2)
-        fn(consumer)
-      } catch {
-        case NonFatal(e) =>
-          errs.append(e)
-      } finally {
-        if (consumer != null) {
-          consumer.close()
-        }
-      }
-    }
-  }
-}
-
-@DeveloperApi
-object KafkaCluster {
-  type Err = ArrayBuffer[Throwable]
-
-  /** If the result is right, return it, otherwise throw SparkException */
-  def checkErrors[T](result: Either[Err, T]): T = {
-    result.fold(
-      errs => throw new SparkException(errs.mkString("\n")),
-      ok => ok
-    )
-  }
-
-  case class LeaderOffset(host: String, port: Int, offset: Long)
-
-  /**
-   * High-level kafka consumers connect to ZK.  ConsumerConfig assumes this use case.
-   * Simple consumers connect directly to brokers, but need many of the same configs.
-   * This subclass won't warn about missing ZK params, or presence of broker params.
-   */
-  class SimpleConsumerConfig private(brokers: String, originalProps: Properties)
-      extends ConsumerConfig(originalProps) {
-    val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
-      val hpa = hp.split(":")
-      if (hpa.size == 1) {
-        throw new SparkException(s"Broker not in the correct format of <host>:<port> [$brokers]")
-      }
-      (hpa(0), hpa(1).toInt)
-    }
-  }
-
-  object SimpleConsumerConfig {
-    /**
-     * Make a consumer config without requiring group.id or zookeeper.connect,
-     * since communicating with brokers also needs common settings such as timeout
-     */
-    def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
-      // These keys are from other pre-existing kafka configs for specifying brokers, accept either
-      val brokers = kafkaParams.get("metadata.broker.list")
-        .orElse(kafkaParams.get("bootstrap.servers"))
-        .getOrElse(throw new SparkException(
-          "Must specify metadata.broker.list or bootstrap.servers"))
-
-      val props = new Properties()
-      kafkaParams.foreach { case (key, value) =>
-        // prevent warnings on parameters ConsumerConfig doesn't know about
-        if (key != "metadata.broker.list" && key != "bootstrap.servers") {
-          props.put(key, value)
-        }
-      }
-
-      Seq("zookeeper.connect", "group.id").foreach { s =>
-        if (!props.containsKey(s)) {
-          props.setProperty(s, "")
-        }
-      }
-
-      new SimpleConsumerConfig(brokers, props)
-    }
-  }
-}
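
KafkaCluster is marked @DeveloperApi, so it can be used directly for driver-side offset bookkeeping. A minimal sketch using only methods shown in the diff above; the broker address, topic, and group id are placeholders:

    import kafka.common.TopicAndPartition

    import org.apache.spark.streaming.kafka.KafkaCluster

    val kc = new KafkaCluster(Map("metadata.broker.list" -> "broker-1:9092"))
    val partitions = Set(TopicAndPartition("events", 0), TopicAndPartition("events", 1))

    // Every call returns Either[Err, ...]; checkErrors throws a SparkException on Left.
    val latest = KafkaCluster.checkErrors(kc.getLatestLeaderOffsets(partitions))
    latest.foreach { case (tp, lo) =>
      println(s"$tp: offset ${lo.offset} on leader ${lo.host}:${lo.port}")
    }

    // Record progress in Kafka's offset store (requires Kafka >= 0.8.1.1, per the scaladoc above).
    KafkaCluster.checkErrors(
      kc.setConsumerOffsets("my-group", latest.map { case (tp, lo) => tp -> lo.offset }))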

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
deleted file mode 100644
index 3713bda..0000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.util.Properties
-
-import scala.collection.Map
-import scala.reflect.{classTag, ClassTag}
-
-import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
-import kafka.serializer.Decoder
-import kafka.utils.VerifiableProperties
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.ThreadUtils
-
-/**
- * Input stream that pulls messages from a Kafka Broker.
- *
- * @param kafkaParams Map of kafka configuration parameters.
- *                    See: http://kafka.apache.org/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel RDD storage level.
- */
-private[streaming]
-class KafkaInputDStream[
-  K: ClassTag,
-  V: ClassTag,
-  U <: Decoder[_]: ClassTag,
-  T <: Decoder[_]: ClassTag](
-    _ssc: StreamingContext,
-    kafkaParams: Map[String, String],
-    topics: Map[String, Int],
-    useReliableReceiver: Boolean,
-    storageLevel: StorageLevel
-  ) extends ReceiverInputDStream[(K, V)](_ssc) with Logging {
-
-  def getReceiver(): Receiver[(K, V)] = {
-    if (!useReliableReceiver) {
-      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
-    } else {
-      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
-    }
-  }
-}
-
-private[streaming]
-class KafkaReceiver[
-  K: ClassTag,
-  V: ClassTag,
-  U <: Decoder[_]: ClassTag,
-  T <: Decoder[_]: ClassTag](
-    kafkaParams: Map[String, String],
-    topics: Map[String, Int],
-    storageLevel: StorageLevel
-  ) extends Receiver[(K, V)](storageLevel) with Logging {
-
-  // Connection to Kafka
-  var consumerConnector: ConsumerConnector = null
-
-  def onStop() {
-    if (consumerConnector != null) {
-      consumerConnector.shutdown()
-      consumerConnector = null
-    }
-  }
-
-  def onStart() {
-
-    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
-
-    // Kafka connection properties
-    val props = new Properties()
-    kafkaParams.foreach(param => props.put(param._1, param._2))
-
-    val zkConnect = kafkaParams("zookeeper.connect")
-    // Create the connection to the cluster
-    logInfo("Connecting to Zookeeper: " + zkConnect)
-    val consumerConfig = new ConsumerConfig(props)
-    consumerConnector = Consumer.create(consumerConfig)
-    logInfo("Connected to " + zkConnect)
-
-    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(consumerConfig.props)
-      .asInstanceOf[Decoder[K]]
-    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(consumerConfig.props)
-      .asInstanceOf[Decoder[V]]
-
-    // Create threads for each topic/message stream we are listening to
-    val topicMessageStreams = consumerConnector.createMessageStreams(
-      topics, keyDecoder, valueDecoder)
-
-    val executorPool =
-      ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
-    try {
-      // Start the messages handler for each partition
-      topicMessageStreams.values.foreach { streams =>
-        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
-      }
-    } finally {
-      executorPool.shutdown() // Just causes threads to terminate after work is done
-    }
-  }
-
-  // Handles Kafka messages
-  private class MessageHandler(stream: KafkaStream[K, V])
-    extends Runnable {
-    def run() {
-      logInfo("Starting MessageHandler.")
-      try {
-        val streamIterator = stream.iterator()
-        while (streamIterator.hasNext()) {
-          val msgAndMetadata = streamIterator.next()
-          store((msgAndMetadata.key, msgAndMetadata.message))
-        }
-      } catch {
-        case e: Throwable => reportError("Error handling message; exiting", e)
-      }
-    }
-  }
-}
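
KafkaInputDStream and its receivers are likewise private[streaming] and are normally obtained through KafkaUtils.createStream elsewhere in this module; whether the reliable receiver above is used is driven by the receiver write-ahead-log configuration (not shown here). A hedged sketch of that receiver-based usage with placeholder ZooKeeper, group, and topic settings:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setMaster("local[2]").setAppName("ReceiverKafkaSketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    // topics maps topic name -> number of consumer threads feeding this single receiver.
    val stream = KafkaUtils.createStream(
      ssc,
      "zk-1:2181",          // zookeeper.connect (placeholder)
      "my-consumer-group",  // group.id (placeholder)
      Map("events" -> 2),
      StorageLevel.MEMORY_AND_DISK_SER_2)

    stream.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()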

