From: lresende@apache.org
To: commits@bahir.apache.org
Subject: bahir git commit: [BAHIR-24] fix MQTT Python code, examples, add tests
Date: Sat, 23 Jul 2016 16:20:44 +0000 (UTC)

Repository: bahir
Updated Branches:
  refs/heads/master 48e91fca5 -> 12f130846


[BAHIR-24] fix MQTT Python code, examples, add tests

Changes in this PR:
- remove unnecessary files from streaming-mqtt/python
- update all *.py files to reflect the modified project structure
  (pyspark.streaming.mqtt --> mqtt)
- add the test cases that were left out of the initial import, plus a shell
  script to run them:
  - streaming-mqtt/python-tests/run-python-tests.sh
  - streaming-mqtt/python-tests/tests.py
- modify MQTTTestUtils.scala to limit the required disk storage space
- modify the bin/run-example script to set up PYTHONPATH so that the Python
  examples can be run

Closes #10


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/12f13084
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/12f13084
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/12f13084

Branch: refs/heads/master
Commit: 12f130846ef7523138e98e79bfd823f61acab3b3
Parents: 48e91fc
Author: Christian Kadner
Authored: Fri Jul 15 17:49:40 2016 -0700
Committer: Luciano Resende
Committed: Sat Jul 23 09:19:57 2016 -0700

----------------------------------------------------------------------
 .gitignore                                      |   3 +
 bin/run-example                                 |  25 +-
 pom.xml                                         |   2 +-
 .../src/main/python/streaming/mqtt_wordcount.py |  34 +-
 streaming-mqtt/python-tests/run-python-tests.sh |  79 +++
 streaming-mqtt/python-tests/tests.py            |  99 +++
 streaming-mqtt/python/__init__.py               |  22 -
 streaming-mqtt/python/dstream.py                | 643 ------------------
 streaming-mqtt/python/mqtt.py                   |  27 +-
 .../spark/streaming/mqtt/MQTTTestUtils.scala    |   6 +
 10 files changed, 247 insertions(+), 693 deletions(-)
----------------------------------------------------------------------
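A rough local walk-through of the changes above (a sketch, not part of the
patch): it assumes SPARK_HOME points at a Spark 2.x installation, that Maven
and the Mosquitto broker are installed, and that commands are run from the
Bahir project root; /path/to/spark is a placeholder.

  $ export SPARK_HOME=/path/to/spark   # placeholder for a local Spark install
  $ mvn clean install                  # builds the modules and the *-tests.jar used by the test script

  # start a local MQTT broker and the Scala publisher example
  $ mosquitto -p 1883 &
  $ bin/run-example org.apache.spark.examples.streaming.mqtt.MQTTPublisher tcp://localhost:1883 foo

  # run the relocated Python word-count example
  $ bin/run-example streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py tcp://localhost:1883 foo

  # run the new Python test suite
  $ streaming-mqtt/python-tests/run-python-tests.sh


http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/.gitignore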
---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 1801b67..fb6d3b7 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,9 @@ target/ *.class *.log +# Python +*.pyc + # Others .checkstyle .fbExcludeFilterFile http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/bin/run-example ---------------------------------------------------------------------- diff --git a/bin/run-example b/bin/run-example index 6f3cb39..483d853 100755 --- a/bin/run-example +++ b/bin/run-example @@ -16,6 +16,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +set -o pipefail # make sure Spark home is set and valid if [ -z "${SPARK_HOME}" ]; then @@ -61,8 +62,8 @@ DESCRIPTION USAGE EXAMPLES EOF - grep -R "bin/run-example org.apache" --no-filename --include="*.scala" --include="*.java" "${project_dir}" | tr '`' ' ' | sed 's/^ *\* * / /g' ; \ - grep -R -A1 "bin/run-example \\\\" --no-filename --include="*.scala" --include="*.java" "${project_dir}" | tr '`' ' ' | sed 's/^ *\* * / /g' | sed '/^--$/d' | sed 'N;s/\\\n *//g' + grep -R "bin/run-example org.apache" --no-filename --include=*.{scala,java,py} "${project_dir}" | tr '`' ' ' | sed 's/^ *\* * / /g' ; \ + grep -R -A1 "bin/run-example \\\\" --no-filename --include=*.{scala,java,py} "${project_dir}" | tr '`' ' ' | sed 's/^ *\* * / /g' | sed '/^--$/d' | sed 'N;s/\\\n *//g' exit 1 } @@ -122,10 +123,22 @@ examples_jar="${module_tests_jar_path}" # streaming-akka/target/spark-streaming-akka_2.11-2.0.0-SNAPSHOT-tests.jar \ # localhost 9999 -# capture the full command line and echo it for transparency and debug purposes -cmd="${SPARK_HOME}/bin/spark-submit \ - --packages ${spark_package} \ - --class ${example_class} ${examples_jar} ${example_args}" +# for Python examples add all of the Bahir project's Python sources to PYTHONPATH, which in local +# mode is easier than creating a zip files to be used with the --py-files option (TODO: BAHIR-35) +# Note that --py-files with individual *.py files does not work if those modules are imported at top +# of the example script but rather imports must be pushed down to after SparkContext initialization +if [[ "$example_class" == *.py ]]; then + export PYTHONPATH="$( find "$project_dir" -path '*/python' -maxdepth 5 -type d | tr '\n' ':' )$PYTHONPATH" + cmd="${SPARK_HOME}/bin/spark-submit \ + --packages ${spark_package} \ + ${example_class} \ + ${example_args}" +else + cmd="${SPARK_HOME}/bin/spark-submit \ + --packages ${spark_package} \ + --class ${example_class} ${examples_jar} \ + ${example_args}" +fi echo "---" echo "Spark-Submit command: $cmd" http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index bbfa340..b9158a1 100644 --- a/pom.xml +++ b/pom.xml @@ -95,7 +95,7 @@ 1.2.17 - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT com.typesafe.akka http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py ---------------------------------------------------------------------- diff --git a/streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py b/streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py index abf9c0e..19838dc 100644 --- a/streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py +++ b/streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py @@ -20,23 +20,35 @@ Usage: mqtt_wordcount.py To run 
this in your local machine, you need to setup a MQTT broker and publisher first, - Mosquitto is one of the open source MQTT Brokers, see - http://mosquitto.org/ - Eclipse paho project provides number of clients and utilities for working with MQTT, see - http://www.eclipse.org/paho/#getting-started - - and then run the example - `$ bin/spark-submit --jars \ - external/mqtt-assembly/target/scala-*/spark-streaming-mqtt-assembly-*.jar \ - examples/src/main/python/streaming/mqtt_wordcount.py \ - tcp://localhost:1883 foo` + like Mosquitto (http://mosquitto.org/) an easy to use and install open source MQTT Broker. + On Mac OS Mosquitto can be installed with Homebrew `$ brew install mosquitto`. + On Ubuntu mosquitto can be installed with the command `$ sudo apt-get install mosquitto`. + + Alternatively, the Eclipse paho project provides a number of clients and utilities for + working with MQTT, see http://www.eclipse.org/paho/#getting-started + + How to run this example locally: + + (1) Start Mqtt message broker/server, i.e. Mosquitto: + + `$ mosquitto -p 1883` + + (2) Run the publisher: + + `$ bin/run-example \ + org.apache.spark.examples.streaming.mqtt.MQTTPublisher tcp://localhost:1883 foo` + + (3) Run the example: + + `$ bin/run-example \ + streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py tcp://localhost:1883 foo` """ import sys from pyspark import SparkContext from pyspark.streaming import StreamingContext -from pyspark.streaming.mqtt import MQTTUtils +from mqtt import MQTTUtils if __name__ == "__main__": if len(sys.argv) != 3: http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/python-tests/run-python-tests.sh ---------------------------------------------------------------------- diff --git a/streaming-mqtt/python-tests/run-python-tests.sh b/streaming-mqtt/python-tests/run-python-tests.sh new file mode 100755 index 0000000..557b164 --- /dev/null +++ b/streaming-mqtt/python-tests/run-python-tests.sh @@ -0,0 +1,79 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +set -o pipefail + +# make sure Spark home is set and valid +if [ -z "${SPARK_HOME}" ]; then + echo "SPARK_HOME is not set" >&2 + exit 1 +elif [ ! -d "${SPARK_HOME}" ]; then + echo "SPARK_HOME does not point to a valid directory" >&2 + exit 1 +fi + +# pinpoint the module folder and project root folder +bin_dir=$( dirname "$0" ) +module_dir=$( cd "${bin_dir}/.." && pwd -P ) +project_dir=$( cd "${module_dir}/.." 
&& pwd -P ) +stdout_log="${module_dir}/target/python-tests-python-output.log" +stderr_log="${module_dir}/target/python-tests-java-output.log" + +# use the module name to find the tests jar file that contains the example to run +module_name=${module_dir#"${project_dir}"/} +module_tests_jar_path=$( find "${module_dir}/target" -name "*${module_name}*-tests.jar" -maxdepth 1 | head -1 ) + +if [ -z "${module_tests_jar_path}" ] || [ ! -e "${module_tests_jar_path}" ]; then + echo "Could not find module tests jar file in ${module_dir}/target/" >&2 + echo "Run \"mvn clean install\" and retry running this example" >&2 + exit 1 +fi + +# use maven-help-plugin to determine project version and Scala version +module_version=$( cd "${module_dir}" && mvn org.apache.maven.plugins:maven-help-plugin:2.2:evaluate -Dexpression=project.version | grep -v "INFO\|WARNING\|ERROR\|Downloading" | tail -1 ) +scala_version=$( cd "${module_dir}" && mvn org.apache.maven.plugins:maven-help-plugin:2.2:evaluate -Dexpression=scala.binary.version | grep -v "INFO\|WARNING\|ERROR\|Downloading" | tail -1 ) + +# we are using spark-submit with --packages to run the tests and all necessary dependencies are +# resolved by maven which requires running "mvn" or "mvn install" first +spark_packages="org.apache.bahir:spark-${module_name}_${scala_version}:${module_version}" + +# find additional test-scoped dependencies and add them to the --packages list +test_dependencies=$( cd "${project_dir}" && mvn dependency:tree -Dscope=test -Dtokens=standard -pl ${module_name} | grep "\[INFO\] +- [a-z].*:test" | grep -ivE "spark|bahir|scala|junit" | sed 's/\[INFO\] +- //; s/:jar//; s/:test//' ) +for td in ${test_dependencies}; do + spark_packages="${spark_packages},${td}" +done + +# since we are running locally, we can use PYTHONPATH instead of --py-files (TODO: BAHIR-35) +export PYTHONPATH="${module_dir}/python:${PYTHONPATH}" + +# run the tests via spark-submit and capture the output in two separate log files (stdout=Python, +# stderr=Java) while only printing stdout to console +"${SPARK_HOME}"/bin/spark-submit \ + --master local[*] \ + --driver-memory 512m \ + --packages "${spark_packages}" \ + --jars "${module_tests_jar_path}" \ + "${module_dir}/python-tests/tests.py" \ + 1> >( tee "${stdout_log}" | grep -w '[[:alpha:]=-]\{2,\}' ) \ + 2> "${stderr_log}" + +# if the Python code doesn't get executed due to errors in SparkSubmit the stdout log file will be +# empty and nothing was logged to the console, then lets print the stderr log (Java output) +if [ ! -s "${stdout_log}" ]; then + cat "${stderr_log}" + echo "Error during test execution" +fi http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/python-tests/tests.py ---------------------------------------------------------------------- diff --git a/streaming-mqtt/python-tests/tests.py b/streaming-mqtt/python-tests/tests.py new file mode 100644 index 0000000..749313f --- /dev/null +++ b/streaming-mqtt/python-tests/tests.py @@ -0,0 +1,99 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys +import time +import random + +if sys.version_info[:2] <= (2, 6): + try: + import unittest2 as unittest + except ImportError: + sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier') + sys.exit(1) +else: + import unittest + +from pyspark.context import SparkConf, SparkContext, RDD +from pyspark.streaming.context import StreamingContext +from pyspark.streaming.tests import PySparkStreamingTestCase +from mqtt import MQTTUtils + +class MQTTStreamTests(PySparkStreamingTestCase): + timeout = 20 # seconds + duration = 1 + + def setUp(self): + super(MQTTStreamTests, self).setUp() + + MQTTTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \ + .loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils") + self._MQTTTestUtils = MQTTTestUtilsClz.newInstance() + self._MQTTTestUtils.setup() + + def tearDown(self): + if self._MQTTTestUtils is not None: + self._MQTTTestUtils.teardown() + self._MQTTTestUtils = None + + super(MQTTStreamTests, self).tearDown() + + def _randomTopic(self): + return "topic-%d" % random.randint(0, 10000) + + def _startContext(self, topic): + # Start the StreamingContext and also collect the result + stream = MQTTUtils.createStream(self.ssc, "tcp://" + self._MQTTTestUtils.brokerUri(), topic) + result = [] + + def getOutput(_, rdd): + for data in rdd.collect(): + result.append(data) + + stream.foreachRDD(getOutput) + self.ssc.start() + return result + + def test_mqtt_stream(self): + """Test the Python MQTT stream API.""" + sendData = "MQTT demo for spark streaming" + topic = self._randomTopic() + result = self._startContext(topic) + + def retry(): + self._MQTTTestUtils.publishData(topic, sendData) + # Because "publishData" sends duplicate messages, here we should use > 0 + self.assertTrue(len(result) > 0) + self.assertEqual(sendData, result[0]) + + # Retry it because we don't know when the receiver will start. + self._retry_or_timeout(retry) + + def _retry_or_timeout(self, test_func): + start_time = time.time() + while True: + try: + test_func() + break + except: + if time.time() - start_time > self.timeout: + raise + time.sleep(0.01) + + +if __name__ == "__main__": + unittest.main() http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/python/__init__.py ---------------------------------------------------------------------- diff --git a/streaming-mqtt/python/__init__.py b/streaming-mqtt/python/__init__.py deleted file mode 100644 index 66e8f8e..0000000 --- a/streaming-mqtt/python/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from pyspark.streaming.context import StreamingContext -from pyspark.streaming.dstream import DStream -from pyspark.streaming.listener import StreamingListener - -__all__ = ['StreamingContext', 'DStream', 'StreamingListener'] http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/python/dstream.py ---------------------------------------------------------------------- diff --git a/streaming-mqtt/python/dstream.py b/streaming-mqtt/python/dstream.py deleted file mode 100644 index 2056663..0000000 --- a/streaming-mqtt/python/dstream.py +++ /dev/null @@ -1,643 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import sys -import operator -import time -from itertools import chain -from datetime import datetime - -if sys.version < "3": - from itertools import imap as map, ifilter as filter - -from py4j.protocol import Py4JJavaError - -from pyspark import RDD -from pyspark.storagelevel import StorageLevel -from pyspark.streaming.util import rddToFileName, TransformFunction -from pyspark.rdd import portable_hash -from pyspark.resultiterable import ResultIterable - -__all__ = ["DStream"] - - -class DStream(object): - """ - A Discretized Stream (DStream), the basic abstraction in Spark Streaming, - is a continuous sequence of RDDs (of the same type) representing a - continuous stream of data (see L{RDD} in the Spark core documentation - for more details on RDDs). - - DStreams can either be created from live data (such as, data from TCP - sockets, Kafka, Flume, etc.) using a L{StreamingContext} or it can be - generated by transforming existing DStreams using operations such as - `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming - program is running, each DStream periodically generates a RDD, either - from live data or by transforming the RDD generated by a parent DStream. 
- - DStreams internally is characterized by a few basic properties: - - A list of other DStreams that the DStream depends on - - A time interval at which the DStream generates an RDD - - A function that is used to generate an RDD after each time interval - """ - def __init__(self, jdstream, ssc, jrdd_deserializer): - self._jdstream = jdstream - self._ssc = ssc - self._sc = ssc._sc - self._jrdd_deserializer = jrdd_deserializer - self.is_cached = False - self.is_checkpointed = False - - def context(self): - """ - Return the StreamingContext associated with this DStream - """ - return self._ssc - - def count(self): - """ - Return a new DStream in which each RDD has a single element - generated by counting each RDD of this DStream. - """ - return self.mapPartitions(lambda i: [sum(1 for _ in i)]).reduce(operator.add) - - def filter(self, f): - """ - Return a new DStream containing only the elements that satisfy predicate. - """ - def func(iterator): - return filter(f, iterator) - return self.mapPartitions(func, True) - - def flatMap(self, f, preservesPartitioning=False): - """ - Return a new DStream by applying a function to all elements of - this DStream, and then flattening the results - """ - def func(s, iterator): - return chain.from_iterable(map(f, iterator)) - return self.mapPartitionsWithIndex(func, preservesPartitioning) - - def map(self, f, preservesPartitioning=False): - """ - Return a new DStream by applying a function to each element of DStream. - """ - def func(iterator): - return map(f, iterator) - return self.mapPartitions(func, preservesPartitioning) - - def mapPartitions(self, f, preservesPartitioning=False): - """ - Return a new DStream in which each RDD is generated by applying - mapPartitions() to each RDDs of this DStream. - """ - def func(s, iterator): - return f(iterator) - return self.mapPartitionsWithIndex(func, preservesPartitioning) - - def mapPartitionsWithIndex(self, f, preservesPartitioning=False): - """ - Return a new DStream in which each RDD is generated by applying - mapPartitionsWithIndex() to each RDDs of this DStream. - """ - return self.transform(lambda rdd: rdd.mapPartitionsWithIndex(f, preservesPartitioning)) - - def reduce(self, func): - """ - Return a new DStream in which each RDD has a single element - generated by reducing each RDD of this DStream. - """ - return self.map(lambda x: (None, x)).reduceByKey(func, 1).map(lambda x: x[1]) - - def reduceByKey(self, func, numPartitions=None): - """ - Return a new DStream by applying reduceByKey to each RDD. - """ - if numPartitions is None: - numPartitions = self._sc.defaultParallelism - return self.combineByKey(lambda x: x, func, func, numPartitions) - - def combineByKey(self, createCombiner, mergeValue, mergeCombiners, - numPartitions=None): - """ - Return a new DStream by applying combineByKey to each RDD. - """ - if numPartitions is None: - numPartitions = self._sc.defaultParallelism - - def func(rdd): - return rdd.combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions) - return self.transform(func) - - def partitionBy(self, numPartitions, partitionFunc=portable_hash): - """ - Return a copy of the DStream in which each RDD are partitioned - using the specified partitioner. - """ - return self.transform(lambda rdd: rdd.partitionBy(numPartitions, partitionFunc)) - - def foreachRDD(self, func): - """ - Apply a function to each RDD in this DStream. 
- """ - if func.__code__.co_argcount == 1: - old_func = func - func = lambda t, rdd: old_func(rdd) - jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer) - api = self._ssc._jvm.PythonDStream - api.callForeachRDD(self._jdstream, jfunc) - - def pprint(self, num=10): - """ - Print the first num elements of each RDD generated in this DStream. - - @param num: the number of elements from the first will be printed. - """ - def takeAndPrint(time, rdd): - taken = rdd.take(num + 1) - print("-------------------------------------------") - print("Time: %s" % time) - print("-------------------------------------------") - for record in taken[:num]: - print(record) - if len(taken) > num: - print("...") - print("") - - self.foreachRDD(takeAndPrint) - - def mapValues(self, f): - """ - Return a new DStream by applying a map function to the value of - each key-value pairs in this DStream without changing the key. - """ - map_values_fn = lambda kv: (kv[0], f(kv[1])) - return self.map(map_values_fn, preservesPartitioning=True) - - def flatMapValues(self, f): - """ - Return a new DStream by applying a flatmap function to the value - of each key-value pairs in this DStream without changing the key. - """ - flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1])) - return self.flatMap(flat_map_fn, preservesPartitioning=True) - - def glom(self): - """ - Return a new DStream in which RDD is generated by applying glom() - to RDD of this DStream. - """ - def func(iterator): - yield list(iterator) - return self.mapPartitions(func) - - def cache(self): - """ - Persist the RDDs of this DStream with the default storage level - (C{MEMORY_ONLY}). - """ - self.is_cached = True - self.persist(StorageLevel.MEMORY_ONLY) - return self - - def persist(self, storageLevel): - """ - Persist the RDDs of this DStream with the given storage level - """ - self.is_cached = True - javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel) - self._jdstream.persist(javaStorageLevel) - return self - - def checkpoint(self, interval): - """ - Enable periodic checkpointing of RDDs of this DStream - - @param interval: time in seconds, after each period of that, generated - RDD will be checkpointed - """ - self.is_checkpointed = True - self._jdstream.checkpoint(self._ssc._jduration(interval)) - return self - - def groupByKey(self, numPartitions=None): - """ - Return a new DStream by applying groupByKey on each RDD. - """ - if numPartitions is None: - numPartitions = self._sc.defaultParallelism - return self.transform(lambda rdd: rdd.groupByKey(numPartitions)) - - def countByValue(self): - """ - Return a new DStream in which each RDD contains the counts of each - distinct value in each RDD of this DStream. - """ - return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y) - - def saveAsTextFiles(self, prefix, suffix=None): - """ - Save each RDD in this DStream as at text file, using string - representation of elements. - """ - def saveAsTextFile(t, rdd): - path = rddToFileName(prefix, suffix, t) - try: - rdd.saveAsTextFile(path) - except Py4JJavaError as e: - # after recovered from checkpointing, the foreachRDD may - # be called twice - if 'FileAlreadyExistsException' not in str(e): - raise - return self.foreachRDD(saveAsTextFile) - - # TODO: uncomment this until we have ssc.pickleFileStream() - # def saveAsPickleFiles(self, prefix, suffix=None): - # """ - # Save each RDD in this DStream as at binary file, the elements are - # serialized by pickle. 
- # """ - # def saveAsPickleFile(t, rdd): - # path = rddToFileName(prefix, suffix, t) - # try: - # rdd.saveAsPickleFile(path) - # except Py4JJavaError as e: - # # after recovered from checkpointing, the foreachRDD may - # # be called twice - # if 'FileAlreadyExistsException' not in str(e): - # raise - # return self.foreachRDD(saveAsPickleFile) - - def transform(self, func): - """ - Return a new DStream in which each RDD is generated by applying a function - on each RDD of this DStream. - - `func` can have one argument of `rdd`, or have two arguments of - (`time`, `rdd`) - """ - if func.__code__.co_argcount == 1: - oldfunc = func - func = lambda t, rdd: oldfunc(rdd) - assert func.__code__.co_argcount == 2, "func should take one or two arguments" - return TransformedDStream(self, func) - - def transformWith(self, func, other, keepSerializer=False): - """ - Return a new DStream in which each RDD is generated by applying a function - on each RDD of this DStream and 'other' DStream. - - `func` can have two arguments of (`rdd_a`, `rdd_b`) or have three - arguments of (`time`, `rdd_a`, `rdd_b`) - """ - if func.__code__.co_argcount == 2: - oldfunc = func - func = lambda t, a, b: oldfunc(a, b) - assert func.__code__.co_argcount == 3, "func should take two or three arguments" - jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer, other._jrdd_deserializer) - dstream = self._sc._jvm.PythonTransformed2DStream(self._jdstream.dstream(), - other._jdstream.dstream(), jfunc) - jrdd_serializer = self._jrdd_deserializer if keepSerializer else self._sc.serializer - return DStream(dstream.asJavaDStream(), self._ssc, jrdd_serializer) - - def repartition(self, numPartitions): - """ - Return a new DStream with an increased or decreased level of parallelism. - """ - return self.transform(lambda rdd: rdd.repartition(numPartitions)) - - @property - def _slideDuration(self): - """ - Return the slideDuration in seconds of this DStream - """ - return self._jdstream.dstream().slideDuration().milliseconds() / 1000.0 - - def union(self, other): - """ - Return a new DStream by unifying data of another DStream with this DStream. - - @param other: Another DStream having the same interval (i.e., slideDuration) - as this DStream. - """ - if self._slideDuration != other._slideDuration: - raise ValueError("the two DStream should have same slide duration") - return self.transformWith(lambda a, b: a.union(b), other, True) - - def cogroup(self, other, numPartitions=None): - """ - Return a new DStream by applying 'cogroup' between RDDs of this - DStream and `other` DStream. - - Hash partitioning is used to generate the RDDs with `numPartitions` partitions. - """ - if numPartitions is None: - numPartitions = self._sc.defaultParallelism - return self.transformWith(lambda a, b: a.cogroup(b, numPartitions), other) - - def join(self, other, numPartitions=None): - """ - Return a new DStream by applying 'join' between RDDs of this DStream and - `other` DStream. - - Hash partitioning is used to generate the RDDs with `numPartitions` - partitions. - """ - if numPartitions is None: - numPartitions = self._sc.defaultParallelism - return self.transformWith(lambda a, b: a.join(b, numPartitions), other) - - def leftOuterJoin(self, other, numPartitions=None): - """ - Return a new DStream by applying 'left outer join' between RDDs of this DStream and - `other` DStream. - - Hash partitioning is used to generate the RDDs with `numPartitions` - partitions. 
- """ - if numPartitions is None: - numPartitions = self._sc.defaultParallelism - return self.transformWith(lambda a, b: a.leftOuterJoin(b, numPartitions), other) - - def rightOuterJoin(self, other, numPartitions=None): - """ - Return a new DStream by applying 'right outer join' between RDDs of this DStream and - `other` DStream. - - Hash partitioning is used to generate the RDDs with `numPartitions` - partitions. - """ - if numPartitions is None: - numPartitions = self._sc.defaultParallelism - return self.transformWith(lambda a, b: a.rightOuterJoin(b, numPartitions), other) - - def fullOuterJoin(self, other, numPartitions=None): - """ - Return a new DStream by applying 'full outer join' between RDDs of this DStream and - `other` DStream. - - Hash partitioning is used to generate the RDDs with `numPartitions` - partitions. - """ - if numPartitions is None: - numPartitions = self._sc.defaultParallelism - return self.transformWith(lambda a, b: a.fullOuterJoin(b, numPartitions), other) - - def _jtime(self, timestamp): - """ Convert datetime or unix_timestamp into Time - """ - if isinstance(timestamp, datetime): - timestamp = time.mktime(timestamp.timetuple()) - return self._sc._jvm.Time(long(timestamp * 1000)) - - def slice(self, begin, end): - """ - Return all the RDDs between 'begin' to 'end' (both included) - - `begin`, `end` could be datetime.datetime() or unix_timestamp - """ - jrdds = self._jdstream.slice(self._jtime(begin), self._jtime(end)) - return [RDD(jrdd, self._sc, self._jrdd_deserializer) for jrdd in jrdds] - - def _validate_window_param(self, window, slide): - duration = self._jdstream.dstream().slideDuration().milliseconds() - if int(window * 1000) % duration != 0: - raise ValueError("windowDuration must be multiple of the slide duration (%d ms)" - % duration) - if slide and int(slide * 1000) % duration != 0: - raise ValueError("slideDuration must be multiple of the slide duration (%d ms)" - % duration) - - def window(self, windowDuration, slideDuration=None): - """ - Return a new DStream in which each RDD contains all the elements in seen in a - sliding window of time over this DStream. - - @param windowDuration: width of the window; must be a multiple of this DStream's - batching interval - @param slideDuration: sliding interval of the window (i.e., the interval after which - the new DStream will generate RDDs); must be a multiple of this - DStream's batching interval - """ - self._validate_window_param(windowDuration, slideDuration) - d = self._ssc._jduration(windowDuration) - if slideDuration is None: - return DStream(self._jdstream.window(d), self._ssc, self._jrdd_deserializer) - s = self._ssc._jduration(slideDuration) - return DStream(self._jdstream.window(d, s), self._ssc, self._jrdd_deserializer) - - def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration): - """ - Return a new DStream in which each RDD has a single element generated by reducing all - elements in a sliding window over this DStream. - - if `invReduceFunc` is not None, the reduction is done incrementally - using the old window's reduced value : - - 1. reduce the new values that entered the window (e.g., adding new counts) - - 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) - This is more efficient than `invReduceFunc` is None. 
- - @param reduceFunc: associative and commutative reduce function - @param invReduceFunc: inverse reduce function of `reduceFunc` - @param windowDuration: width of the window; must be a multiple of this DStream's - batching interval - @param slideDuration: sliding interval of the window (i.e., the interval after which - the new DStream will generate RDDs); must be a multiple of this - DStream's batching interval - """ - keyed = self.map(lambda x: (1, x)) - reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc, - windowDuration, slideDuration, 1) - return reduced.map(lambda kv: kv[1]) - - def countByWindow(self, windowDuration, slideDuration): - """ - Return a new DStream in which each RDD has a single element generated - by counting the number of elements in a window over this DStream. - windowDuration and slideDuration are as defined in the window() operation. - - This is equivalent to window(windowDuration, slideDuration).count(), - but will be more efficient if window is large. - """ - return self.map(lambda x: 1).reduceByWindow(operator.add, operator.sub, - windowDuration, slideDuration) - - def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None): - """ - Return a new DStream in which each RDD contains the count of distinct elements in - RDDs in a sliding window over this DStream. - - @param windowDuration: width of the window; must be a multiple of this DStream's - batching interval - @param slideDuration: sliding interval of the window (i.e., the interval after which - the new DStream will generate RDDs); must be a multiple of this - DStream's batching interval - @param numPartitions: number of partitions of each RDD in the new DStream. - """ - keyed = self.map(lambda x: (x, 1)) - counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub, - windowDuration, slideDuration, numPartitions) - return counted.filter(lambda kv: kv[1] > 0) - - def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None): - """ - Return a new DStream by applying `groupByKey` over a sliding window. - Similar to `DStream.groupByKey()`, but applies it over a sliding window. - - @param windowDuration: width of the window; must be a multiple of this DStream's - batching interval - @param slideDuration: sliding interval of the window (i.e., the interval after which - the new DStream will generate RDDs); must be a multiple of this - DStream's batching interval - @param numPartitions: Number of partitions of each RDD in the new DStream. - """ - ls = self.mapValues(lambda x: [x]) - grouped = ls.reduceByKeyAndWindow(lambda a, b: a.extend(b) or a, lambda a, b: a[len(b):], - windowDuration, slideDuration, numPartitions) - return grouped.mapValues(ResultIterable) - - def reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None, - numPartitions=None, filterFunc=None): - """ - Return a new DStream by applying incremental `reduceByKey` over a sliding window. - - The reduced value of over a new window is calculated using the old window's reduce value : - 1. reduce the new values that entered the window (e.g., adding new counts) - 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) - - `invFunc` can be None, then it will reduce all the RDDs in window, could be slower - than having `invFunc`. 
- - @param func: associative and commutative reduce function - @param invFunc: inverse function of `reduceFunc` - @param windowDuration: width of the window; must be a multiple of this DStream's - batching interval - @param slideDuration: sliding interval of the window (i.e., the interval after which - the new DStream will generate RDDs); must be a multiple of this - DStream's batching interval - @param numPartitions: number of partitions of each RDD in the new DStream. - @param filterFunc: function to filter expired key-value pairs; - only pairs that satisfy the function are retained - set this to null if you do not want to filter - """ - self._validate_window_param(windowDuration, slideDuration) - if numPartitions is None: - numPartitions = self._sc.defaultParallelism - - reduced = self.reduceByKey(func, numPartitions) - - if invFunc: - def reduceFunc(t, a, b): - b = b.reduceByKey(func, numPartitions) - r = a.union(b).reduceByKey(func, numPartitions) if a else b - if filterFunc: - r = r.filter(filterFunc) - return r - - def invReduceFunc(t, a, b): - b = b.reduceByKey(func, numPartitions) - joined = a.leftOuterJoin(b, numPartitions) - return joined.mapValues(lambda kv: invFunc(kv[0], kv[1]) - if kv[1] is not None else kv[0]) - - jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer) - jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, reduced._jrdd_deserializer) - if slideDuration is None: - slideDuration = self._slideDuration - dstream = self._sc._jvm.PythonReducedWindowedDStream( - reduced._jdstream.dstream(), - jreduceFunc, jinvReduceFunc, - self._ssc._jduration(windowDuration), - self._ssc._jduration(slideDuration)) - return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer) - else: - return reduced.window(windowDuration, slideDuration).reduceByKey(func, numPartitions) - - def updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None): - """ - Return a new "state" DStream where the state for each key is updated by applying - the given function on the previous state of the key and the new values of the key. - - @param updateFunc: State update function. If this function returns None, then - corresponding state key-value pair will be eliminated. - """ - if numPartitions is None: - numPartitions = self._sc.defaultParallelism - - if initialRDD and not isinstance(initialRDD, RDD): - initialRDD = self._sc.parallelize(initialRDD) - - def reduceFunc(t, a, b): - if a is None: - g = b.groupByKey(numPartitions).mapValues(lambda vs: (list(vs), None)) - else: - g = a.cogroup(b.partitionBy(numPartitions), numPartitions) - g = g.mapValues(lambda ab: (list(ab[1]), list(ab[0])[0] if len(ab[0]) else None)) - state = g.mapValues(lambda vs_s: updateFunc(vs_s[0], vs_s[1])) - return state.filter(lambda k_v: k_v[1] is not None) - - jreduceFunc = TransformFunction(self._sc, reduceFunc, - self._sc.serializer, self._jrdd_deserializer) - if initialRDD: - initialRDD = initialRDD._reserialize(self._jrdd_deserializer) - dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc, - initialRDD._jrdd) - else: - dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc) - - return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer) - - -class TransformedDStream(DStream): - """ - TransformedDStream is an DStream generated by an Python function - transforming each RDD of an DStream to another RDDs. - - Multiple continuous transformations of DStream can be combined into - one transformation. 
- """ - def __init__(self, prev, func): - self._ssc = prev._ssc - self._sc = self._ssc._sc - self._jrdd_deserializer = self._sc.serializer - self.is_cached = False - self.is_checkpointed = False - self._jdstream_val = None - - # Using type() to avoid folding the functions and compacting the DStreams which is not - # not strictly a object of TransformedDStream. - # Changed here is to avoid bug in KafkaTransformedDStream when calling offsetRanges(). - if (type(prev) is TransformedDStream and - not prev.is_cached and not prev.is_checkpointed): - prev_func = prev.func - self.func = lambda t, rdd: func(t, prev_func(t, rdd)) - self.prev = prev.prev - else: - self.prev = prev - self.func = func - - @property - def _jdstream(self): - if self._jdstream_val is not None: - return self._jdstream_val - - jfunc = TransformFunction(self._sc, self.func, self.prev._jrdd_deserializer) - dstream = self._sc._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(), jfunc) - self._jdstream_val = dstream.asJavaDStream() - return self._jdstream_val http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/python/mqtt.py ---------------------------------------------------------------------- diff --git a/streaming-mqtt/python/mqtt.py b/streaming-mqtt/python/mqtt.py index 8848a70..c55b704 100644 --- a/streaming-mqtt/python/mqtt.py +++ b/streaming-mqtt/python/mqtt.py @@ -38,19 +38,26 @@ class MQTTUtils(object): :param storageLevel: RDD storage level. :return: A DStream object """ + jlevel = ssc._sc._getJavaStorageLevel(storageLevel) + helper = MQTTUtils._get_helper(ssc._sc) + jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel) + return DStream(jstream, ssc, UTF8Deserializer()) + + @staticmethod + def _get_helper(sc): try: - helper = ssc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper() + return sc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper() except TypeError as e: if str(e) == "'JavaPackage' object is not callable": - MQTTUtils._printErrorMsg(ssc.sparkContext) + MQTTUtils._printErrorMsg(sc) raise - jlevel = ssc._sc._getJavaStorageLevel(storageLevel) - jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel) - return DStream(jstream, ssc, UTF8Deserializer()) - @staticmethod def _printErrorMsg(sc): + scalaVersionString = sc._jvm.scala.util.Properties.versionString() + import re + scalaVersion = re.sub(r'version (\d+\.\d+)\.\d+', r'\1', scalaVersionString) + sparkVersion = re.sub(r'(\d+\.\d+\.\d+).*', r'\1', sc.version) print(""" ________________________________________________________________________________________________ @@ -59,12 +66,12 @@ ________________________________________________________________________________ 1. Include the MQTT library and its dependencies with in the spark-submit command as - $ bin/spark-submit --packages org.apache.spark:spark-streaming-mqtt:%s ... + ${SPARK_HOME}/bin/spark-submit --packages org.apache.bahir:spark-streaming-mqtt_%s:%s ... 2. Download the JAR of the artifact from Maven Central http://search.maven.org/, - Group Id = org.apache.spark, Artifact Id = spark-streaming-mqtt-assembly, Version = %s. + Group Id = org.apache.bahir, Artifact Id = spark-streaming-mqtt, Version = %s. Then, include the jar in the spark-submit command as - $ bin/spark-submit --jars ... + ${SPARK_HOME}/bin/spark-submit --jars ... 
________________________________________________________________________________________________ -""" % (sc.version, sc.version)) +""" % (scalaVersion, sparkVersion, sparkVersion)) http://git-wip-us.apache.org/repos/asf/bahir/blob/12f13084/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala ---------------------------------------------------------------------- diff --git a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala index 80ab27b..3ce7511 100644 --- a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala +++ b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala @@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets import scala.language.postfixOps import org.apache.activemq.broker.{BrokerService, TransportConnector} +import org.apache.activemq.usage.SystemUsage import org.apache.commons.lang3.RandomUtils import org.eclipse.paho.client.mqttv3._ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence @@ -41,6 +42,7 @@ private[mqtt] class MQTTTestUtils extends Logging { private val brokerPort = findFreePort() private var broker: BrokerService = _ + private var systemUsage: SystemUsage = _ private var connector: TransportConnector = _ def brokerUri: String = { @@ -50,6 +52,10 @@ private[mqtt] class MQTTTestUtils extends Logging { def setup(): Unit = { broker = new BrokerService() broker.setDataDirectoryFile(Utils.createTempDir()) + broker.getSystemUsage().setSendFailIfNoSpace(false) + systemUsage = broker.getSystemUsage() + systemUsage.getStoreUsage().setLimit(1024L * 1024 * 256); // 256 MB (default: 100 GB) + systemUsage.getTempUsage().setLimit(1024L * 1024 * 128); // 128 MB (default: 50 GB) connector = new TransportConnector() connector.setName("mqtt") connector.setUri(new URI("mqtt://" + brokerUri))
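
A hedged sketch of running the Python example with plain spark-submit instead
of bin/run-example, combining the PYTHONPATH approach used by
run-python-tests.sh with the Bahir package coordinates from the reworked error
message in mqtt.py; the Scala binary version (2.11) and Bahir version (2.0.0)
below are illustrative placeholders, not values taken from this commit.

  # put the Bahir Python sources on PYTHONPATH (local mode; see TODO BAHIR-35)
  $ export PYTHONPATH="$(pwd)/streaming-mqtt/python:${PYTHONPATH}"
  $ "${SPARK_HOME}"/bin/spark-submit \
      --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.0.0 \
      streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py \
      tcp://localhost:1883 foo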