spark-commits mailing list archives

From r...@apache.org
Subject [3/5] spark git commit: [SPARK-13843][STREAMING] Remove streaming-flume, streaming-mqtt, streaming-zeromq, streaming-akka, streaming-twitter to Spark packages
Date Mon, 14 Mar 2016 23:56:14 GMT
http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
deleted file mode 100644
index 74bd016..0000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
-import java.net.InetSocketAddress
-import java.nio.ByteBuffer
-import java.util.concurrent.Executors
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.avro.ipc.NettyServer
-import org.apache.avro.ipc.specific.SpecificResponder
-import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol, Status}
-import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
-import org.jboss.netty.handler.codec.compression._
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.Utils
-
-private[streaming]
-class FlumeInputDStream[T: ClassTag](
-  _ssc: StreamingContext,
-  host: String,
-  port: Int,
-  storageLevel: StorageLevel,
-  enableDecompression: Boolean
-) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
-
-  override def getReceiver(): Receiver[SparkFlumeEvent] = {
-    new FlumeReceiver(host, port, storageLevel, enableDecompression)
-  }
-}
-
-/**
- * A wrapper class for AvroFlumeEvents with a custom serialization format.
- *
- * This is necessary because AvroFlumeEvent uses inner data structures
- * which are not serializable.
- */
-class SparkFlumeEvent() extends Externalizable {
-  var event: AvroFlumeEvent = new AvroFlumeEvent()
-
-  /* De-serialize from bytes. */
-  def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
-    val bodyLength = in.readInt()
-    val bodyBuff = new Array[Byte](bodyLength)
-    in.readFully(bodyBuff)
-
-    val numHeaders = in.readInt()
-    val headers = new java.util.HashMap[CharSequence, CharSequence]
-
-    for (i <- 0 until numHeaders) {
-      val keyLength = in.readInt()
-      val keyBuff = new Array[Byte](keyLength)
-      in.readFully(keyBuff)
-      val key: String = Utils.deserialize(keyBuff)
-
-      val valLength = in.readInt()
-      val valBuff = new Array[Byte](valLength)
-      in.readFully(valBuff)
-      val value: String = Utils.deserialize(valBuff)
-
-      headers.put(key, value)
-    }
-
-    event.setBody(ByteBuffer.wrap(bodyBuff))
-    event.setHeaders(headers)
-  }
-
-  /* Serialize to bytes. */
-  def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
-    val body = event.getBody
-    out.writeInt(body.remaining())
-    Utils.writeByteBuffer(body, out)
-
-    val numHeaders = event.getHeaders.size()
-    out.writeInt(numHeaders)
-    for ((k, v) <- event.getHeaders.asScala) {
-      val keyBuff = Utils.serialize(k.toString)
-      out.writeInt(keyBuff.length)
-      out.write(keyBuff)
-      val valBuff = Utils.serialize(v.toString)
-      out.writeInt(valBuff.length)
-      out.write(valBuff)
-    }
-  }
-}
-
-private[streaming] object SparkFlumeEvent {
-  def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent = {
-    val event = new SparkFlumeEvent
-    event.event = in
-    event
-  }
-}
-
-/** A simple server that implements Flume's Avro protocol. */
-private[streaming]
-class FlumeEventServer(receiver: FlumeReceiver) extends AvroSourceProtocol {
-  override def append(event: AvroFlumeEvent): Status = {
-    receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
-    Status.OK
-  }
-
-  override def appendBatch(events: java.util.List[AvroFlumeEvent]): Status = {
-    events.asScala.foreach(event => receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
-    Status.OK
-  }
-}
-
-/** A receiver that listens for events using the
-  * Flume Avro interface. */
-private[streaming]
-class FlumeReceiver(
-    host: String,
-    port: Int,
-    storageLevel: StorageLevel,
-    enableDecompression: Boolean
-  ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
-
-  lazy val responder = new SpecificResponder(
-    classOf[AvroSourceProtocol], new FlumeEventServer(this))
-  var server: NettyServer = null
-
-  private def initServer() = {
-    if (enableDecompression) {
-      val channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
-                                                             Executors.newCachedThreadPool())
-      val channelPipelineFactory = new CompressionChannelPipelineFactory()
-
-      new NettyServer(
-        responder,
-        new InetSocketAddress(host, port),
-        channelFactory,
-        channelPipelineFactory,
-        null)
-    } else {
-      new NettyServer(responder, new InetSocketAddress(host, port))
-    }
-  }
-
-  def onStart() {
-    synchronized {
-      if (server == null) {
-        server = initServer()
-        server.start()
-      } else {
-        logWarning("Flume receiver being asked to start more than once without close")
-      }
-    }
-    logInfo("Flume receiver started")
-  }
-
-  def onStop() {
-    synchronized {
-      if (server != null) {
-        server.close()
-        server = null
-      }
-    }
-    logInfo("Flume receiver stopped")
-  }
-
-  override def preferredLocation: Option[String] = Option(host)
-
-  /** A Netty pipeline factory that will decompress incoming data from
-    * the Netty client and compress data going back to the client.
-    *
-    * The compression on the return path is required because Flume requires
-    * a successful response to indicate it can remove the event/batch
-    * from the configured channel.
-    */
-  private[streaming]
-  class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
-    def getPipeline(): ChannelPipeline = {
-      val pipeline = Channels.pipeline()
-      val encoder = new ZlibEncoder(6)
-      pipeline.addFirst("deflater", encoder)
-      pipeline.addFirst("inflater", new ZlibDecoder())
-      pipeline
-    }
-  }
-}
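
For context, a minimal driver-side sketch of how this push-based receiver was typically
wired up, using the FlumeUtils.createStream factory removed later in this commit. The
host, port, and app name are illustrative, and the spark-streaming-flume artifact is
assumed to be on the classpath:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumePushSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("FlumePushSketch")
        val ssc = new StreamingContext(conf, Seconds(1))
        // The receiver binds to localhost:41414; a Flume avro sink pushes events to it.
        val stream = FlumeUtils.createStream(ssc, "localhost", 41414)
        stream.count().map(cnt => s"Received $cnt flume events").print()
        ssc.start()
        ssc.awaitTermination()
      }
    }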

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
deleted file mode 100644
index d9c25e8..0000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume
-
-
-import java.net.InetSocketAddress
-import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-import org.apache.avro.ipc.NettyTransceiver
-import org.apache.avro.ipc.specific.SpecificRequestor
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.flume.sink._
-import org.apache.spark.streaming.receiver.Receiver
-
-/**
- * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running
- * [[org.apache.spark.streaming.flume.sink.SparkSink]]s.
- * @param _ssc Streaming context that will execute this input stream
- * @param addresses List of addresses at which SparkSinks are listening
- * @param maxBatchSize Maximum size of a batch
- * @param parallelism Number of parallel connections to open
- * @param storageLevel The storage level to use.
- * @tparam T Class type of the object of this stream
- */
-private[streaming] class FlumePollingInputDStream[T: ClassTag](
-    _ssc: StreamingContext,
-    val addresses: Seq[InetSocketAddress],
-    val maxBatchSize: Int,
-    val parallelism: Int,
-    storageLevel: StorageLevel
-  ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
-
-  override def getReceiver(): Receiver[SparkFlumeEvent] = {
-    new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
-  }
-}
-
-private[streaming] class FlumePollingReceiver(
-    addresses: Seq[InetSocketAddress],
-    maxBatchSize: Int,
-    parallelism: Int,
-    storageLevel: StorageLevel
-  ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
-
-  lazy val channelFactoryExecutor =
-    Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
-      setNameFormat("Flume Receiver Channel Thread - %d").build())
-
-  lazy val channelFactory =
-    new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
-
-  lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
-    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())
-
-  private lazy val connections = new LinkedBlockingQueue[FlumeConnection]()
-
-  override def onStart(): Unit = {
-    // Create the connections to each Flume agent.
-    addresses.foreach(host => {
-      val transceiver = new NettyTransceiver(host, channelFactory)
-      val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
-      connections.add(new FlumeConnection(transceiver, client))
-    })
-    for (i <- 0 until parallelism) {
-      logInfo("Starting Flume Polling Receiver worker threads..")
-      // Threads that pull data from Flume.
-      receiverExecutor.submit(new FlumeBatchFetcher(this))
-    }
-  }
-
-  override def onStop(): Unit = {
-    logInfo("Shutting down Flume Polling Receiver")
-    receiverExecutor.shutdown()
-    // Wait up to a minute for the threads to die
-    if (!receiverExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
-      receiverExecutor.shutdownNow()
-    }
-    connections.asScala.foreach(_.transceiver.close())
-    channelFactory.releaseExternalResources()
-  }
-
-  private[flume] def getConnections: LinkedBlockingQueue[FlumeConnection] = {
-    this.connections
-  }
-
-  private[flume] def getMaxBatchSize: Int = {
-    this.maxBatchSize
-  }
-}
-
-/**
- * A wrapper around the transceiver and the Avro IPC API.
- * @param transceiver The transceiver to use for communication with Flume
- * @param client The client that the callbacks are received on.
- */
-private[flume] class FlumeConnection(val transceiver: NettyTransceiver,
-  val client: SparkFlumeProtocol.Callback)
-
-
-
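
A matching sketch for the pull-based path, via the createPollingStream factory removed
later in this commit; the sink address is illustrative and assumes a SparkSink is already
running on that Flume agent:

    import java.net.InetSocketAddress

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumePollSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("FlumePollSketch")
        val ssc = new StreamingContext(conf, Seconds(1))
        // Poll the SparkSink on a Flume agent; more addresses mean more parallelism.
        val addresses = Seq(new InetSocketAddress("flume-agent-host", 9988))
        val stream = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2)
        stream.count().map(cnt => s"Pulled $cnt flume events").print()
        ssc.start()
        ssc.awaitTermination()
      }
    }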

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
deleted file mode 100644
index 945cfa7..0000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.net.{InetSocketAddress, ServerSocket}
-import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets
-import java.util.{List => JList}
-import java.util.Collections
-
-import scala.collection.JavaConverters._
-
-import org.apache.avro.ipc.NettyTransceiver
-import org.apache.avro.ipc.specific.SpecificRequestor
-import org.apache.commons.lang3.RandomUtils
-import org.apache.flume.source.avro
-import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.socket.SocketChannel
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}
-
-import org.apache.spark.util.Utils
-import org.apache.spark.SparkConf
-
-/**
- * Shared code for Scala and Python unit tests
- */
-private[flume] class FlumeTestUtils {
-
-  private var transceiver: NettyTransceiver = null
-
-  private val testPort: Int = findFreePort()
-
-  def getTestPort(): Int = testPort
-
-  /** Find a free port */
-  private def findFreePort(): Int = {
-    val candidatePort = RandomUtils.nextInt(1024, 65536)
-    Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
-      val socket = new ServerSocket(trialPort)
-      socket.close()
-      (null, trialPort)
-    }, new SparkConf())._2
-  }
-
-  /** Send data to the flume receiver */
-  def writeInput(input: JList[String], enableCompression: Boolean): Unit = {
-    val testAddress = new InetSocketAddress("localhost", testPort)
-
-    val inputEvents = input.asScala.map { item =>
-      val event = new AvroFlumeEvent
-      event.setBody(ByteBuffer.wrap(item.getBytes(StandardCharsets.UTF_8)))
-      event.setHeaders(Collections.singletonMap("test", "header"))
-      event
-    }
-
-    // Close the previous transceiver, if one is still open
-    close()
-
-    // Create transceiver
-    transceiver = {
-      if (enableCompression) {
-        new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
-      } else {
-        new NettyTransceiver(testAddress)
-      }
-    }
-
-    // Create Avro client with the transceiver
-    val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
-    if (client == null) {
-      throw new AssertionError("Cannot create client")
-    }
-
-    // Send data
-    val status = client.appendBatch(inputEvents.asJava)
-    if (status != avro.Status.OK) {
-      throw new AssertionError("Sent events unsuccessfully")
-    }
-  }
-
-  def close(): Unit = {
-    if (transceiver != null) {
-      transceiver.close()
-      transceiver = null
-    }
-  }
-
-  /** Class to create a socket channel with compression */
-  private class CompressionChannelFactory(compressionLevel: Int)
-    extends NioClientSocketChannelFactory {
-
-    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
-      val encoder = new ZlibEncoder(compressionLevel)
-      pipeline.addFirst("deflater", encoder)
-      pipeline.addFirst("inflater", new ZlibDecoder())
-      super.newChannel(pipeline)
-    }
-  }
-
-}
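
As an aside, when the retry bookkeeping of Utils.startServiceOnPort is not needed, the
JDK can hand out a free ephemeral port directly by binding to port 0. A minimal sketch,
with the usual caveat that another process may grab the port between close() and reuse:

    import java.net.ServerSocket

    def findFreeEphemeralPort(): Int = {
      val socket = new ServerSocket(0) // port 0 asks the OS for any free port
      try socket.getLocalPort
      finally socket.close()
    }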

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
deleted file mode 100644
index 3e3ed71..0000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.io.{ByteArrayOutputStream, DataOutputStream}
-import java.net.InetSocketAddress
-import java.util.{List => JList, Map => JMap}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.api.java.function.PairFunction
-import org.apache.spark.api.python.PythonRDD
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-object FlumeUtils {
-  private val DEFAULT_POLLING_PARALLELISM = 5
-  private val DEFAULT_POLLING_BATCH_SIZE = 1000
-
-  /**
-   * Create an input stream from a Flume source.
-   * @param ssc      StreamingContext object
-   * @param hostname Hostname of the slave machine to which the flume data will be sent
-   * @param port     Port of the slave machine to which the flume data will be sent
-   * @param storageLevel  Storage level to use for storing the received objects
-   */
-  def createStream (
-      ssc: StreamingContext,
-      hostname: String,
-      port: Int,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[SparkFlumeEvent] = {
-    createStream(ssc, hostname, port, storageLevel, false)
-  }
-
-  /**
-   * Create an input stream from a Flume source.
-   * @param ssc      StreamingContext object
-   * @param hostname Hostname of the slave machine to which the flume data will be sent
-   * @param port     Port of the slave machine to which the flume data will be sent
-   * @param storageLevel  Storage level to use for storing the received objects
-   * @param enableDecompression  whether the Netty server should decompress the input stream
-   */
-  def createStream (
-      ssc: StreamingContext,
-      hostname: String,
-      port: Int,
-      storageLevel: StorageLevel,
-      enableDecompression: Boolean
-    ): ReceiverInputDStream[SparkFlumeEvent] = {
-    val inputStream = new FlumeInputDStream[SparkFlumeEvent](
-        ssc, hostname, port, storageLevel, enableDecompression)
-
-    inputStream
-  }
-
-  /**
-   * Creates an input stream from a Flume source.
-   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
-   * @param hostname Hostname of the slave machine to which the flume data will be sent
-   * @param port     Port of the slave machine to which the flume data will be sent
-   */
-  def createStream(
-      jssc: JavaStreamingContext,
-      hostname: String,
-      port: Int
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createStream(jssc.ssc, hostname, port)
-  }
-
-  /**
-   * Creates an input stream from a Flume source.
-   * @param hostname Hostname of the slave machine to which the flume data will be sent
-   * @param port     Port of the slave machine to which the flume data will be sent
-   * @param storageLevel  Storage level to use for storing the received objects
-   */
-  def createStream(
-      jssc: JavaStreamingContext,
-      hostname: String,
-      port: Int,
-      storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createStream(jssc.ssc, hostname, port, storageLevel, false)
-  }
-
-  /**
-   * Creates an input stream from a Flume source.
-   * @param hostname Hostname of the slave machine to which the flume data will be sent
-   * @param port     Port of the slave machine to which the flume data will be sent
-   * @param storageLevel  Storage level to use for storing the received objects
-   * @param enableDecompression  whether the Netty server should decompress the input stream
-   */
-  def createStream(
-      jssc: JavaStreamingContext,
-      hostname: String,
-      port: Int,
-      storageLevel: StorageLevel,
-      enableDecompression: Boolean
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
-  }
-
-  /**
-   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
-   * This stream will poll the sink for data and will pull events as they are available.
-   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
-   * @param hostname Address of the host on which the Spark Sink is running
-   * @param port Port of the host at which the Spark Sink is listening
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def createPollingStream(
-      ssc: StreamingContext,
-      hostname: String,
-      port: Int,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[SparkFlumeEvent] = {
-    createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel)
-  }
-
-  /**
-   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
-   * This stream will poll the sink for data and will pull events as they are available.
-   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
-   * @param addresses List of InetSocketAddresses representing the hosts to connect to.
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def createPollingStream(
-      ssc: StreamingContext,
-      addresses: Seq[InetSocketAddress],
-      storageLevel: StorageLevel
-    ): ReceiverInputDStream[SparkFlumeEvent] = {
-    createPollingStream(ssc, addresses, storageLevel,
-      DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
-  }
-
-  /**
-   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
-   * This stream will poll the sink for data and will pull events as they are available.
-   * @param addresses List of InetSocketAddresses representing the hosts to connect to.
-   * @param maxBatchSize Maximum number of events to be pulled from the Spark sink in a
-   *                     single RPC call
-   * @param parallelism Number of concurrent requests this stream should send to the sink. Note
-   *                    that having a higher number of requests concurrently being pulled will
-   *                    result in this stream using more threads
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def createPollingStream(
-      ssc: StreamingContext,
-      addresses: Seq[InetSocketAddress],
-      storageLevel: StorageLevel,
-      maxBatchSize: Int,
-      parallelism: Int
-    ): ReceiverInputDStream[SparkFlumeEvent] = {
-    new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
-      parallelism, storageLevel)
-  }
-
-  /**
-   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
-   * This stream will poll the sink for data and will pull events as they are available.
-   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
-   * @param hostname Hostname of the host on which the Spark Sink is running
-   * @param port     Port of the host at which the Spark Sink is listening
-   */
-  def createPollingStream(
-      jssc: JavaStreamingContext,
-      hostname: String,
-      port: Int
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2)
-  }
-
-  /**
-   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
-   * This stream will poll the sink for data and will pull events as they are available.
-   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
-   * @param hostname     Hostname of the host on which the Spark Sink is running
-   * @param port         Port of the host at which the Spark Sink is listening
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def createPollingStream(
-      jssc: JavaStreamingContext,
-      hostname: String,
-      port: Int,
-      storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel)
-  }
-
-  /**
-   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
-   * This stream will poll the sink for data and will pull events as they are available.
-   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
-   * @param addresses    List of InetSocketAddresses on which the Spark Sink is running.
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def createPollingStream(
-      jssc: JavaStreamingContext,
-      addresses: Array[InetSocketAddress],
-      storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createPollingStream(jssc, addresses, storageLevel,
-      DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
-  }
-
-  /**
-   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
-   * This stream will poll the sink for data and will pull events as they are available.
-   * @param addresses    List of InetSocketAddresses on which the Spark Sink is running
-   * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
-   *                     single RPC call
-   * @param parallelism  Number of concurrent requests this stream should send to the sink. Note
-   *                     that having a higher number of requests concurrently being pulled will
-   *                     result in this stream using more threads
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def createPollingStream(
-      jssc: JavaStreamingContext,
-      addresses: Array[InetSocketAddress],
-      storageLevel: StorageLevel,
-      maxBatchSize: Int,
-      parallelism: Int
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
-  }
-}
-
-/**
- * This is a helper class that wraps the FlumeUtils methods in a more Python-friendly class
- * and functions so that they can be easily instantiated and called from Python's FlumeUtils.
- */
-private[flume] class FlumeUtilsPythonHelper {
-
-  def createStream(
-      jssc: JavaStreamingContext,
-      hostname: String,
-      port: Int,
-      storageLevel: StorageLevel,
-      enableDecompression: Boolean
-    ): JavaPairDStream[Array[Byte], Array[Byte]] = {
-    val dstream = FlumeUtils.createStream(jssc, hostname, port, storageLevel, enableDecompression)
-    FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
-  }
-
-  def createPollingStream(
-      jssc: JavaStreamingContext,
-      hosts: JList[String],
-      ports: JList[Int],
-      storageLevel: StorageLevel,
-      maxBatchSize: Int,
-      parallelism: Int
-    ): JavaPairDStream[Array[Byte], Array[Byte]] = {
-    assert(hosts.size() == ports.size())
-    val addresses = hosts.asScala.zip(ports.asScala).map {
-      case (host, port) => new InetSocketAddress(host, port)
-    }
-    val dstream = FlumeUtils.createPollingStream(
-      jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
-    FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
-  }
-
-}
-
-private object FlumeUtilsPythonHelper {
-
-  private def stringMapToByteArray(map: JMap[CharSequence, CharSequence]): Array[Byte] = {
-    val byteStream = new ByteArrayOutputStream()
-    val output = new DataOutputStream(byteStream)
-    try {
-      output.writeInt(map.size)
-      map.asScala.foreach { kv =>
-        PythonRDD.writeUTF(kv._1.toString, output)
-        PythonRDD.writeUTF(kv._2.toString, output)
-      }
-      byteStream.toByteArray
-    }
-    finally {
-      output.close()
-    }
-  }
-
-  private def toByteArrayPairDStream(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
-    JavaPairDStream[Array[Byte], Array[Byte]] = {
-    dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], Array[Byte]] {
-      override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], Array[Byte]) = {
-        val event = sparkEvent.event
-        val byteBuffer = event.getBody
-        val body = new Array[Byte](byteBuffer.remaining())
-        byteBuffer.get(body)
-        (stringMapToByteArray(event.getHeaders), body)
-      }
-    })
-  }
-}
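
For reference, a sketch of a matching reader for the header blob produced by
stringMapToByteArray above, assuming PythonRDD.writeUTF writes a 4-byte big-endian
length followed by UTF-8 bytes (which its use here implies); readHeaderMap is a
hypothetical helper, not part of this module:

    import java.io.{ByteArrayInputStream, DataInputStream}
    import java.nio.charset.StandardCharsets

    def readHeaderMap(bytes: Array[Byte]): Map[String, String] = {
      val in = new DataInputStream(new ByteArrayInputStream(bytes))
      def readUTF(): String = {
        val len = in.readInt()          // length prefix written by writeUTF
        val buf = new Array[Byte](len)
        in.readFully(buf)
        new String(buf, StandardCharsets.UTF_8)
      }
      val n = in.readInt()              // number of key-value pairs
      (0 until n).map(_ => readUTF() -> readUTF()).toMap
    }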

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
deleted file mode 100644
index 1a96df6..0000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.nio.charset.StandardCharsets
-import java.util.{Collections, List => JList, Map => JMap}
-import java.util.concurrent._
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.flume.event.EventBuilder
-import org.apache.flume.Context
-import org.apache.flume.channel.MemoryChannel
-import org.apache.flume.conf.Configurables
-
-import org.apache.spark.streaming.flume.sink.{SparkSink, SparkSinkConfig}
-
-/**
- * Shared code for Scala and Python unit tests
- */
-private[flume] class PollingFlumeTestUtils {
-
-  private val batchCount = 5
-  val eventsPerBatch = 100
-  private val totalEventsPerChannel = batchCount * eventsPerBatch
-  private val channelCapacity = 5000
-
-  def getTotalEvents: Int = totalEventsPerChannel * channels.size
-
-  private val channels = new ArrayBuffer[MemoryChannel]
-  private val sinks = new ArrayBuffer[SparkSink]
-
-  /**
-   * Start a sink and return the port of this sink
-   */
-  def startSingleSink(): Int = {
-    channels.clear()
-    sinks.clear()
-
-    // Start the channel and sink.
-    val context = new Context()
-    context.put("capacity", channelCapacity.toString)
-    context.put("transactionCapacity", "1000")
-    context.put("keep-alive", "0")
-    val channel = new MemoryChannel()
-    Configurables.configure(channel, context)
-
-    val sink = new SparkSink()
-    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
-    Configurables.configure(sink, context)
-    sink.setChannel(channel)
-    sink.start()
-
-    channels += (channel)
-    sinks += sink
-
-    sink.getPort()
-  }
-
-  /**
-   * Start 2 sinks and return the ports
-   */
-  def startMultipleSinks(): Seq[Int] = {
-    channels.clear()
-    sinks.clear()
-
-    // Start the channel and sink.
-    val context = new Context()
-    context.put("capacity", channelCapacity.toString)
-    context.put("transactionCapacity", "1000")
-    context.put("keep-alive", "0")
-    val channel = new MemoryChannel()
-    Configurables.configure(channel, context)
-
-    val channel2 = new MemoryChannel()
-    Configurables.configure(channel2, context)
-
-    val sink = new SparkSink()
-    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
-    Configurables.configure(sink, context)
-    sink.setChannel(channel)
-    sink.start()
-
-    val sink2 = new SparkSink()
-    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
-    Configurables.configure(sink2, context)
-    sink2.setChannel(channel2)
-    sink2.start()
-
-    sinks += sink
-    sinks += sink2
-    channels += channel
-    channels += channel2
-
-    sinks.map(_.getPort())
-  }
-
-  /**
-   * Send data and wait until all data has been received
-   */
-  def sendDataAndEnsureAllDataHasBeenReceived(): Unit = {
-    val executor = Executors.newCachedThreadPool()
-    val executorCompletion = new ExecutorCompletionService[Void](executor)
-
-    val latch = new CountDownLatch(batchCount * channels.size)
-    sinks.foreach(_.countdownWhenBatchReceived(latch))
-
-    channels.foreach(channel => {
-      executorCompletion.submit(new TxnSubmitter(channel))
-    })
-
-    for (i <- 0 until channels.size) {
-      executorCompletion.take()
-    }
-
-    latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
-  }
-
-  /**
-   * A Python-friendly method to assert the output
-   */
-  def assertOutput(
-      outputHeaders: JList[JMap[String, String]], outputBodies: JList[String]): Unit = {
-    require(outputHeaders.size == outputBodies.size)
-    val eventSize = outputHeaders.size
-    if (eventSize != totalEventsPerChannel * channels.size) {
-      throw new AssertionError(
-        s"Expected ${totalEventsPerChannel * channels.size} events, but was $eventSize")
-    }
-    var counter = 0
-    for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
-      val eventBodyToVerify = s"${channels(k).getName}-$i"
-      val eventHeaderToVerify: JMap[String, String] = Collections.singletonMap(s"test-$i", "header")
-      var found = false
-      var j = 0
-      while (j < eventSize && !found) {
-        if (eventBodyToVerify == outputBodies.get(j) &&
-          eventHeaderToVerify == outputHeaders.get(j)) {
-          found = true
-          counter += 1
-        }
-        j += 1
-      }
-    }
-    if (counter != totalEventsPerChannel * channels.size) {
-      throw new AssertionError(
-        s"111 Expected ${totalEventsPerChannel * channels.size} events, but was $counter")
-    }
-  }
-
-  def assertChannelsAreEmpty(): Unit = {
-    channels.foreach(assertChannelIsEmpty)
-  }
-
-  private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
-    val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
-    queueRemaining.setAccessible(true)
-    val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
-    if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != 5000) {
-      throw new AssertionError(s"Channel ${channel.getName} is not empty")
-    }
-  }
-
-  def close(): Unit = {
-    sinks.foreach(_.stop())
-    sinks.clear()
-    channels.foreach(_.stop())
-    channels.clear()
-  }
-
-  private class TxnSubmitter(channel: MemoryChannel) extends Callable[Void] {
-    override def call(): Void = {
-      var t = 0
-      for (i <- 0 until batchCount) {
-        val tx = channel.getTransaction
-        tx.begin()
-        for (j <- 0 until eventsPerBatch) {
-          channel.put(EventBuilder.withBody(
-            s"${channel.getName}-$t".getBytes(StandardCharsets.UTF_8),
-            Collections.singletonMap(s"test-$t", "header")))
-          t += 1
-        }
-        tx.commit()
-        tx.close()
-        Thread.sleep(500) // Allow some time for the events to reach the sink
-      }
-      null
-    }
-  }
-
-}
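
The send-and-wait choreography above reduces to a standard pattern: one Callable per
channel drained through an ExecutorCompletionService, plus a CountDownLatch with a
bounded wait for the receiving side. A stripped-down sketch with placeholder task bodies:

    import java.util.concurrent.{Callable, CountDownLatch, ExecutorCompletionService, Executors, TimeUnit}

    object SendAndWaitSketch {
      def main(args: Array[String]): Unit = {
        val producers = 2
        val executor = Executors.newCachedThreadPool()
        val completion = new ExecutorCompletionService[Void](executor)
        val latch = new CountDownLatch(producers)
        (1 to producers).foreach { _ =>
          completion.submit(new Callable[Void] {
            override def call(): Void = { latch.countDown(); null } // stand-in for TxnSubmitter
          })
        }
        (1 to producers).foreach(_ => completion.take()) // block until every submission finishes
        latch.await(15, TimeUnit.SECONDS)                // bounded wait for the consumer side
        executor.shutdown()
      }
    }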

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
deleted file mode 100644
index d31aa5f..0000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Spark streaming receiver for Flume.
- */
-package org.apache.spark.streaming.flume;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
deleted file mode 100644
index 9bfab68..0000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-/**
- * Spark streaming receiver for Flume.
- */
-package object flume

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a..0000000
--- a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
-    protected transient JavaStreamingContext ssc;
-
-    @Before
-    public void setUp() {
-        SparkConf conf = new SparkConf()
-            .setMaster("local[2]")
-            .setAppName("test")
-            .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
-        ssc = new JavaStreamingContext(conf, new Duration(1000));
-        ssc.checkpoint("checkpoint");
-    }
-
-    @After
-    public void tearDown() {
-        ssc.stop();
-        ssc = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
deleted file mode 100644
index 79c5b91..0000000
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume;
-
-import java.net.InetSocketAddress;
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
-
-public class JavaFlumePollingStreamSuite extends LocalJavaStreamingContext {
-  @Test
-  public void testFlumeStream() {
-    // tests the API, does not actually test data receiving
-    InetSocketAddress[] addresses = new InetSocketAddress[] {
-        new InetSocketAddress("localhost", 12345)
-    };
-    JavaReceiverInputDStream<SparkFlumeEvent> test1 =
-        FlumeUtils.createPollingStream(ssc, "localhost", 12345);
-    JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createPollingStream(
-        ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2());
-    JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createPollingStream(
-        ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2());
-    JavaReceiverInputDStream<SparkFlumeEvent> test4 = FlumeUtils.createPollingStream(
-        ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
deleted file mode 100644
index 3b5e0c7..0000000
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume;
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
-
-public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
-  @Test
-  public void testFlumeStream() {
-    // tests the API, does not actually test data receiving
-    JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
-    JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
-      StorageLevel.MEMORY_AND_DISK_SER_2());
-    JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
-      StorageLevel.MEMORY_AND_DISK_SER_2(), false);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties
deleted file mode 100644
index 75e3b53..0000000
--- a/external/flume/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
-

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
deleted file mode 100644
index c97a27c..0000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
-import org.apache.spark.util.Utils
-
-/**
- * This is an output stream just for the test suites. All the output is collected into a
- * ConcurrentLinkedQueue. This queue is wiped clean on being restored from a checkpoint.
- *
- * The queue contains a sequence of RDDs, each containing a sequence of items.
- */
-class TestOutputStream[T: ClassTag](parent: DStream[T],
-    val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]]())
-  extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
-    val collected = rdd.collect()
-    output.add(collected)
-  }, false) {
-
-  // This is to clear the output queue every time it is restored from a checkpoint
-  @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
-    ois.defaultReadObject()
-    output.clear()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
deleted file mode 100644
index 10dcbf9..0000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.net.InetSocketAddress
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext, TestOutputStream}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.util.{ManualClock, Utils}
-
-class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging {
-
-  val maxAttempts = 5
-  val batchDuration = Seconds(1)
-
-  val conf = new SparkConf()
-    .setMaster("local[2]")
-    .setAppName(this.getClass.getSimpleName)
-    .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
-
-  val utils = new PollingFlumeTestUtils
-
-  test("flume polling test") {
-    testMultipleTimes(testFlumePolling)
-  }
-
-  test("flume polling test multiple hosts") {
-    testMultipleTimes(testFlumePollingMultipleHost)
-  }
-
-  /**
-   * Run the given test until no more java.net.BindExceptions are thrown.
-   * Do this only up to a certain attempt limit.
-   */
-  private def testMultipleTimes(test: () => Unit): Unit = {
-    var testPassed = false
-    var attempt = 0
-    while (!testPassed && attempt < maxAttempts) {
-      try {
-        test()
-        testPassed = true
-      } catch {
-        case e: Exception if Utils.isBindCollision(e) =>
-          logWarning("Exception when running flume polling test: " + e)
-          attempt += 1
-      }
-    }
-    assert(testPassed, s"Test failed after $attempt attempts!")
-  }
-
-  private def testFlumePolling(): Unit = {
-    try {
-      val port = utils.startSingleSink()
-
-      writeAndVerify(Seq(port))
-      utils.assertChannelsAreEmpty()
-    } finally {
-      utils.close()
-    }
-  }
-
-  private def testFlumePollingMultipleHost(): Unit = {
-    try {
-      val ports = utils.startMultipleSinks()
-      writeAndVerify(ports)
-      utils.assertChannelsAreEmpty()
-    } finally {
-      utils.close()
-    }
-  }
-
-  def writeAndVerify(sinkPorts: Seq[Int]): Unit = {
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port))
-    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
-        utils.eventsPerBatch, 5)
-    val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputQueue)
-    outputStream.register()
-
-    ssc.start()
-    try {
-      utils.sendDatAndEnsureAllDataHasBeenReceived()
-      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-      clock.advance(batchDuration.milliseconds)
-
-      // The eventually block is required to ensure that all data in the batch has been processed.
-      eventually(timeout(10 seconds), interval(100 milliseconds)) {
-        val flattenOutput = outputQueue.asScala.toSeq.flatten
-        val headers = flattenOutput.map(_.event.getHeaders.asScala.map {
-          case (key, value) => (key.toString, value.toString)
-        }).map(_.asJava)
-        val bodies = flattenOutput.map(e => JavaUtils.bytesToString(e.event.getBody))
-        utils.assertOutput(headers.asJava, bodies.asJava)
-      }
-    } finally {
-      ssc.stop()
-    }
-  }
-
-}

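For reference, the suite above drives the pull-based connector through FlumeUtils.createPollingStream. A minimal sketch of how an application consumed that API, assuming a Flume agent running the Spark sink on a placeholder localhost:9999 (the address, batch size, and parallelism below are illustrative, not values from this commit):

    import java.net.InetSocketAddress

    import org.apache.spark.SparkConf
    import org.apache.spark.network.util.JavaUtils
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumePollingExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("FlumePollingExample")
        val ssc = new StreamingContext(conf, Seconds(1))
        // Poll the Flume Spark sink(s): up to 1000 events per batch, 5 parallel connections.
        val addresses = Seq(new InetSocketAddress("localhost", 9999))
        val stream = FlumeUtils.createPollingStream(
          ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2, 1000, 5)
        // Each record is a SparkFlumeEvent wrapping an AvroFlumeEvent.
        stream.map(e => JavaUtils.bytesToString(e.event.getBody)).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }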
http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
deleted file mode 100644
index 38208c6..0000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.socket.SocketChannel
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.jboss.netty.handler.codec.compression._
-import org.scalatest.{BeforeAndAfter, Matchers}
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
-
-class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
-  val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
-  var ssc: StreamingContext = null
-
-  test("flume input stream") {
-    testFlumeStream(testCompression = false)
-  }
-
-  test("flume input compressed stream") {
-    testFlumeStream(testCompression = true)
-  }
-
-  /** Run the test on a Flume stream. */
-  private def testFlumeStream(testCompression: Boolean): Unit = {
-    val input = (1 to 100).map { _.toString }
-    val utils = new FlumeTestUtils
-    try {
-      val outputQueue = startContext(utils.getTestPort(), testCompression)
-
-      eventually(timeout(10 seconds), interval(100 milliseconds)) {
-        utils.writeInput(input.asJava, testCompression)
-      }
-
-      eventually(timeout(10 seconds), interval(100 milliseconds)) {
-        val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event }
-        outputEvents.foreach {
-          event =>
-            event.getHeaders.get("test") should be("header")
-        }
-        val output = outputEvents.map(event => JavaUtils.bytesToString(event.getBody))
-        output should be (input)
-      }
-    } finally {
-      if (ssc != null) {
-        ssc.stop()
-      }
-      utils.close()
-    }
-  }
-
-  /** Set up and start the streaming context. */
-  private def startContext(
-      testPort: Int, testCompression: Boolean): (ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]) = {
-    ssc = new StreamingContext(conf, Milliseconds(200))
-    val flumeStream = FlumeUtils.createStream(
-      ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
-    val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputQueue)
-    outputStream.register()
-    ssc.start()
-    outputQueue
-  }
-
-  /** Class to create socket channel with compression */
-  private class CompressionChannelFactory(compressionLevel: Int)
-    extends NioClientSocketChannelFactory {
-
-    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
-      val encoder = new ZlibEncoder(compressionLevel)
-      pipeline.addFirst("deflater", encoder)
-      pipeline.addFirst("inflater", new ZlibDecoder())
-      super.newChannel(pipeline)
-    }
-  }
-}

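The push-based path covered by this suite goes through FlumeUtils.createStream, with the CompressionChannelFactory above adding zlib compression to the Netty pipeline when the sink compresses events. A minimal sketch of the application-side usage, with a placeholder host and port (the Flume Avro sink must be configured to push to the same address, and to compress if the final flag is true):

    import org.apache.spark.SparkConf
    import org.apache.spark.network.util.JavaUtils
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumePushExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("FlumePushExample")
        val ssc = new StreamingContext(conf, Milliseconds(200))
        // Listen for events pushed by a Flume Avro sink; the last argument
        // enables zlib decompression of the incoming stream.
        val stream = FlumeUtils.createStream(
          ssc, "localhost", 9998, StorageLevel.MEMORY_AND_DISK, true)
        stream.map(e => JavaUtils.bytesToString(e.event.getBody)).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }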
http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/mqtt-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml
deleted file mode 100644
index ac2a3f6..0000000
--- a/external/mqtt-assembly/pom.xml
+++ /dev/null
@@ -1,175 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <groupId>org.apache.spark</groupId>
-  <artifactId>spark-streaming-mqtt-assembly_2.11</artifactId>
-  <packaging>jar</packaging>
-  <name>Spark Project External MQTT Assembly</name>
-  <url>http://spark.apache.org/</url>
-
-  <properties>
-    <sbt.project.name>streaming-mqtt-assembly</sbt.project.name>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <!--
-      These dependencies are already in the Spark assembly, so demote them to provided.
-    -->
-    <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.sun.jersey</groupId>
-      <artifactId>jersey-server</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.sun.jersey</groupId>
-      <artifactId>jersey-core</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-mapred</artifactId>
-      <classifier>${avro.mapred.classifier}</classifier>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.curator</groupId>
-      <artifactId>curator-recipes</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>net.java.dev.jets3t</groupId>
-      <artifactId>jets3t</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.xerial.snappy</groupId>
-      <artifactId>snappy-java</artifactId>
-      <scope>provided</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
-    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <configuration>
-          <shadedArtifactAttached>false</shadedArtifactAttached>
-          <artifactSet>
-            <includes>
-              <include>*:*</include>
-            </includes>
-          </artifactSet>
-          <filters>
-            <filter>
-              <artifact>*:*</artifact>
-              <excludes>
-                <exclude>META-INF/*.SF</exclude>
-                <exclude>META-INF/*.DSA</exclude>
-                <exclude>META-INF/*.RSA</exclude>
-              </excludes>
-            </filter>
-          </filters>
-        </configuration>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-            <configuration>
-              <transformers>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
-                  <resource>reference.conf</resource>
-                </transformer>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
-                  <resource>log4j.properties</resource>
-                </transformer>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
-                <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
-              </transformers>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
deleted file mode 100644
index d0d9687..0000000
--- a/external/mqtt/pom.xml
+++ /dev/null
@@ -1,104 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <groupId>org.apache.spark</groupId>
-  <artifactId>spark-streaming-mqtt_2.11</artifactId>
-  <properties>
-    <sbt.project.name>streaming-mqtt</sbt.project.name>
-  </properties>
-  <packaging>jar</packaging>
-  <name>Spark Project External MQTT</name>
-  <url>http://spark.apache.org/</url>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.eclipse.paho</groupId>
-      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
-      <version>1.0.2</version>
-    </dependency>
-    <dependency>
-      <groupId>org.scalacheck</groupId>
-      <artifactId>scalacheck_${scala.binary.version}</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-core</artifactId>
-      <version>5.7.0</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
-    </dependency>
-  </dependencies>
-  <build>
-    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
-    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-
-    <plugins>
-      <!-- Assemble a jar with test dependencies for Python tests -->
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>test-jar-with-dependencies</id>
-            <phase>package</phase>
-            <goals>
-              <goal>single</goal>
-            </goals>
-            <configuration>
-              <!-- Make sure the file path is the same as in the sbt build -->
-              <finalName>spark-streaming-mqtt-test-${project.version}</finalName>
-              <outputDirectory>${project.build.directory}/scala-${scala.binary.version}/</outputDirectory>
-              <appendAssemblyId>false</appendAssemblyId>
-              <!-- Don't publish it since it's only for Python tests -->
-              <attach>false</attach>
-              <descriptors>
-                <descriptor>src/main/assembly/assembly.xml</descriptor>
-              </descriptors>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-</project>

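Since this commit moves the connector out of the main build, applications must add it as an external dependency. In an sbt build definition, the coordinates published by the pom above would be declared roughly as follows (the version is a placeholder, and the group ID under which the moved package is ultimately published may differ):

    // build.sbt -- a sketch; the version and, after the move, the group ID are assumptions
    libraryDependencies += "org.apache.spark" %% "spark-streaming-mqtt" % "2.0.0"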
http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/mqtt/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/assembly/assembly.xml b/external/mqtt/src/main/assembly/assembly.xml
deleted file mode 100644
index c110b01..0000000
--- a/external/mqtt/src/main/assembly/assembly.xml
+++ /dev/null
@@ -1,44 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<assembly>
-  <id>test-jar-with-dependencies</id>
-  <formats>
-    <format>jar</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-
-  <fileSets>
-    <fileSet>
-      <directory>${project.build.directory}/scala-${scala.binary.version}/test-classes</directory>
-      <outputDirectory></outputDirectory>
-    </fileSet>
-  </fileSets>
-
-  <dependencySets>
-    <dependencySet>
-      <useTransitiveDependencies>true</useTransitiveDependencies>
-      <scope>test</scope>
-      <unpack>true</unpack>
-      <excludes>
-        <exclude>org.apache.hadoop:*:jar</exclude>
-        <exclude>org.apache.zookeeper:*:jar</exclude>
-        <exclude>org.apache.avro:*:jar</exclude>
-      </excludes>
-    </dependencySet>
-  </dependencySets>
-
-</assembly>

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
deleted file mode 100644
index cbad6f7..0000000
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.mqtt
-
-import java.nio.charset.StandardCharsets
-
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
-import org.eclipse.paho.client.mqttv3.MqttCallback
-import org.eclipse.paho.client.mqttv3.MqttClient
-import org.eclipse.paho.client.mqttv3.MqttMessage
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.Receiver
-
-/**
- * Input stream that subscribes to messages from an MQTT broker.
- * Uses the Eclipse Paho MQTT client (http://www.eclipse.org/paho/).
- * @param brokerUrl URL of the remote MQTT broker
- * @param topic topic name to subscribe to
- * @param storageLevel RDD storage level.
- */
-
-private[streaming]
-class MQTTInputDStream(
-    _ssc: StreamingContext,
-    brokerUrl: String,
-    topic: String,
-    storageLevel: StorageLevel
-  ) extends ReceiverInputDStream[String](_ssc) {
-
-  private[streaming] override def name: String = s"MQTT stream [$id]"
-
-  def getReceiver(): Receiver[String] = {
-    new MQTTReceiver(brokerUrl, topic, storageLevel)
-  }
-}
-
-private[streaming]
-class MQTTReceiver(
-    brokerUrl: String,
-    topic: String,
-    storageLevel: StorageLevel
-  ) extends Receiver[String](storageLevel) {
-
-  def onStop() {
-
-  }
-
-  def onStart() {
-
-    // Set up persistence for messages
-    val persistence = new MemoryPersistence()
-
-    // Initialize the MQTT client with the broker URL, a generated client ID and a MqttClientPersistence
-    val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
-
-    // Callback that is triggered whenever a new message arrives on the subscribed topic
-    val callback = new MqttCallback() {
-
-      // Handles Mqtt message
-      override def messageArrived(topic: String, message: MqttMessage) {
-        store(new String(message.getPayload(), StandardCharsets.UTF_8))
-      }
-
-      override def deliveryComplete(token: IMqttDeliveryToken) {
-      }
-
-      override def connectionLost(cause: Throwable) {
-        restart("Connection lost ", cause)
-      }
-    }
-
-    // Set up callback for MqttClient. This needs to happen before
-    // connecting or subscribing, otherwise messages may be lost
-    client.setCallback(callback)
-
-    // Connect to MqttBroker
-    client.connect()
-
-    // Subscribe to Mqtt topic
-    client.subscribe(topic)
-
-  }
-}

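MQTTReceiver above follows Spark Streaming's general custom-receiver contract: onStart() sets up the connection and returns promptly, store() hands each record to Spark, and restart() asks the framework to tear down and reschedule the receiver on failure. A minimal sketch of the same contract against a plain TCP line source (the socket source is illustrative, not part of this commit):

    import java.io.{BufferedReader, InputStreamReader}
    import java.net.Socket
    import java.nio.charset.StandardCharsets

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class LineReceiver(host: String, port: Int)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

      def onStart(): Unit = {
        // Receive on a separate thread so that onStart() returns promptly,
        // as the Receiver contract requires.
        new Thread("Line Receiver") {
          override def run(): Unit = receive()
        }.start()
      }

      // Nothing to do here: the reading thread checks isStopped() and exits on its own.
      def onStop(): Unit = {}

      private def receive(): Unit = {
        try {
          val socket = new Socket(host, port)
          val reader = new BufferedReader(
            new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
          var line = reader.readLine()
          while (!isStopped() && line != null) {
            store(line)               // hand the record to Spark
            line = reader.readLine()
          }
          reader.close()
          socket.close()
          restart("Trying to connect again")
        } catch {
          case t: Throwable => restart("Error receiving data", t)
        }
      }
    }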
http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
deleted file mode 100644
index 7b8d56d..0000000
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.mqtt
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-object MQTTUtils {
-  /**
-   * Create an input stream that receives messages pushed by an MQTT publisher.
-   * @param ssc           StreamingContext object
-   * @param brokerUrl     URL of the remote MQTT publisher
-   * @param topic         Topic name to subscribe to
-   * @param storageLevel  RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
-   */
-  def createStream(
-      ssc: StreamingContext,
-      brokerUrl: String,
-      topic: String,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[String] = {
-    new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
-  }
-
-  /**
-   * Create an input stream that receives messages pushed by an MQTT publisher.
-   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
-   * @param jssc      JavaStreamingContext object
-   * @param brokerUrl URL of the remote MQTT publisher
-   * @param topic     Topic name to subscribe to
-   */
-  def createStream(
-      jssc: JavaStreamingContext,
-      brokerUrl: String,
-      topic: String
-    ): JavaReceiverInputDStream[String] = {
-    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
-    createStream(jssc.ssc, brokerUrl, topic)
-  }
-
-  /**
-   * Create an input stream that receives messages pushed by an MQTT publisher.
-   * @param jssc          JavaStreamingContext object
-   * @param brokerUrl     URL of the remote MQTT publisher
-   * @param topic         Topic name to subscribe to
-   * @param storageLevel  RDD storage level.
-   */
-  def createStream(
-      jssc: JavaStreamingContext,
-      brokerUrl: String,
-      topic: String,
-      storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[String] = {
-    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
-    createStream(jssc.ssc, brokerUrl, topic, storageLevel)
-  }
-}
-
-/**
- * A helper class that wraps the methods in MQTTUtils in a more Python-friendly
- * form, so that they can be easily instantiated and called from Python's MQTTUtils.
- */
-private[mqtt] class MQTTUtilsPythonHelper {
-
-  def createStream(
-      jssc: JavaStreamingContext,
-      brokerUrl: String,
-      topic: String,
-      storageLevel: StorageLevel
-    ): JavaDStream[String] = {
-    MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel)
-  }
-}

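For completeness, a minimal sketch of how an application consumed the Scala API above; the broker URL and topic are placeholders:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.mqtt.MQTTUtils

    object MQTTWordCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("MQTTWordCount")
        val ssc = new StreamingContext(conf, Seconds(2))
        // Each record is the UTF-8 payload of one MQTT message on the topic.
        val lines = MQTTUtils.createStream(
          ssc, "tcp://localhost:1883", "spark/test", StorageLevel.MEMORY_AND_DISK_SER_2)
        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }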
http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package-info.java
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package-info.java b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package-info.java
deleted file mode 100644
index 728e0d8..0000000
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * MQTT receiver for Spark Streaming.
- */
-package org.apache.spark.streaming.mqtt;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
----------------------------------------------------------------------
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
deleted file mode 100644
index 63d0d13..0000000
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-/**
- * MQTT receiver for Spark Streaming.
- */
-package object mqtt

http://git-wip-us.apache.org/repos/asf/spark/blob/06dec374/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a..0000000
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
-    protected transient JavaStreamingContext ssc;
-
-    @Before
-    public void setUp() {
-        SparkConf conf = new SparkConf()
-            .setMaster("local[2]")
-            .setAppName("test")
-            .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
-        ssc = new JavaStreamingContext(conf, new Duration(1000));
-        ssc.checkpoint("checkpoint");
-    }
-
-    @After
-    public void tearDown() {
-        ssc.stop();
-        ssc = null;
-    }
-}


