From: lresende@apache.org
To: commits@bahir.apache.org
Date: Mon, 22 Aug 2016 19:39:50 -0000
Message-Id: <970aa734a9db476db12fa9453435f55c@git.apache.org>
Subject: [2/2] bahir git commit: [BAHIR-53] Add new configuration options to MQTTInputDStream

[BAHIR-53] Add new configuration options to MQTTInputDStream

Add new configuration options to enable secured connections and other quality of services.

Closes #23

Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/28f034f4
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/28f034f4
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/28f034f4

Branch: refs/heads/master
Commit: 28f034f49d19034b596f7f04ca4fc2698a21ad6c
Parents: ab62371
Author: Sebastian Woehrl
Authored: Sat Aug 13 15:00:13 2016 +0200
Committer: Luciano Resende
Committed: Mon Aug 22 12:38:28 2016 -0700

----------------------------------------------------------------------
 scalastyle-config.xml                           |   2 +-
 streaming-mqtt/README.md                        |  23 ++++
 .../spark/streaming/mqtt/MQTTInputDStream.scala |  67 ++++++++--
 .../apache/spark/streaming/mqtt/MQTTUtils.scala | 129 ++++++++++++++++++-
 .../streaming/mqtt/JavaMQTTStreamSuite.java     |   6 +
 5 files changed, 211 insertions(+), 16 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/bahir/blob/28f034f4/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 1db5977..c6aa3d9 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -94,7 +94,7 @@ This file is divided into 3 sections:
-
+

http://git-wip-us.apache.org/repos/asf/bahir/blob/28f034f4/streaming-mqtt/README.md
----------------------------------------------------------------------
diff --git a/streaming-mqtt/README.md b/streaming-mqtt/README.md
index 9482648..2ec0128 100644
--- a/streaming-mqtt/README.md
+++ b/streaming-mqtt/README.md
@@ -25,6 +25,23 @@ The `--packages` argument can also be used with `bin/spark-submit`.
 This library is cross-published for Scala 2.10 and Scala 2.11, so users should replace the proper Scala version (2.10 or 2.11) in the commands listed above.
+## Configuration options.
+
+This source uses the [Eclipse Paho Java Client](https://eclipse.org/paho/clients/java/). Client API documentation is located [here](http://www.eclipse.org/paho/files/javadoc/index.html).
+
+ * `brokerUrl` A url MqttClient connects to. Set this as the url of the Mqtt Server. e.g. tcp://localhost:1883.
+ * `storageLevel` By default it is used for storing incoming messages on disk.
+ * `topic` Topic MqttClient subscribes to.
+ * `clientId` clientId, this client is assoicated with. Provide the same value to recover a stopped client.
+ * `QoS` The maximum quality of service to subscribe each topic at. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe.
+ * `username` Sets the user name to use for the connection to Mqtt Server. Do not set it, if server does not need this. Setting it empty will lead to errors.
+ * `password` Sets the password to use for the connection.
+ * `cleanSession` Setting it true starts a clean session, removes all checkpointed messages by a previous run of this source. This is set to false by default.
+ * `connectionTimeout` Sets the connection timeout, a value of 0 is interpreted as wait until client connects. See `MqttConnectOptions.setConnectionTimeout` for more information.
+ * `keepAlive` Same as `MqttConnectOptions.setKeepAliveInterval`.
+ * `mqttVersion` Same as `MqttConnectOptions.setMqttVersion`.
+
+
 ## Examples
 ### Scala API
@@ -34,6 +51,12 @@ this actor can be configured to handle failures, etc.
     val lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
+Additional mqtt connection options can be provided:
+
+```Scala
+val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+```
+
 ### Java API
 You need to extend `JavaActorReceiver` so as to store received data into Spark using `store(...)` methods.
 The supervisor strategy of

http://git-wip-us.apache.org/repos/asf/bahir/blob/28f034f4/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index cbad6f7..328656b 100644
--- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -19,10 +19,7 @@ package org.apache.spark.streaming.mqtt
 import java.nio.charset.StandardCharsets
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
-import org.eclipse.paho.client.mqttv3.MqttCallback
-import org.eclipse.paho.client.mqttv3.MqttClient
-import org.eclipse.paho.client.mqttv3.MqttMessage
+import org.eclipse.paho.client.mqttv3._
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
 import org.apache.spark.storage.StorageLevel
@@ -33,23 +30,39 @@ import org.apache.spark.streaming.receiver.Receiver
 /**
  * Input stream that subscribe messages from a Mqtt Broker.
  * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/
- * @param brokerUrl Url of remote mqtt publisher
- * @param topic topic name to subscribe to
- * @param storageLevel RDD storage level.
+ * @param brokerUrl Url of remote mqtt publisher
+ * @param topic topic name to subscribe to
+ * @param storageLevel RDD storage level.
+ * @param clientId ClientId to use for the mqtt connection
+ * @param username Username for authentication to the mqtt publisher
+ * @param password Password for authentication to the mqtt publisher
+ * @param cleanSession Sets the mqtt cleanSession parameter
+ * @param qos Quality of service to use for the topic subscription
+ * @param connectionTimeout Connection timeout for the mqtt connection
+ * @param keepAliveInterval Keepalive interal for the mqtt connection
+ * @param mqttVersion Version to use for the mqtt connection
  */
-
 private[streaming]
 class MQTTInputDStream(
     _ssc: StreamingContext,
     brokerUrl: String,
     topic: String,
-    storageLevel: StorageLevel
+    storageLevel: StorageLevel,
+    clientId: Option[String] = None,
+    username: Option[String] = None,
+    password: Option[String] = None,
+    cleanSession: Option[Boolean] = None,
+    qos: Option[Int] = None,
+    connectionTimeout: Option[Int] = None,
+    keepAliveInterval: Option[Int] = None,
+    mqttVersion: Option[Int] = None
   ) extends ReceiverInputDStream[String](_ssc) {
   private[streaming] override def name: String = s"MQTT stream [$id]"
   def getReceiver(): Receiver[String] = {
-    new MQTTReceiver(brokerUrl, topic, storageLevel)
+    new MQTTReceiver(brokerUrl, topic, storageLevel, clientId, username, password, cleanSession,
+      qos, connectionTimeout, keepAliveInterval, mqttVersion)
   }
 }
@@ -57,7 +70,15 @@ private[streaming]
 class MQTTReceiver(
     brokerUrl: String,
     topic: String,
-    storageLevel: StorageLevel
+    storageLevel: StorageLevel,
+    clientId: Option[String],
+    username: Option[String],
+    password: Option[String],
+    cleanSession: Option[Boolean],
+    qos: Option[Int],
+    connectionTimeout: Option[Int],
+    keepAliveInterval: Option[Int],
+    mqttVersion: Option[Int]
   ) extends Receiver[String](storageLevel) {
   def onStop() {
@@ -70,7 +91,25 @@ class MQTTReceiver(
     val persistence = new MemoryPersistence()
     // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance
-    val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
+    val client = new MqttClient(brokerUrl, clientId.getOrElse(MqttClient.generateClientId()),
+      persistence)
+
+    // Initialize mqtt parameters
+    val mqttConnectionOptions = new MqttConnectOptions()
+    if (username.isDefined && password.isDefined) {
+      mqttConnectionOptions.setUserName(username.get)
+      mqttConnectionOptions.setPassword(password.get.toCharArray)
+    }
+    mqttConnectionOptions.setCleanSession(cleanSession.getOrElse(true))
+    if (connectionTimeout.isDefined) {
+      mqttConnectionOptions.setConnectionTimeout(connectionTimeout.get)
+    }
+    if (keepAliveInterval.isDefined) {
+      mqttConnectionOptions.setKeepAliveInterval(keepAliveInterval.get)
+    }
+    if (mqttVersion.isDefined) {
+      mqttConnectionOptions.setMqttVersion(mqttVersion.get)
+    }
     // Callback automatically triggers as and when new message arrives on specified topic
     val callback = new MqttCallback() {
@@ -93,10 +132,10 @@ class MQTTReceiver(
     client.setCallback(callback)
     // Connect to MqttBroker
-    client.connect()
+    client.connect(mqttConnectionOptions)
     // Subscribe to Mqtt topic
-    client.subscribe(topic)
+    client.subscribe(topic, qos.getOrElse(1))
   }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/28f034f4/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 7b8d56d..7e2f5c7 100644
--- a/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -41,6 +41,40 @@ object MQTTUtils {
     new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
   }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   * @param ssc StreamingContext object
+   * @param brokerUrl Url of remote MQTT publisher
+   * @param topic Topic name to subscribe to
+   * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+   * @param clientId ClientId to use for the mqtt connection
+   * @param username Username for authentication to the mqtt publisher
+   * @param password Password for authentication to the mqtt publisher
+   * @param cleanSession Sets the mqtt cleanSession parameter
+   * @param qos Quality of service to use for the topic subscription
+   * @param connectionTimeout Connection timeout for the mqtt connection
+   * @param keepAliveInterval Keepalive interal for the mqtt connection
+   * @param mqttVersion Version to use for the mqtt connection
+   */
+  def createStream(
+      ssc: StreamingContext,
+      brokerUrl: String,
+      topic: String,
+      storageLevel: StorageLevel,
+      clientId: Option[String],
+      username: Option[String],
+      password: Option[String],
+      cleanSession: Option[Boolean],
+      qos: Option[Int],
+      connectionTimeout: Option[Int],
+      keepAliveInterval: Option[Int],
+      mqttVersion: Option[Int]
+    ): ReceiverInputDStream[String] = {
+    new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel, clientId, username, password,
+      cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+  }
+
   /**
    * Create an input stream that receives messages pushed by a MQTT publisher.
    * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
@@ -59,7 +93,7 @@ object MQTTUtils {
   /**
    * Create an input stream that receives messages pushed by a MQTT publisher.
-   * @param jssc JavaStreamingContext object
+   * @param jssc JavaStreamingContext object
    * @param brokerUrl Url of remote MQTT publisher
    * @param topic Topic name to subscribe to
    * @param storageLevel RDD storage level.
@@ -73,6 +107,99 @@ object MQTTUtils {
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
     createStream(jssc.ssc, brokerUrl, topic, storageLevel)
   }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   * @param jssc JavaStreamingContext object
+   * @param brokerUrl Url of remote MQTT publisher
+   * @param topic Topic name to subscribe to
+   * @param storageLevel RDD storage level.
+   * @param clientId ClientId to use for the mqtt connection
+   * @param username Username for authentication to the mqtt publisher
+   * @param password Password for authentication to the mqtt publisher
+   * @param cleanSession Sets the mqtt cleanSession parameter
+   * @param qos Quality of service to use for the topic subscription
+   * @param connectionTimeout Connection timeout for the mqtt connection
+   * @param keepAliveInterval Keepalive interal for the mqtt connection
+   * @param mqttVersion Version to use for the mqtt connection
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topic: String,
+      storageLevel: StorageLevel,
+      clientId: String,
+      username: String,
+      password: String,
+      cleanSession: Boolean,
+      qos: Int,
+      connectionTimeout: Int,
+      keepAliveInterval: Int,
+      mqttVersion: Int
+    ): JavaReceiverInputDStream[String] = {
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    createStream(jssc.ssc, brokerUrl, topic, storageLevel, Option(clientId), Option(username),
+      Option(password), Option(cleanSession), Option(qos), Option(connectionTimeout),
+      Option(keepAliveInterval), Option(mqttVersion))
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   * @param jssc JavaStreamingContext object
+   * @param brokerUrl Url of remote MQTT publisher
+   * @param topic Topic name to subscribe to
+   * @param clientId ClientId to use for the mqtt connection
+   * @param username Username for authentication to the mqtt publisher
+   * @param password Password for authentication to the mqtt publisher
+   * @param cleanSession Sets the mqtt cleanSession parameter
+   * @param qos Quality of service to use for the topic subscription
+   * @param connectionTimeout Connection timeout for the mqtt connection
+   * @param keepAliveInterval Keepalive interal for the mqtt connection
+   * @param mqttVersion Version to use for the mqtt connection
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topic: String,
+      clientId: String,
+      username: String,
+      password: String,
+      cleanSession: Boolean,
+      qos: Int,
+      connectionTimeout: Int,
+      keepAliveInterval: Int,
+      mqttVersion: Int
+    ): JavaReceiverInputDStream[String] = {
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    createStream(jssc.ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2, Option(clientId),
+      Option(username), Option(password), Option(cleanSession), Option(qos),
+      Option(connectionTimeout), Option(keepAliveInterval), Option(mqttVersion))
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a MQTT publisher.
+   * @param jssc JavaStreamingContext object
+   * @param brokerUrl Url of remote MQTT publisher
+   * @param topic Topic name to subscribe to
+   * @param clientId ClientId to use for the mqtt connection
+   * @param username Username for authentication to the mqtt publisher
+   * @param password Password for authentication to the mqtt publisher
+   * @param cleanSession Sets the mqtt cleanSession parameter
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      brokerUrl: String,
+      topic: String,
+      clientId: String,
+      username: String,
+      password: String,
+      cleanSession: Boolean
+    ): JavaReceiverInputDStream[String] = {
+    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    createStream(jssc.ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2, Option(clientId),
+      Option(username), Option(password), Option(cleanSession), None, None, None, None)
+  }
+
 }
 /**

http://git-wip-us.apache.org/repos/asf/bahir/blob/28f034f4/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index ce5aa1e..45332d9 100644
--- a/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/streaming-mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -33,5 +33,11 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
     JavaReceiverInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
     JavaReceiverInputDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
       StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream<String> test3 = MQTTUtils.createStream(ssc, brokerUrl, topic,
+      StorageLevel.MEMORY_AND_DISK_SER_2(), "testid", "user", "password", true, 1, 10, 30, 3);
+    JavaReceiverInputDStream<String> test4 = MQTTUtils.createStream(ssc, brokerUrl, topic,
+      "testid", "user", "password", true, 1, 10, 30, 3);
+    JavaReceiverInputDStream<String> test5 = MQTTUtils.createStream(ssc, brokerUrl, topic,
+      "testid", "user", "password", true);
   }
 }
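
The defaulting rules that the `MQTTReceiver` changes in this commit apply to the optional settings can be modelled in a few lines of plain Java. The sketch below is only an illustration: it does not call Spark or Paho, and the class `MqttOptionDefaults` and its method names are hypothetical, not part of Bahir. It mirrors three behaviours visible in the diff: credentials are applied only when both `username` and `password` are defined, `cleanSession` falls back to `true` (the README text added in the same commit describes the default as false), and `qos` falls back to `1`. `Optional.ofNullable` stands in for Scala's `Option(x)`, which the Java overloads use and which yields `None` for `null`.

```java
import java.util.Optional;

// Illustrative model only: mirrors the defaulting logic of MQTTReceiver in this
// commit without depending on Spark or Paho. All names here are hypothetical.
final class MqttOptionDefaults {

    // Credentials are set only when BOTH values are present, as in the receiver.
    static boolean usesCredentials(Optional<String> username, Optional<String> password) {
        return username.isPresent() && password.isPresent();
    }

    // Models cleanSession.getOrElse(true) from the Scala code.
    static boolean effectiveCleanSession(Optional<Boolean> cleanSession) {
        return cleanSession.orElse(true);
    }

    // Models qos.getOrElse(1) from the Scala code.
    static int effectiveQos(Optional<Integer> qos) {
        return qos.orElse(1);
    }

    public static void main(String[] args) {
        // Optional.ofNullable plays the role of Scala's Option(x): null becomes empty.
        String missingPassword = null;
        Optional<String> user = Optional.ofNullable("user");
        Optional<String> password = Optional.ofNullable(missingPassword);

        System.out.println("credentials applied: " + usesCredentials(user, password));
        System.out.println("cleanSession: " + effectiveCleanSession(Optional.empty()));
        System.out.println("qos: " + effectiveQos(Optional.empty()));
    }
}
```

Read this way, a caller that supplies a username without a password gets a connection attempt with no credentials at all, rather than an error from the receiver itself.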