Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A7109175EF for ; Tue, 22 Sep 2015 14:58:44 +0000 (UTC) Received: (qmail 78651 invoked by uid 500); 22 Sep 2015 14:58:44 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 78549 invoked by uid 500); 22 Sep 2015 14:58:44 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 78503 invoked by uid 99); 22 Sep 2015 14:58:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Sep 2015 14:58:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 54A6CE0582; Tue, 22 Sep 2015 14:58:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Tue, 22 Sep 2015 14:58:47 -0000 Message-Id: <30833538be114fc1bb0bca756289f555@git.apache.org> In-Reply-To: <60f08afac2474cc8ac2760ba5902c0b6@git.apache.org> References: <60f08afac2474cc8ac2760ba5902c0b6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [04/22] ignite git commit: IGNITE-535 (WIP) Implement MQTT Streamer. IGNITE-535 (WIP) Implement MQTT Streamer. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6b53f1bb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6b53f1bb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6b53f1bb Branch: refs/heads/ignite-1282 Commit: 6b53f1bb2699ae7a6ea8ae277de1bfc3ecbd7b6a Parents: b80b171 Author: Raul Kripalani Authored: Tue Sep 15 00:45:58 2015 +0100 Committer: Raul Kripalani Committed: Tue Sep 15 22:30:09 2015 +0100 ---------------------------------------------------------------------- modules/mqtt/pom.xml | 110 +++++++++ .../apache/ignite/stream/mqtt/MqttStreamer.java | 243 +++++++++++++++++++ .../stream/mqtt/IgniteMqttStreamerTest.java | 50 ++++ .../mqtt/IgniteMqttStreamerTestSuite.java | 34 +++ .../ignite/stream/mqtt/TestTupleExtractors.java | 28 +++ pom.xml | 1 + 6 files changed, 466 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/pom.xml ---------------------------------------------------------------------- diff --git a/modules/mqtt/pom.xml b/modules/mqtt/pom.xml new file mode 100644 index 0000000..b108180 --- /dev/null +++ b/modules/mqtt/pom.xml @@ -0,0 +1,110 @@ + + + + + + + 4.0.0 + + + org.apache.ignite + ignite-parent + 1 + ../../parent + + + ignite-mqtt + 1.5.0-SNAPSHOT + http://ignite.apache.org + + + 1.0.2 + 0.7 + + + + + org.apache.ignite + ignite-core + ${project.version} + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + ${paho.version} + + + + org.eclipse.moquette + moquette-broker + ${mosquette.version} + test + + + + org.apache.ignite + ignite-log4j + ${project.version} + test + + + + org.apache.ignite + ignite-spring + ${project.version} + test + + + + org.apache.ignite + ignite-core + ${project.version} + test-jar + test + + + + + + + bintray + http://dl.bintray.com/andsel/maven/ + + true + + + false + + + + Eclipse Paho Repo + https://repo.eclipse.org/content/repositories/paho-releases/ + + true + + + false + + + + + http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java ---------------------------------------------------------------------- diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java new file mode 100644 index 0000000..00a89ab --- /dev/null +++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.mqtt; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.stream.StreamAdapter; +import org.apache.ignite.stream.StreamMultipleTupleExtractor; +import org.apache.ignite.stream.StreamSingleTupleExtractor; + +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttClientPersistence; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +/** + * Streamer that consumes from a MQTT topic and feeds key-value pairs into an {@link IgniteDataStreamer} instance, + * using Eclipse Paho as an MQTT client. + *

+ * You must also provide a {@link StreamSingleTupleExtractor} or a {@link StreamMultipleTupleExtractor} to extract + * cache tuples out of the incoming message. + *

+ * This Streamer has many features: + * + *

    + *
  • Subscribing to a single topic or multiple topics at once.
  • + *
  • Specifying the subscriber's QoS for a single topic or for multiple topics.
  • + *
  • Allows setting {@link MqttConnectOptions} to support features like last will testament, persistent + * sessions, etc.
  • + *
  • Specifying the client ID.
  • + *
