spark-commits mailing list archives

From sro...@apache.org
Subject [1/3] spark git commit: [SPARK-25598][STREAMING][BUILD][TEST-MAVEN] Remove flume connector in Spark 3
Date Thu, 11 Oct 2018 21:28:09 GMT
Repository: spark
Updated Branches:
  refs/heads/master 69f5e9cce -> a00181418


http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
deleted file mode 100644
index 9241b13..0000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.net.InetSocketAddress
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext, TestOutputStream}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.util.{ManualClock, Utils}
-
-class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
-
-  val maxAttempts = 5
-  val batchDuration = Seconds(1)
-
-  @transient private var _sc: SparkContext = _
-
-  val conf = new SparkConf()
-    .setMaster("local[2]")
-    .setAppName(this.getClass.getSimpleName)
-    .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
-
-  val utils = new PollingFlumeTestUtils
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    _sc = new SparkContext(conf)
-  }
-
-  override def afterAll(): Unit = {
-    try {
-      if (_sc != null) {
-        _sc.stop()
-        _sc = null
-      }
-    } finally {
-      super.afterAll()
-    }
-  }
-
-  test("flume polling test") {
-    testMultipleTimes(() => testFlumePolling())
-  }
-
-  test("flume polling test multiple hosts") {
-    testMultipleTimes(() => testFlumePollingMultipleHost())
-  }
-
-  /**
-   * Run the given test until no more java.net.BindException's are thrown.
-   * Do this only up to a certain attempt limit.
-   */
-  private def testMultipleTimes(test: () => Unit): Unit = {
-    var testPassed = false
-    var attempt = 0
-    while (!testPassed && attempt < maxAttempts) {
-      try {
-        test()
-        testPassed = true
-      } catch {
-        case e: Exception if Utils.isBindCollision(e) =>
-          logWarning("Exception when running flume polling test: " + e)
-          attempt += 1
-      }
-    }
-    assert(testPassed, s"Test failed after $attempt attempts!")
-  }
-
-  private def testFlumePolling(): Unit = {
-    try {
-      val port = utils.startSingleSink()
-
-      writeAndVerify(Seq(port))
-      utils.assertChannelsAreEmpty()
-    } finally {
-      utils.close()
-    }
-  }
-
-  private def testFlumePollingMultipleHost(): Unit = {
-    try {
-      val ports = utils.startMultipleSinks()
-      writeAndVerify(ports)
-      utils.assertChannelsAreEmpty()
-    } finally {
-      utils.close()
-    }
-  }
-
-  def writeAndVerify(sinkPorts: Seq[Int]): Unit = {
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(_sc, batchDuration)
-    val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port))
-    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
-        utils.eventsPerBatch, 5)
-    val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputQueue)
-    outputStream.register()
-
-    ssc.start()
-    try {
-      utils.sendDataAndEnsureAllDataHasBeenReceived()
-      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-      clock.advance(batchDuration.milliseconds)
-
-      // The eventually is required to ensure that all data in the batch has been processed.
-      eventually(timeout(10 seconds), interval(100 milliseconds)) {
-        val flattenOutput = outputQueue.asScala.toSeq.flatten
-        val headers = flattenOutput.map(_.event.getHeaders.asScala.map {
-          case (key, value) => (key.toString, value.toString)
-        }).map(_.asJava)
-        val bodies = flattenOutput.map(e => JavaUtils.bytesToString(e.event.getBody))
-        utils.assertOutput(headers.asJava, bodies.asJava)
-      }
-    } finally {
-      // here stop ssc only, but not underlying sparkcontext
-      ssc.stop(false)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
deleted file mode 100644
index 7bac1cc..0000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.socket.SocketChannel
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.jboss.netty.handler.codec.compression._
-import org.scalatest.{BeforeAndAfter, Matchers}
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
-
-class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
-  val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
-  var ssc: StreamingContext = null
-
-  test("flume input stream") {
-    testFlumeStream(testCompression = false)
-  }
-
-  test("flume input compressed stream") {
-    testFlumeStream(testCompression = true)
-  }
-
-  /** Run test on flume stream */
-  private def testFlumeStream(testCompression: Boolean): Unit = {
-    val input = (1 to 100).map { _.toString }
-    val utils = new FlumeTestUtils
-    try {
-      val outputQueue = startContext(utils.getTestPort(), testCompression)
-
-      eventually(timeout(10 seconds), interval(100 milliseconds)) {
-        utils.writeInput(input.asJava, testCompression)
-      }
-
-      eventually(timeout(10 seconds), interval(100 milliseconds)) {
-        val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event }
-        outputEvents.foreach {
-          event =>
-            event.getHeaders.get("test") should be("header")
-        }
-        val output = outputEvents.map(event => JavaUtils.bytesToString(event.getBody))
-        output should be (input)
-      }
-    } finally {
-      if (ssc != null) {
-        ssc.stop()
-      }
-      utils.close()
-    }
-  }
-
-  /** Setup and start the streaming context */
-  private def startContext(
-      testPort: Int, testCompression: Boolean): (ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]) = {
-    ssc = new StreamingContext(conf, Milliseconds(200))
-    val flumeStream = FlumeUtils.createStream(
-      ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
-    val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputQueue)
-    outputStream.register()
-    ssc.start()
-    outputQueue
-  }
-
-  /** Class to create socket channel with compression */
-  private class CompressionChannelFactory(compressionLevel: Int)
-    extends NioClientSocketChannelFactory {
-
-    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
-      val encoder = new ZlibEncoder(compressionLevel)
-      pipeline.addFirst("deflater", encoder)
-      pipeline.addFirst("inflater", new ZlibDecoder())
-      super.newChannel(pipeline)
-    }
-  }
-}
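
For reference, the two connector entry points exercised by the deleted suites above were used roughly as follows. This is a minimal Scala sketch reconstructed from the deleted test code, not a supported path going forward: it only compiles against Spark 2.x with the spark-streaming-flume artifact on the classpath, and the host names and ports are placeholders.

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeApiSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("FlumeApiSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Push-based receiver: a Flume agent sends events to this host/port (placeholders).
    val pushed = FlumeUtils.createStream(ssc, "localhost", 41414)

    // Pull-based receiver: poll Spark Sinks running at the given addresses.
    val addresses = Seq(new InetSocketAddress("localhost", 41415))
    val polled = FlumeUtils.createPollingStream(
      ssc, addresses, StorageLevel.MEMORY_AND_DISK, 1000, 5)

    // Materialize both streams with a per-batch count.
    pushed.count().print()
    polled.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}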

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 0acc9b8..ba4009e 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -108,7 +108,6 @@ private[spark] class DirectKafkaInputDStream[K, V](
     }
   }
 
-  // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
   private[streaming] override def name: String = s"Kafka 0.10 direct stream [$id]"
 
   protected[streaming] override val checkpointData =

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 9297c39..2ec771e 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -67,7 +67,6 @@ class DirectKafkaInputDStream[
   val maxRetries = context.sparkContext.getConf.getInt(
     "spark.streaming.kafka.maxRetries", 1)
 
-  // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
   private[streaming] override def name: String = s"Kafka direct stream [$id]"
 
   protected[streaming] override val checkpointData =

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 59ba317..7ce7c9f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,7 +121,6 @@
     <hadoop.version>2.7.3</hadoop.version>
     <protobuf.version>2.5.0</protobuf.version>
     <yarn.version>${hadoop.version}</yarn.version>
-    <flume.version>1.6.0</flume.version>
     <zookeeper.version>3.4.6</zookeeper.version>
     <curator.version>2.7.1</curator.version>
     <hive.group>org.spark-project.hive</hive.group>
@@ -212,7 +211,6 @@
      during compilation if the dependency is transivite (e.g. "graphx/" depending on "core/" and
       needing Hadoop classes in the classpath to compile).
     -->
-    <flume.deps.scope>compile</flume.deps.scope>
     <hadoop.deps.scope>compile</hadoop.deps.scope>
     <hive.deps.scope>compile</hive.deps.scope>
     <orc.deps.scope>compile</orc.deps.scope>
@@ -1806,46 +1804,6 @@
         <scope>compile</scope>
       </dependency>
       <dependency>
-        <groupId>org.apache.flume</groupId>
-        <artifactId>flume-ng-core</artifactId>
-        <version>${flume.version}</version>
-        <scope>${flume.deps.scope}</scope>
-        <exclusions>
-          <exclusion>
-            <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.apache.flume</groupId>
-            <artifactId>flume-ng-auth</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.apache.thrift</groupId>
-            <artifactId>libthrift</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.mortbay.jetty</groupId>
-            <artifactId>servlet-api</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.flume</groupId>
-        <artifactId>flume-ng-sdk</artifactId>
-        <version>${flume.version}</version>
-        <scope>${flume.deps.scope}</scope>
-        <exclusions>
-          <exclusion>
-            <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.apache.thrift</groupId>
-            <artifactId>libthrift</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
         <groupId>org.apache.calcite</groupId>
         <artifactId>calcite-core</artifactId>
         <version>${calcite.version}</version>
@@ -2635,15 +2593,6 @@
       </dependencies>
     </profile>
 
-    <profile>
-      <id>flume</id>
-      <modules>
-        <module>external/flume</module>
-        <module>external/flume-sink</module>
-        <module>external/flume-assembly</module>
-      </modules>
-    </profile>
-
     <!-- Ganglia integration is not included by default due to LGPL-licensed code -->
     <profile>
       <id>spark-ganglia-lgpl</id>
@@ -2836,9 +2785,6 @@
       that does not have them.
     -->
     <profile>
-      <id>flume-provided</id>
-    </profile>
-    <profile>
       <id>hadoop-provided</id>
     </profile>
     <profile>

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index a5ed908..8b01b90 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -55,16 +55,14 @@ object BuildCommons {
   ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects
 
   val optionallyEnabledProjects@Seq(kubernetes, mesos, yarn,
-    streamingFlumeSink, streamingFlume,
     streamingKafka, sparkGangliaLgpl, streamingKinesisAsl,
     dockerIntegrationTests, hadoopCloud, kubernetesIntegrationTests) =
     Seq("kubernetes", "mesos", "yarn",
-      "streaming-flume-sink", "streaming-flume",
       "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
       "docker-integration-tests", "hadoop-cloud", "kubernetes-integration-tests").map(ProjectRef(buildLocation,
_))
 
-  val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) =
-    Seq("network-yarn", "streaming-flume-assembly", "streaming-kafka-0-8-assembly", "streaming-kafka-0-10-assembly", "streaming-kinesis-asl-assembly")
+  val assemblyProjects@Seq(networkYarn, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) =
+    Seq("network-yarn", "streaming-kafka-0-8-assembly", "streaming-kafka-0-10-assembly", "streaming-kinesis-asl-assembly")
       .map(ProjectRef(buildLocation, _))
 
   val copyJarsProjects@Seq(assembly, examples) = Seq("assembly", "examples")
@@ -373,8 +371,6 @@ object SparkBuild extends PomBuild {
   /* Hive console settings */
   enable(Hive.settings)(hive)
 
-  enable(Flume.settings)(streamingFlumeSink)
-
   // SPARK-14738 - Remove docker tests from main Spark build
   // enable(DockerIntegrationTests.settings)(dockerIntegrationTests)
 
@@ -452,9 +448,6 @@ object Unsafe {
   )
 }
 
-object Flume {
-  lazy val settings = sbtavro.SbtAvro.avroSettings
-}
 
 object DockerIntegrationTests {
   // This serves to override the override specified in DependencyOverrides:
@@ -587,8 +580,7 @@ object Assembly {
         .getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String])
     },
     jarName in assembly := {
-      if (moduleName.value.contains("streaming-flume-assembly")
-        || moduleName.value.contains("streaming-kafka-0-8-assembly")
+      if (moduleName.value.contains("streaming-kafka-0-8-assembly")
         || moduleName.value.contains("streaming-kafka-0-10-assembly")
         || moduleName.value.contains("streaming-kinesis-asl-assembly")) {
         // This must match the same name used in maven (see external/kafka-0-8-assembly/pom.xml)
@@ -694,10 +686,10 @@ object Unidoc {
     publish := {},
 
     unidocProjectFilter in(ScalaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes,
+      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, kubernetes,
         yarn, tags, streamingKafka010, sqlKafka010, avro),
     unidocProjectFilter in(JavaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes,
+      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, kubernetes,
         yarn, tags, streamingKafka010, sqlKafka010, avro),
 
     unidocAllClasspaths in (ScalaUnidoc, unidoc) := {

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/python/docs/pyspark.streaming.rst
----------------------------------------------------------------------
diff --git a/python/docs/pyspark.streaming.rst b/python/docs/pyspark.streaming.rst
index 25ceaba..9c25628 100644
--- a/python/docs/pyspark.streaming.rst
+++ b/python/docs/pyspark.streaming.rst
@@ -22,10 +22,3 @@ pyspark.streaming.kinesis module
     :members:
     :undoc-members:
     :show-inheritance:
-
-pyspark.streaming.flume.module
-------------------------------
-.. automodule:: pyspark.streaming.flume
-    :members:
-    :undoc-members:
-    :show-inheritance:

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/python/pyspark/streaming/dstream.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index ce42a85..946601e 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -45,7 +45,7 @@ class DStream(object):
     for more details on RDDs).
 
     DStreams can either be created from live data (such as, data from TCP
-    sockets, Kafka, Flume, etc.) using a L{StreamingContext} or it can be
+    sockets, Kafka, etc.) using a L{StreamingContext} or it can be
     generated by transforming existing DStreams using operations such as
     `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming
     program is running, each DStream periodically generates a RDD, either

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/python/pyspark/streaming/flume.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
deleted file mode 100644
index 5de4481..0000000
--- a/python/pyspark/streaming/flume.py
+++ /dev/null
@@ -1,156 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import sys
-if sys.version >= "3":
-    from io import BytesIO
-else:
-    from StringIO import StringIO
-import warnings
-
-from py4j.protocol import Py4JJavaError
-
-from pyspark.storagelevel import StorageLevel
-from pyspark.serializers import PairDeserializer, NoOpSerializer, UTF8Deserializer, read_int
-from pyspark.streaming import DStream
-
-__all__ = ['FlumeUtils', 'utf8_decoder']
-
-
-def utf8_decoder(s):
-    """ Decode the unicode as UTF-8 """
-    if s is None:
-        return None
-    return s.decode('utf-8')
-
-
-class FlumeUtils(object):
-
-    @staticmethod
-    def createStream(ssc, hostname, port,
-                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
-                     enableDecompression=False,
-                     bodyDecoder=utf8_decoder):
-        """
-        Create an input stream that pulls events from Flume.
-
-        :param ssc:  StreamingContext object
-        :param hostname:  Hostname of the slave machine to which the flume data will be sent
-        :param port:  Port of the slave machine to which the flume data will be sent
-        :param storageLevel:  Storage level to use for storing the received objects
-        :param enableDecompression:  Should netty server decompress input stream
-        :param bodyDecoder:  A function used to decode body (default is utf8_decoder)
-        :return: A DStream object
-
-        .. note:: Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0.
-            See SPARK-22142.
-        """
-        warnings.warn(
-            "Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0. "
-            "See SPARK-22142.",
-            DeprecationWarning)
-        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
-        helper = FlumeUtils._get_helper(ssc._sc)
-        jstream = helper.createStream(ssc._jssc, hostname, port, jlevel, enableDecompression)
-        return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
-
-    @staticmethod
-    def createPollingStream(ssc, addresses,
-                            storageLevel=StorageLevel.MEMORY_AND_DISK_2,
-                            maxBatchSize=1000,
-                            parallelism=5,
-                            bodyDecoder=utf8_decoder):
-        """
-        Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
-        This stream will poll the sink for data and will pull events as they are available.
-
-        :param ssc:  StreamingContext object
-        :param addresses:  List of (host, port)s on which the Spark Sink is running.
-        :param storageLevel:  Storage level to use for storing the received objects
-        :param maxBatchSize:  The maximum number of events to be pulled from the Spark sink
-                              in a single RPC call
-        :param parallelism:  Number of concurrent requests this stream should send to the sink.
-                             Note that having a higher number of requests concurrently being pulled
-                             will result in this stream using more threads
-        :param bodyDecoder:  A function used to decode body (default is utf8_decoder)
-        :return: A DStream object
-
-        .. note:: Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0.
-            See SPARK-22142.
-        """
-        warnings.warn(
-            "Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0. "
-            "See SPARK-22142.",
-            DeprecationWarning)
-        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
-        hosts = []
-        ports = []
-        for (host, port) in addresses:
-            hosts.append(host)
-            ports.append(port)
-        helper = FlumeUtils._get_helper(ssc._sc)
-        jstream = helper.createPollingStream(
-            ssc._jssc, hosts, ports, jlevel, maxBatchSize, parallelism)
-        return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder)
-
-    @staticmethod
-    def _toPythonDStream(ssc, jstream, bodyDecoder):
-        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
-        stream = DStream(jstream, ssc, ser)
-
-        def func(event):
-            headersBytes = BytesIO(event[0]) if sys.version >= "3" else StringIO(event[0])
-            headers = {}
-            strSer = UTF8Deserializer()
-            for i in range(0, read_int(headersBytes)):
-                key = strSer.loads(headersBytes)
-                value = strSer.loads(headersBytes)
-                headers[key] = value
-            body = bodyDecoder(event[1])
-            return (headers, body)
-        return stream.map(func)
-
-    @staticmethod
-    def _get_helper(sc):
-        try:
-            return sc._jvm.org.apache.spark.streaming.flume.FlumeUtilsPythonHelper()
-        except TypeError as e:
-            if str(e) == "'JavaPackage' object is not callable":
-                FlumeUtils._printErrorMsg(sc)
-            raise
-
-    @staticmethod
-    def _printErrorMsg(sc):
-        print("""
-________________________________________________________________________________________________
-
-  Spark Streaming's Flume libraries not found in class path. Try one of the following.
-
-  1. Include the Flume library and its dependencies with in the
-     spark-submit command as
-
-     $ bin/spark-submit --packages org.apache.spark:spark-streaming-flume:%s ...
-
-  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
-     Group Id = org.apache.spark, Artifact Id = spark-streaming-flume-assembly, Version = %s.
-     Then, include the jar in the spark-submit command as
-
-     $ bin/spark-submit --jars <spark-streaming-flume-assembly.jar> ...
-
-________________________________________________________________________________________________
-
-""" % (sc.version, sc.version))

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 5cef621..4b995c0 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -48,7 +48,6 @@ from pyspark.context import SparkConf, SparkContext, RDD
 from pyspark.storagelevel import StorageLevel
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
-from pyspark.streaming.flume import FlumeUtils
 from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
 from pyspark.streaming.listener import StreamingListener
 
@@ -1301,148 +1300,6 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         self._validateStreamResult({"aa": 1, "bb": 2, "cc": 3}, stream)
 
 
-class FlumeStreamTests(PySparkStreamingTestCase):
-    timeout = 20  # seconds
-    duration = 1
-
-    def setUp(self):
-        super(FlumeStreamTests, self).setUp()
-        self._utils = self.ssc._jvm.org.apache.spark.streaming.flume.FlumeTestUtils()
-
-    def tearDown(self):
-        if self._utils is not None:
-            self._utils.close()
-            self._utils = None
-
-        super(FlumeStreamTests, self).tearDown()
-
-    def _startContext(self, n, compressed):
-        # Start the StreamingContext and also collect the result
-        dstream = FlumeUtils.createStream(self.ssc, "localhost", self._utils.getTestPort(),
-                                          enableDecompression=compressed)
-        result = []
-
-        def get_output(_, rdd):
-            for event in rdd.collect():
-                if len(result) < n:
-                    result.append(event)
-        dstream.foreachRDD(get_output)
-        self.ssc.start()
-        return result
-
-    def _validateResult(self, input, result):
-        # Validate both the header and the body
-        header = {"test": "header"}
-        self.assertEqual(len(input), len(result))
-        for i in range(0, len(input)):
-            self.assertEqual(header, result[i][0])
-            self.assertEqual(input[i], result[i][1])
-
-    def _writeInput(self, input, compressed):
-        # Try to write input to the receiver until success or timeout
-        start_time = time.time()
-        while True:
-            try:
-                self._utils.writeInput(input, compressed)
-                break
-            except:
-                if time.time() - start_time < self.timeout:
-                    time.sleep(0.01)
-                else:
-                    raise
-
-    def test_flume_stream(self):
-        input = [str(i) for i in range(1, 101)]
-        result = self._startContext(len(input), False)
-        self._writeInput(input, False)
-        self.wait_for(result, len(input))
-        self._validateResult(input, result)
-
-    def test_compressed_flume_stream(self):
-        input = [str(i) for i in range(1, 101)]
-        result = self._startContext(len(input), True)
-        self._writeInput(input, True)
-        self.wait_for(result, len(input))
-        self._validateResult(input, result)
-
-
-class FlumePollingStreamTests(PySparkStreamingTestCase):
-    timeout = 20  # seconds
-    duration = 1
-    maxAttempts = 5
-
-    def setUp(self):
-        self._utils = self.sc._jvm.org.apache.spark.streaming.flume.PollingFlumeTestUtils()
-
-    def tearDown(self):
-        if self._utils is not None:
-            self._utils.close()
-            self._utils = None
-
-    def _writeAndVerify(self, ports):
-        # Set up the streaming context and input streams
-        ssc = StreamingContext(self.sc, self.duration)
-        try:
-            addresses = [("localhost", port) for port in ports]
-            dstream = FlumeUtils.createPollingStream(
-                ssc,
-                addresses,
-                maxBatchSize=self._utils.eventsPerBatch(),
-                parallelism=5)
-            outputBuffer = []
-
-            def get_output(_, rdd):
-                for e in rdd.collect():
-                    outputBuffer.append(e)
-
-            dstream.foreachRDD(get_output)
-            ssc.start()
-            self._utils.sendDataAndEnsureAllDataHasBeenReceived()
-
-            self.wait_for(outputBuffer, self._utils.getTotalEvents())
-            outputHeaders = [event[0] for event in outputBuffer]
-            outputBodies = [event[1] for event in outputBuffer]
-            self._utils.assertOutput(outputHeaders, outputBodies)
-        finally:
-            ssc.stop(False)
-
-    def _testMultipleTimes(self, f):
-        attempt = 0
-        while True:
-            try:
-                f()
-                break
-            except:
-                attempt += 1
-                if attempt >= self.maxAttempts:
-                    raise
-                else:
-                    import traceback
-                    traceback.print_exc()
-
-    def _testFlumePolling(self):
-        try:
-            port = self._utils.startSingleSink()
-            self._writeAndVerify([port])
-            self._utils.assertChannelsAreEmpty()
-        finally:
-            self._utils.close()
-
-    def _testFlumePollingMultipleHosts(self):
-        try:
-            port = self._utils.startSingleSink()
-            self._writeAndVerify([port])
-            self._utils.assertChannelsAreEmpty()
-        finally:
-            self._utils.close()
-
-    def test_flume_polling(self):
-        self._testMultipleTimes(self._testFlumePolling)
-
-    def test_flume_polling_multiple_hosts(self):
-        self._testMultipleTimes(self._testFlumePollingMultipleHosts)
-
-
 class KinesisStreamTests(PySparkStreamingTestCase):
 
     def test_kinesis_stream_api(self):
@@ -1531,23 +1388,6 @@ def search_kafka_assembly_jar():
         return jars[0]
 
 
-def search_flume_assembly_jar():
-    SPARK_HOME = os.environ["SPARK_HOME"]
-    flume_assembly_dir = os.path.join(SPARK_HOME, "external/flume-assembly")
-    jars = search_jar(flume_assembly_dir, "spark-streaming-flume-assembly")
-    if not jars:
-        raise Exception(
-            ("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir)
+
-            "You need to build Spark with "
-            "'build/sbt -Pflume assembly/package streaming-flume-assembly/assembly' or "
-            "'build/mvn -DskipTests -Pflume package' before running this test.")
-    elif len(jars) > 1:
-        raise Exception(("Found multiple Spark Streaming Flume assembly JARs: %s; please
"
-                        "remove all but one") % (", ".join(jars)))
-    else:
-        return jars[0]
-
-
 def _kinesis_asl_assembly_dir():
     SPARK_HOME = os.environ["SPARK_HOME"]
     return os.path.join(SPARK_HOME, "external/kinesis-asl-assembly")
@@ -1565,9 +1405,6 @@ def search_kinesis_asl_assembly_jar():
 
 
 # Must be same as the variable and condition defined in modules.py
-flume_test_environ_var = "ENABLE_FLUME_TESTS"
-are_flume_tests_enabled = os.environ.get(flume_test_environ_var) == '1'
-# Must be same as the variable and condition defined in modules.py
 kafka_test_environ_var = "ENABLE_KAFKA_0_8_TESTS"
 are_kafka_tests_enabled = os.environ.get(kafka_test_environ_var) == '1'
 # Must be same as the variable and condition defined in KinesisTestUtils.scala and modules.py
@@ -1577,15 +1414,14 @@ are_kinesis_tests_enabled = os.environ.get(kinesis_test_environ_var) == '1'
 if __name__ == "__main__":
     from pyspark.streaming.tests import *
     kafka_assembly_jar = search_kafka_assembly_jar()
-    flume_assembly_jar = search_flume_assembly_jar()
     kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
 
     if kinesis_asl_assembly_jar is None:
         kinesis_jar_present = False
-        jars = "%s,%s" % (kafka_assembly_jar, flume_assembly_jar)
+        jars = kafka_assembly_jar
     else:
         kinesis_jar_present = True
-        jars = "%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, kinesis_asl_assembly_jar)
+        jars = "%s,%s" % (kafka_assembly_jar, kinesis_asl_assembly_jar)
 
     existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
     jars_args = "--jars %s" % jars
@@ -1593,14 +1429,6 @@ if __name__ == "__main__":
     testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
                  StreamingListenerTests]
 
-    if are_flume_tests_enabled:
-        testcases.append(FlumeStreamTests)
-        testcases.append(FlumePollingStreamTests)
-    else:
-        sys.stderr.write(
-            "Skipped test_flume_stream (enable by setting environment variable %s=1"
-            % flume_test_environ_var)
-
     if are_kafka_tests_enabled:
         testcases.append(KafkaStreamTests)
     else:

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 0274038..122f25b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -537,7 +537,7 @@ class StreamingContext private[streaming] (
         ExecutorAllocationManager.isDynamicAllocationEnabled(conf)) {
       logWarning("Dynamic Allocation is enabled for this application. " +
         "Enabling Dynamic allocation for Spark Streaming applications can cause data loss
if " +
-        "Write Ahead Log is not enabled for non-replayable sources like Flume. " +
+        "Write Ahead Log is not enabled for non-replayable sources. " +
         "See the programming guide for details on how to enable the Write Ahead Log.")
     }
   }
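
The warning in this hunk points users at the Write Ahead Log. Below is a minimal sketch of enabling it for a receiver-based source; it assumes the standard spark.streaming.receiver.writeAheadLog.enable key, and the checkpoint path and socket source are placeholders rather than anything prescribed by this commit.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WalSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("WalSketch")
      // Persist received blocks to the write ahead log so receiver-based,
      // non-replayable sources can be recovered after executor loss.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(1))
    // The WAL is written under the checkpoint directory, so one must be set.
    ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")

    val lines = ssc.socketTextStream("localhost", 9999) // placeholder receiver-based source
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}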

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index a59f4ef..9939686 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -30,7 +30,7 @@ import org.apache.spark.streaming.dstream.DStream
 /**
  * A Java-friendly interface to [[org.apache.spark.streaming.dstream.DStream]], the basic
  * abstraction in Spark Streaming that represents a continuous stream of data.
- * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume,
+ * DStreams can either be created from live data (such as, data from TCP sockets, Kafka,
  * etc.) or it can be generated by transforming existing DStreams using operations such as `map`,
  * `window`. For operations applicable to key-value pair DStreams, see
  * [[org.apache.spark.streaming.api.java.JavaPairDStream]].

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 4a4d2c5..3524337 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -40,7 +40,7 @@ import org.apache.spark.util.{CallSite, Utils}
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
  * sequence of RDDs (of the same type) representing a continuous stream of data (see
  * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
- * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume,
+ * DStreams can either be created from live data (such as, data from TCP sockets, Kafka,
  * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by
  * transforming existing DStreams using operations such as `map`,
  * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 931f015..6495c91 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -56,7 +56,6 @@ abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
 
   /** A human-readable name of this InputDStream */
   private[streaming] def name: String = {
-    // e.g. FlumePollingDStream -> "Flume polling stream"
     val newName = Utils.getFormattedClassName(this)
       .replaceAll("InputDStream", "Stream")
       .split("(?=[A-Z])")



