spark-commits mailing list archives

From sro...@apache.org
Subject [1/4] spark git commit: [SPARK-25705][BUILD][STREAMING][TEST-MAVEN] Remove Kafka 0.8 integration
Date Tue, 16 Oct 2018 14:10:28 GMT
Repository: spark
Updated Branches:
  refs/heads/master 2c664edc0 -> 703e6da1e


http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/python/pyspark/streaming/kafka.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
deleted file mode 100644
index ed2e0e7..0000000
--- a/python/pyspark/streaming/kafka.py
+++ /dev/null
@@ -1,506 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import warnings
-
-from py4j.protocol import Py4JJavaError
-
-from pyspark.rdd import RDD
-from pyspark.storagelevel import StorageLevel
-from pyspark.serializers import AutoBatchedSerializer, PickleSerializer, PairDeserializer, \
-    NoOpSerializer
-from pyspark.streaming import DStream
-from pyspark.streaming.dstream import TransformedDStream
-from pyspark.streaming.util import TransformFunction
-
-__all__ = ['Broker', 'KafkaMessageAndMetadata', 'KafkaUtils', 'OffsetRange',
-           'TopicAndPartition', 'utf8_decoder']
-
-
-def utf8_decoder(s):
-    """ Decode the unicode as UTF-8 """
-    if s is None:
-        return None
-    return s.decode('utf-8')
-
-
-class KafkaUtils(object):
-
-    @staticmethod
-    def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
-                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
-                     keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
-        """
-        Create an input stream that pulls messages from a Kafka Broker.
-
-        :param ssc:  StreamingContext object
-        :param zkQuorum:  Zookeeper quorum (hostname:port,hostname:port,..).
-        :param groupId:  The group id for this consumer.
-        :param topics:  Dict of (topic_name -> numPartitions) to consume.
-                        Each partition is consumed in its own thread.
-        :param kafkaParams: Additional params for Kafka
-        :param storageLevel:  RDD storage level.
-        :param keyDecoder:  A function used to decode key (default is utf8_decoder)
-        :param valueDecoder:  A function used to decode value (default is utf8_decoder)
-        :return: A DStream object
-
-        .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
-            See SPARK-21893.
-        """
-        warnings.warn(
-            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
-            "See SPARK-21893.",
-            DeprecationWarning)
-        if kafkaParams is None:
-            kafkaParams = dict()
-        kafkaParams.update({
-            "zookeeper.connect": zkQuorum,
-            "group.id": groupId,
-            "zookeeper.connection.timeout.ms": "10000",
-        })
-        if not isinstance(topics, dict):
-            raise TypeError("topics should be dict")
-        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
-        helper = KafkaUtils._get_helper(ssc._sc)
-        jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
-        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
-        stream = DStream(jstream, ssc, ser)
-        return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
-
-    @staticmethod
-    def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None,
-                           keyDecoder=utf8_decoder, valueDecoder=utf8_decoder,
-                           messageHandler=None):
-        """
-        Create an input stream that directly pulls messages from a Kafka Broker and specific offset.
-
-        This is not a receiver based Kafka input stream, it directly pulls the message from Kafka
-        in each batch duration and processed without storing.
-
-        This does not use Zookeeper to store offsets. The consumed offsets are tracked
-        by the stream itself. For interoperability with Kafka monitoring tools that depend on
-        Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
-        You can access the offsets used in each batch from the generated RDDs (see
-
-        To recover from driver failures, you have to enable checkpointing in the StreamingContext.
-        The information on consumed offset can be recovered from the checkpoint.
-        See the programming guide for details (constraints, etc.).
-
-        :param ssc:  StreamingContext object.
-        :param topics:  list of topic_name to consume.
-        :param kafkaParams: Additional params for Kafka.
-        :param fromOffsets: Per-topic/partition Kafka offsets defining the (inclusive) starting
-                            point of the stream (a dictionary mapping `TopicAndPartition` to
-                            integers).
-        :param keyDecoder:  A function used to decode key (default is utf8_decoder).
-        :param valueDecoder:  A function used to decode value (default is utf8_decoder).
-        :param messageHandler: A function used to convert KafkaMessageAndMetadata. You can assess
-                               meta using messageHandler (default is None).
-        :return: A DStream object
-
-        .. note:: Experimental
-        .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
-            See SPARK-21893.
-        """
-        warnings.warn(
-            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
-            "See SPARK-21893.",
-            DeprecationWarning)
-        if fromOffsets is None:
-            fromOffsets = dict()
-        if not isinstance(topics, list):
-            raise TypeError("topics should be list")
-        if not isinstance(kafkaParams, dict):
-            raise TypeError("kafkaParams should be dict")
-
-        def funcWithoutMessageHandler(k_v):
-            return (keyDecoder(k_v[0]), valueDecoder(k_v[1]))
-
-        def funcWithMessageHandler(m):
-            m._set_key_decoder(keyDecoder)
-            m._set_value_decoder(valueDecoder)
-            return messageHandler(m)
-
-        helper = KafkaUtils._get_helper(ssc._sc)
-
-        jfromOffsets = dict([(k._jTopicAndPartition(helper),
-                              v) for (k, v) in fromOffsets.items()])
-        if messageHandler is None:
-            ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
-            func = funcWithoutMessageHandler
-            jstream = helper.createDirectStreamWithoutMessageHandler(
-                ssc._jssc, kafkaParams, set(topics), jfromOffsets)
-        else:
-            ser = AutoBatchedSerializer(PickleSerializer())
-            func = funcWithMessageHandler
-            jstream = helper.createDirectStreamWithMessageHandler(
-                ssc._jssc, kafkaParams, set(topics), jfromOffsets)
-
-        stream = DStream(jstream, ssc, ser).map(func)
-        return KafkaDStream(stream._jdstream, ssc, stream._jrdd_deserializer)
-
-    @staticmethod
-    def createRDD(sc, kafkaParams, offsetRanges, leaders=None,
-                  keyDecoder=utf8_decoder, valueDecoder=utf8_decoder,
-                  messageHandler=None):
-        """
-        Create an RDD from Kafka using offset ranges for each topic and partition.
-
-        :param sc:  SparkContext object
-        :param kafkaParams: Additional params for Kafka
-        :param offsetRanges:  list of offsetRange to specify topic:partition:[start, end) to consume
-        :param leaders: Kafka brokers for each TopicAndPartition in offsetRanges.  May be an empty
-            map, in which case leaders will be looked up on the driver.
-        :param keyDecoder:  A function used to decode key (default is utf8_decoder)
-        :param valueDecoder:  A function used to decode value (default is utf8_decoder)
-        :param messageHandler: A function used to convert KafkaMessageAndMetadata. You can assess
-                               meta using messageHandler (default is None).
-        :return: An RDD object
-
-        .. note:: Experimental
-        .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
-            See SPARK-21893.
-        """
-        warnings.warn(
-            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
-            "See SPARK-21893.",
-            DeprecationWarning)
-        if leaders is None:
-            leaders = dict()
-        if not isinstance(kafkaParams, dict):
-            raise TypeError("kafkaParams should be dict")
-        if not isinstance(offsetRanges, list):
-            raise TypeError("offsetRanges should be list")
-
-        def funcWithoutMessageHandler(k_v):
-            return (keyDecoder(k_v[0]), valueDecoder(k_v[1]))
-
-        def funcWithMessageHandler(m):
-            m._set_key_decoder(keyDecoder)
-            m._set_value_decoder(valueDecoder)
-            return messageHandler(m)
-
-        helper = KafkaUtils._get_helper(sc)
-
-        joffsetRanges = [o._jOffsetRange(helper) for o in offsetRanges]
-        jleaders = dict([(k._jTopicAndPartition(helper),
-                          v._jBroker(helper)) for (k, v) in leaders.items()])
-        if messageHandler is None:
-            jrdd = helper.createRDDWithoutMessageHandler(
-                sc._jsc, kafkaParams, joffsetRanges, jleaders)
-            ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
-            rdd = RDD(jrdd, sc, ser).map(funcWithoutMessageHandler)
-        else:
-            jrdd = helper.createRDDWithMessageHandler(
-                sc._jsc, kafkaParams, joffsetRanges, jleaders)
-            rdd = RDD(jrdd, sc).map(funcWithMessageHandler)
-
-        return KafkaRDD(rdd._jrdd, sc, rdd._jrdd_deserializer)
-
-    @staticmethod
-    def _get_helper(sc):
-        try:
-            return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
-        except TypeError as e:
-            if str(e) == "'JavaPackage' object is not callable":
-                KafkaUtils._printErrorMsg(sc)
-            raise
-
-    @staticmethod
-    def _printErrorMsg(sc):
-        print("""
-________________________________________________________________________________________________
-
-  Spark Streaming's Kafka libraries not found in class path. Try one of the following.
-
-  1. Include the Kafka library and its dependencies with in the
-     spark-submit command as
-
-     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:%s ...
-
-  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
-     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly, Version = %s.
-     Then, include the jar in the spark-submit command as
-
-     $ bin/spark-submit --jars <spark-streaming-kafka-0-8-assembly.jar> ...
-
-________________________________________________________________________________________________
-
-""" % (sc.version, sc.version))
-
-
-class OffsetRange(object):
-    """
-    Represents a range of offsets from a single Kafka TopicAndPartition.
-
-    .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
-        See SPARK-21893.
-    """
-
-    def __init__(self, topic, partition, fromOffset, untilOffset):
-        """
-        Create an OffsetRange to represent range of offsets
-        :param topic: Kafka topic name.
-        :param partition: Kafka partition id.
-        :param fromOffset: Inclusive starting offset.
-        :param untilOffset: Exclusive ending offset.
-        """
-        warnings.warn(
-            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
-            "See SPARK-21893.",
-            DeprecationWarning)
-        self.topic = topic
-        self.partition = partition
-        self.fromOffset = fromOffset
-        self.untilOffset = untilOffset
-
-    def __eq__(self, other):
-        if isinstance(other, self.__class__):
-            return (self.topic == other.topic
-                    and self.partition == other.partition
-                    and self.fromOffset == other.fromOffset
-                    and self.untilOffset == other.untilOffset)
-        else:
-            return False
-
-    def __ne__(self, other):
-        return not self.__eq__(other)
-
-    def __str__(self):
-        return "OffsetRange(topic: %s, partition: %d, range: [%d -> %d]" \
-               % (self.topic, self.partition, self.fromOffset, self.untilOffset)
-
-    def _jOffsetRange(self, helper):
-        return helper.createOffsetRange(self.topic, self.partition, self.fromOffset,
-                                        self.untilOffset)
-
-
-class TopicAndPartition(object):
-    """
-    Represents a specific topic and partition for Kafka.
-
-    .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
-        See SPARK-21893.
-    """
-
-    def __init__(self, topic, partition):
-        """
-        Create a Python TopicAndPartition to map to the Java related object
-        :param topic: Kafka topic name.
-        :param partition: Kafka partition id.
-        """
-        warnings.warn(
-            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
-            "See SPARK-21893.",
-            DeprecationWarning)
-        self._topic = topic
-        self._partition = partition
-
-    def _jTopicAndPartition(self, helper):
-        return helper.createTopicAndPartition(self._topic, self._partition)
-
-    def __eq__(self, other):
-        if isinstance(other, self.__class__):
-            return (self._topic == other._topic
-                    and self._partition == other._partition)
-        else:
-            return False
-
-    def __ne__(self, other):
-        return not self.__eq__(other)
-
-    def __hash__(self):
-        return (self._topic, self._partition).__hash__()
-
-
-class Broker(object):
-    """
-    Represent the host and port info for a Kafka broker.
-
-    .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
-        See SPARK-21893.
-    """
-
-    def __init__(self, host, port):
-        """
-        Create a Python Broker to map to the Java related object.
-        :param host: Broker's hostname.
-        :param port: Broker's port.
-        """
-        warnings.warn(
-            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
-            "See SPARK-21893.",
-            DeprecationWarning)
-        self._host = host
-        self._port = port
-
-    def _jBroker(self, helper):
-        return helper.createBroker(self._host, self._port)
-
-
-class KafkaRDD(RDD):
-    """
-    A Python wrapper of KafkaRDD, to provide additional information on normal RDD.
-
-    .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
-        See SPARK-21893.
-    """
-
-    def __init__(self, jrdd, ctx, jrdd_deserializer):
-        warnings.warn(
-            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
-            "See SPARK-21893.",
-            DeprecationWarning)
-        RDD.__init__(self, jrdd, ctx, jrdd_deserializer)
-
-    def offsetRanges(self):
-        """
-        Get the OffsetRange of specific KafkaRDD.
-        :return: A list of OffsetRange
-        """
-        helper = KafkaUtils._get_helper(self.ctx)
-        joffsetRanges = helper.offsetRangesOfKafkaRDD(self._jrdd.rdd())
-        ranges = [OffsetRange(o.topic(), o.partition(), o.fromOffset(), o.untilOffset())
-                  for o in joffsetRanges]
-        return ranges
-
-
-class KafkaDStream(DStream):
-    """
-    A Python wrapper of KafkaDStream
-
-    .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
-        See SPARK-21893.
-    """
-
-    def __init__(self, jdstream, ssc, jrdd_deserializer):
-        warnings.warn(
-            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
-            "See SPARK-21893.",
-            DeprecationWarning)
-        DStream.__init__(self, jdstream, ssc, jrdd_deserializer)
-
-    def foreachRDD(self, func):
-        """
-        Apply a function to each RDD in this DStream.
-        """
-        if func.__code__.co_argcount == 1:
-            old_func = func
-            func = lambda r, rdd: old_func(rdd)
-        jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer) \
-            .rdd_wrapper(lambda jrdd, ctx, ser: KafkaRDD(jrdd, ctx, ser))
-        api = self._ssc._jvm.PythonDStream
-        api.callForeachRDD(self._jdstream, jfunc)
-
-    def transform(self, func):
-        """
-        Return a new DStream in which each RDD is generated by applying a function
-        on each RDD of this DStream.
-
-        `func` can have one argument of `rdd`, or have two arguments of
-        (`time`, `rdd`)
-        """
-        if func.__code__.co_argcount == 1:
-            oldfunc = func
-            func = lambda t, rdd: oldfunc(rdd)
-        assert func.__code__.co_argcount == 2, "func should take one or two arguments"
-
-        return KafkaTransformedDStream(self, func)
-
-
-class KafkaTransformedDStream(TransformedDStream):
-    """
-    Kafka specific wrapper of TransformedDStream to transform on Kafka RDD.
-
-    .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
-        See SPARK-21893.
-    """
-
-    def __init__(self, prev, func):
-        warnings.warn(
-            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
-            "See SPARK-21893.",
-            DeprecationWarning)
-        TransformedDStream.__init__(self, prev, func)
-
-    @property
-    def _jdstream(self):
-        if self._jdstream_val is not None:
-            return self._jdstream_val
-
-        jfunc = TransformFunction(self._sc, self.func, self.prev._jrdd_deserializer) \
-            .rdd_wrapper(lambda jrdd, ctx, ser: KafkaRDD(jrdd, ctx, ser))
-        dstream = self._sc._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(), jfunc)
-        self._jdstream_val = dstream.asJavaDStream()
-        return self._jdstream_val
-
-
-class KafkaMessageAndMetadata(object):
-    """
-    Kafka message and metadata information. Including topic, partition, offset and message
-
-    .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
-        See SPARK-21893.
-    """
-
-    def __init__(self, topic, partition, offset, key, message):
-        """
-        Python wrapper of Kafka MessageAndMetadata
-        :param topic: topic name of this Kafka message
-        :param partition: partition id of this Kafka message
-        :param offset: Offset of this Kafka message in the specific partition
-        :param key: key payload of this Kafka message, can be null if this Kafka message has no key
-                    specified, the return data is undecoded bytearry.
-        :param message: actual message payload of this Kafka message, the return data is
-                        undecoded bytearray.
-        """
-        warnings.warn(
-            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
-            "See SPARK-21893.",
-            DeprecationWarning)
-        self.topic = topic
-        self.partition = partition
-        self.offset = offset
-        self._rawKey = key
-        self._rawMessage = message
-        self._keyDecoder = utf8_decoder
-        self._valueDecoder = utf8_decoder
-
-    def __str__(self):
-        return "KafkaMessageAndMetadata(topic: %s, partition: %d, offset: %d, key and message...)"
\
-               % (self.topic, self.partition, self.offset)
-
-    def __repr__(self):
-        return self.__str__()
-
-    def __reduce__(self):
-        return (KafkaMessageAndMetadata,
-                (self.topic, self.partition, self.offset, self._rawKey, self._rawMessage))
-
-    def _set_key_decoder(self, decoder):
-        self._keyDecoder = decoder
-
-    def _set_value_decoder(self, decoder):
-        self._valueDecoder = decoder
-
-    @property
-    def key(self):
-        return self._keyDecoder(self._rawKey)
-
-    @property
-    def message(self):
-        return self._valueDecoder(self._rawMessage)

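For reference, a minimal sketch of how the receiver-less ("direct") API deleted above was typically used, based on the signatures in the removed python/pyspark/streaming/kafka.py; the topic name "events" and broker address are illustrative placeholders, not taken from this commit:

    # PySpark on Spark < 3.0 with the spark-streaming-kafka-0-8 artifact on the classpath
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

    sc = SparkContext(appName="kafka08-direct-example")
    ssc = StreamingContext(sc, batchDuration=2)

    # Kafka 0.8 direct stream takes broker addresses, not ZooKeeper
    kafkaParams = {"metadata.broker.list": "broker1:9092"}
    # Optionally pin the starting point; keys are TopicAndPartition, values are offsets
    fromOffsets = {TopicAndPartition("events", 0): 0}

    stream = KafkaUtils.createDirectStream(ssc, ["events"], kafkaParams, fromOffsets)
    stream.map(lambda kv: kv[1]).pprint()  # records arrive as (key, value) pairs

    ssc.start()
    ssc.awaitTermination()
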
http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 4b995c0..8df00bc 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -47,7 +47,6 @@ if sys.version >= "3":
 from pyspark.context import SparkConf, SparkContext, RDD
 from pyspark.storagelevel import StorageLevel
 from pyspark.streaming.context import StreamingContext
-from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
 from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
 from pyspark.streaming.listener import StreamingListener
 
@@ -1047,259 +1046,6 @@ class CheckpointTests(unittest.TestCase):
         self.ssc.stop(True, True)
 
 
-class KafkaStreamTests(PySparkStreamingTestCase):
-    timeout = 20  # seconds
-    duration = 1
-
-    def setUp(self):
-        super(KafkaStreamTests, self).setUp()
-        self._kafkaTestUtils = self.ssc._jvm.org.apache.spark.streaming.kafka.KafkaTestUtils()
-        self._kafkaTestUtils.setup()
-
-    def tearDown(self):
-        super(KafkaStreamTests, self).tearDown()
-
-        if self._kafkaTestUtils is not None:
-            self._kafkaTestUtils.teardown()
-            self._kafkaTestUtils = None
-
-    def _randomTopic(self):
-        return "topic-%d" % random.randint(0, 10000)
-
-    def _validateStreamResult(self, sendData, stream):
-        result = {}
-        for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
-                                                   sum(sendData.values()))):
-            result[i] = result.get(i, 0) + 1
-
-        self.assertEqual(sendData, result)
-
-    def _validateRddResult(self, sendData, rdd):
-        result = {}
-        for i in rdd.map(lambda x: x[1]).collect():
-            result[i] = result.get(i, 0) + 1
-        self.assertEqual(sendData, result)
-
-    def test_kafka_stream(self):
-        """Test the Python Kafka stream API."""
-        topic = self._randomTopic()
-        sendData = {"a": 3, "b": 5, "c": 10}
-
-        self._kafkaTestUtils.createTopic(topic)
-        self._kafkaTestUtils.sendMessages(topic, sendData)
-
-        stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
-                                         "test-streaming-consumer", {topic: 1},
-                                         {"auto.offset.reset": "smallest"})
-        self._validateStreamResult(sendData, stream)
-
-    def test_kafka_direct_stream(self):
-        """Test the Python direct Kafka stream API."""
-        topic = self._randomTopic()
-        sendData = {"a": 1, "b": 2, "c": 3}
-        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(),
-                       "auto.offset.reset": "smallest"}
-
-        self._kafkaTestUtils.createTopic(topic)
-        self._kafkaTestUtils.sendMessages(topic, sendData)
-
-        stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
-        self._validateStreamResult(sendData, stream)
-
-    def test_kafka_direct_stream_from_offset(self):
-        """Test the Python direct Kafka stream API with start offset specified."""
-        topic = self._randomTopic()
-        sendData = {"a": 1, "b": 2, "c": 3}
-        fromOffsets = {TopicAndPartition(topic, 0): long(0)}
-        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
-
-        self._kafkaTestUtils.createTopic(topic)
-        self._kafkaTestUtils.sendMessages(topic, sendData)
-
-        stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
-        self._validateStreamResult(sendData, stream)
-
-    def test_kafka_rdd(self):
-        """Test the Python direct Kafka RDD API."""
-        topic = self._randomTopic()
-        sendData = {"a": 1, "b": 2}
-        offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))]
-        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
-
-        self._kafkaTestUtils.createTopic(topic)
-        self._kafkaTestUtils.sendMessages(topic, sendData)
-        rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
-        self._validateRddResult(sendData, rdd)
-
-    def test_kafka_rdd_with_leaders(self):
-        """Test the Python direct Kafka RDD API with leaders."""
-        topic = self._randomTopic()
-        sendData = {"a": 1, "b": 2, "c": 3}
-        offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))]
-        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
-        address = self._kafkaTestUtils.brokerAddress().split(":")
-        leaders = {TopicAndPartition(topic, 0): Broker(address[0], int(address[1]))}
-
-        self._kafkaTestUtils.createTopic(topic)
-        self._kafkaTestUtils.sendMessages(topic, sendData)
-        rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
-        self._validateRddResult(sendData, rdd)
-
-    def test_kafka_rdd_get_offsetRanges(self):
-        """Test Python direct Kafka RDD get OffsetRanges."""
-        topic = self._randomTopic()
-        sendData = {"a": 3, "b": 4, "c": 5}
-        offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))]
-        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
-
-        self._kafkaTestUtils.createTopic(topic)
-        self._kafkaTestUtils.sendMessages(topic, sendData)
-        rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
-        self.assertEqual(offsetRanges, rdd.offsetRanges())
-
-    def test_kafka_direct_stream_foreach_get_offsetRanges(self):
-        """Test the Python direct Kafka stream foreachRDD get offsetRanges."""
-        topic = self._randomTopic()
-        sendData = {"a": 1, "b": 2, "c": 3}
-        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(),
-                       "auto.offset.reset": "smallest"}
-
-        self._kafkaTestUtils.createTopic(topic)
-        self._kafkaTestUtils.sendMessages(topic, sendData)
-
-        stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
-
-        offsetRanges = []
-
-        def getOffsetRanges(_, rdd):
-            for o in rdd.offsetRanges():
-                offsetRanges.append(o)
-
-        stream.foreachRDD(getOffsetRanges)
-        self.ssc.start()
-        self.wait_for(offsetRanges, 1)
-
-        self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])
-
-    def test_kafka_direct_stream_transform_get_offsetRanges(self):
-        """Test the Python direct Kafka stream transform get offsetRanges."""
-        topic = self._randomTopic()
-        sendData = {"a": 1, "b": 2, "c": 3}
-        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(),
-                       "auto.offset.reset": "smallest"}
-
-        self._kafkaTestUtils.createTopic(topic)
-        self._kafkaTestUtils.sendMessages(topic, sendData)
-
-        stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
-
-        offsetRanges = []
-
-        def transformWithOffsetRanges(rdd):
-            for o in rdd.offsetRanges():
-                offsetRanges.append(o)
-            return rdd
-
-        # Test whether it is ok mixing KafkaTransformedDStream and TransformedDStream together,
-        # only the TransformedDstreams can be folded together.
-        stream.transform(transformWithOffsetRanges).map(lambda kv: kv[1]).count().pprint()
-        self.ssc.start()
-        self.wait_for(offsetRanges, 1)
-
-        self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])
-
-    def test_topic_and_partition_equality(self):
-        topic_and_partition_a = TopicAndPartition("foo", 0)
-        topic_and_partition_b = TopicAndPartition("foo", 0)
-        topic_and_partition_c = TopicAndPartition("bar", 0)
-        topic_and_partition_d = TopicAndPartition("foo", 1)
-
-        self.assertEqual(topic_and_partition_a, topic_and_partition_b)
-        self.assertNotEqual(topic_and_partition_a, topic_and_partition_c)
-        self.assertNotEqual(topic_and_partition_a, topic_and_partition_d)
-
-    def test_kafka_direct_stream_transform_with_checkpoint(self):
-        """Test the Python direct Kafka stream transform with checkpoint correctly recovered."""
-        topic = self._randomTopic()
-        sendData = {"a": 1, "b": 2, "c": 3}
-        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(),
-                       "auto.offset.reset": "smallest"}
-
-        self._kafkaTestUtils.createTopic(topic)
-        self._kafkaTestUtils.sendMessages(topic, sendData)
-
-        offsetRanges = []
-
-        def transformWithOffsetRanges(rdd):
-            for o in rdd.offsetRanges():
-                offsetRanges.append(o)
-            return rdd
-
-        self.ssc.stop(False)
-        self.ssc = None
-        tmpdir = "checkpoint-test-%d" % random.randint(0, 10000)
-
-        def setup():
-            ssc = StreamingContext(self.sc, 0.5)
-            ssc.checkpoint(tmpdir)
-            stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)
-            stream.transform(transformWithOffsetRanges).count().pprint()
-            return ssc
-
-        try:
-            ssc1 = StreamingContext.getOrCreate(tmpdir, setup)
-            ssc1.start()
-            self.wait_for(offsetRanges, 1)
-            self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])
-
-            # To make sure some checkpoint is written
-            time.sleep(3)
-            ssc1.stop(False)
-            ssc1 = None
-
-            # Restart again to make sure the checkpoint is recovered correctly
-            ssc2 = StreamingContext.getOrCreate(tmpdir, setup)
-            ssc2.start()
-            ssc2.awaitTermination(3)
-            ssc2.stop(stopSparkContext=False, stopGraceFully=True)
-            ssc2 = None
-        finally:
-            shutil.rmtree(tmpdir)
-
-    def test_kafka_rdd_message_handler(self):
-        """Test Python direct Kafka RDD MessageHandler."""
-        topic = self._randomTopic()
-        sendData = {"a": 1, "b": 1, "c": 2}
-        offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))]
-        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
-
-        def getKeyAndDoubleMessage(m):
-            return m and (m.key, m.message * 2)
-
-        self._kafkaTestUtils.createTopic(topic)
-        self._kafkaTestUtils.sendMessages(topic, sendData)
-        rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges,
-                                   messageHandler=getKeyAndDoubleMessage)
-        self._validateRddResult({"aa": 1, "bb": 1, "cc": 2}, rdd)
-
-    def test_kafka_direct_stream_message_handler(self):
-        """Test the Python direct Kafka stream MessageHandler."""
-        topic = self._randomTopic()
-        sendData = {"a": 1, "b": 2, "c": 3}
-        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(),
-                       "auto.offset.reset": "smallest"}
-
-        self._kafkaTestUtils.createTopic(topic)
-        self._kafkaTestUtils.sendMessages(topic, sendData)
-
-        def getKeyAndDoubleMessage(m):
-            return m and (m.key, m.message * 2)
-
-        stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams,
-                                               messageHandler=getKeyAndDoubleMessage)
-        self._validateStreamResult({"aa": 1, "bb": 2, "cc": 3}, stream)
-
-
 class KinesisStreamTests(PySparkStreamingTestCase):
 
     def test_kinesis_stream_api(self):
@@ -1371,23 +1117,6 @@ def search_jar(dir, name_prefix):
     return [jar for jar in jars if not jar.endswith(ignored_jar_suffixes)]
 
 
-def search_kafka_assembly_jar():
-    SPARK_HOME = os.environ["SPARK_HOME"]
-    kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-0-8-assembly")
-    jars = search_jar(kafka_assembly_dir, "spark-streaming-kafka-0-8-assembly")
-    if not jars:
-        raise Exception(
-            ("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir)
+
-            "You need to build Spark with "
-            "'build/sbt -Pkafka-0-8 assembly/package streaming-kafka-0-8-assembly/assembly'
or "
-            "'build/mvn -DskipTests -Pkafka-0-8 package' before running this test.")
-    elif len(jars) > 1:
-        raise Exception(("Found multiple Spark Streaming Kafka assembly JARs: %s; please
"
-                         "remove all but one") % (", ".join(jars)))
-    else:
-        return jars[0]
-
-
 def _kinesis_asl_assembly_dir():
     SPARK_HOME = os.environ["SPARK_HOME"]
     return os.path.join(SPARK_HOME, "external/kinesis-asl-assembly")
@@ -1404,38 +1133,26 @@ def search_kinesis_asl_assembly_jar():
         return jars[0]
 
 
-# Must be same as the variable and condition defined in modules.py
-kafka_test_environ_var = "ENABLE_KAFKA_0_8_TESTS"
-are_kafka_tests_enabled = os.environ.get(kafka_test_environ_var) == '1'
 # Must be same as the variable and condition defined in KinesisTestUtils.scala and modules.py
 kinesis_test_environ_var = "ENABLE_KINESIS_TESTS"
 are_kinesis_tests_enabled = os.environ.get(kinesis_test_environ_var) == '1'
 
 if __name__ == "__main__":
     from pyspark.streaming.tests import *
-    kafka_assembly_jar = search_kafka_assembly_jar()
     kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
 
     if kinesis_asl_assembly_jar is None:
         kinesis_jar_present = False
-        jars = kafka_assembly_jar
+        jars_args = ""
     else:
         kinesis_jar_present = True
-        jars = "%s,%s" % (kafka_assembly_jar, kinesis_asl_assembly_jar)
+        jars_args = "--jars %s" % kinesis_asl_assembly_jar
 
     existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
-    jars_args = "--jars %s" % jars
     os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])
     testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
                  StreamingListenerTests]
 
-    if are_kafka_tests_enabled:
-        testcases.append(KafkaStreamTests)
-    else:
-        sys.stderr.write(
-            "Skipped test_kafka_stream (enable by setting environment variable %s=1"
-            % kafka_test_environ_var)
-
     if kinesis_jar_present is True:
         testcases.append(KinesisStreamTests)
     elif are_kinesis_tests_enabled is False:


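This commit itself carries no migration guidance, but for PySpark users of the removed module the usual replacement is the Structured Streaming Kafka source. A sketch, assuming the matching spark-sql-kafka-0-10 package (artifact coordinate depends on your Spark/Scala build) is supplied via --packages, with broker address and topic as placeholders:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("kafka-structured-example").getOrCreate()

    # Read from Kafka as an unbounded DataFrame
    df = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker1:9092")
          .option("subscribe", "events")
          .load())

    # key and value arrive as binary columns; cast to strings before use
    query = (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
             .writeStream
             .format("console")
             .start())

    query.awaitTermination()
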