+ * + * Note: features like durable subscriptions, last will testament, etc. must be configured via the + * {@link #connectOptions} property. + * + * @author Raul Kripalani + */ +public class MqttStreamer extends StreamAdapter implements MqttCallback { + + /** Logger. */ + private IgniteLogger log; + + private MqttClient client; + + private String brokerUrl; + + private String topic; + + private Integer qualityOfService; + + private List topics; + + private List qualitiesOfService; + + /** Client ID in case we're using durable subscribers. */ + private String clientId; + + private MqttClientPersistence persistence; + + private MqttConnectOptions connectOptions; + + // disconnect parameters + private Integer disconnectQuiesceTimeout; + + private boolean disconnectForcibly; + + private Integer disconnectForciblyTimeout; + + private volatile boolean stopped = true; + + /** + * Starts streamer. + * + * @throws IgniteException If failed. + */ + public void start() throws IgniteException { + if (!stopped) + throw new IgniteException("Attempted to start an already started MQTT Streamer"); + + // for simplicity, if these are null initialize to empty lists + topics = topics == null ? new ArrayList() : topics; + qualitiesOfService = qualitiesOfService == null ? new ArrayList() : qualitiesOfService; + + try { + // parameter validations + A.notNull(getStreamer(), "streamer"); + A.notNull(getIgnite(), "ignite"); + A.ensure(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null, "tuple extractor missing"); + A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " + + "both single and multiple tuple extractor"); + A.notNullOrEmpty(brokerUrl, "broker URL"); + A.notNullOrEmpty(clientId, "client ID"); + + // if we have both a single topic and a list of topics, fail + if (topic != null && topic.length() > 0 && !topics.isEmpty()) + throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time"); + + // if we have both a single QoS and list, fail + if (qualityOfService != null && !qualitiesOfService.isEmpty()) { + throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time"); + } + + // Paho API requires disconnect timeout if providing a quiesce timeout and disconnecting forcibly + if (disconnectForcibly && disconnectQuiesceTimeout != null) { + A.notNull(disconnectForciblyTimeout, "disconnect timeout cannot be null when disconnecting forcibly " + + "with quiesce"); + } + + // if we have multiple topics + if (topics != null && !topics.isEmpty()) { + for (String t : topics) { + A.notNullOrEmpty(t, "topic in list of topics"); + } + A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(), "qualities of " + + "service must be either empty or have the same size as topics list"); + } + + // create logger + log = getIgnite().log(); + + // create the mqtt client + if (persistence == null) + client = new MqttClient(brokerUrl, clientId); + else + client = new MqttClient(brokerUrl, clientId, persistence); + + connectAndSubscribe(); + + stopped = false; + + } + catch (Throwable t) { + throw new IgniteException("Exception while initializing MqttStreamer", t); + } + + } + + private void connectAndSubscribe() throws MqttException { + // connect + if (connectOptions != null) + client.connect(); + else + client.connect(connectOptions); + + // subscribe to multiple topics + if (!topics.isEmpty()) { + if (qualitiesOfService.isEmpty()) { + client.subscribe(topics.toArray(new String[0])); + } else { + int[] qoses = new int[qualitiesOfService.size()]; + for (int i = 0; i < qualitiesOfService.size(); i++) + qoses[i] = qualitiesOfService.get(i); + + client.subscribe(topics.toArray(new String[0]), qoses); + } + } else { + // subscribe to a single topic + if (qualityOfService == null) { + client.subscribe(topic); + } else { + client.subscribe(topic, qualityOfService); + } + } + } + + /** + * Stops streamer. + */ + public void stop() throws IgniteException { + if (stopped) + throw new IgniteException("Attempted to stop an already stopped MQTT Streamer"); + + try { + if (disconnectForcibly) { + if (disconnectQuiesceTimeout == null && disconnectForciblyTimeout == null) { + client.disconnectForcibly(); + } else if (disconnectForciblyTimeout != null && disconnectQuiesceTimeout == null) { + client.disconnectForcibly(disconnectForciblyTimeout); + } else { + client.disconnectForcibly(disconnectQuiesceTimeout, disconnectForciblyTimeout); + } + } else { + if (disconnectQuiesceTimeout == null) { + client.disconnect(); + } else { + client.disconnect(disconnectQuiesceTimeout); + } + } + } + catch (Throwable t) { + throw new IgniteException("Exception while stopping MqttStreamer", t); + } + } + + @Override public void connectionLost(Throwable throwable) { + log.warning(String.format("MQTT Connection to server %s was lost due to", brokerUrl), throwable); + // TODO: handle reconnect attempts with an optional backoff mechanism (linear, exponential, finonacci) + try { + connectAndSubscribe(); + } + catch (MqttException e) { + e.printStackTrace(); + } + } + + @Override public void messageArrived(String topic, MqttMessage message) throws Exception { + if (getMultipleTupleExtractor() != null) { + Map entries = getMultipleTupleExtractor().extract(message); + getStreamer().addData(entries); + } else { + Map.Entry entry = getSingleTupleExtractor().extract(message); + getStreamer().addData(entry); + } + } + + @Override public void deliveryComplete(IMqttDeliveryToken token) { + // ignore, we don't send messages + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java new file mode 100644 index 0000000..59730fa --- /dev/null +++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.mqtt; + +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import org.junit.After; +import org.junit.Before; + +/** + * Test for {@link MqttStreamer}. + * + * @author Raul Kripalani + */ +public class IgniteMqttStreamerTest extends GridCommonAbstractTest { + + /** Constructor. */ + public IgniteMqttStreamerTest() { + super(true); + } + + @Before @SuppressWarnings("unchecked") + public void beforeTest() throws Exception { + grid().getOrCreateCache(defaultCacheConfiguration()); + + } + + @After + public void afterTest() throws Exception { + grid().cache(null).clear(); + + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java new file mode 100644 index 0000000..ff25145 --- /dev/null +++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.mqtt; + +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +/** + * MQTT streamer tests. + * + * @author Raul Kripalani + */ +@RunWith(Suite.class) +@Suite.SuiteClasses({ + IgniteMqttStreamerTest.class +}) +public class IgniteMqttStreamerTestSuite { + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java ---------------------------------------------------------------------- diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java new file mode 100644 index 0000000..e2ed0f0 --- /dev/null +++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.mqtt; + +/** + * Test transformers for MqttStreamer tests. + * + * @author Raul Kripalani + */ +public class TestTupleExtractors { + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b47958f..c19a9b7 100644 --- a/pom.xml +++ b/pom.xml @@ -75,6 +75,7 @@ modules/kafka modules/yarn modules/jms11 + modules/mqtt modules/zookeeper modules/platform