Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D125F18B09 for ; Wed, 28 Oct 2015 10:18:22 +0000 (UTC) Received: (qmail 35346 invoked by uid 500); 28 Oct 2015 10:18:07 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 35248 invoked by uid 500); 28 Oct 2015 10:18:06 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 35208 invoked by uid 99); 28 Oct 2015 10:18:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 Oct 2015 10:18:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BDB66DFE34; Wed, 28 Oct 2015 10:18:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Wed, 28 Oct 2015 10:18:06 -0000 Message-Id: <7ca55ae816bf42909a18f773fab40a63@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/33] ignite git commit: IGNITE-1747 Fix code style and Javadoc issues in MQTT streamer. Repository: ignite Updated Branches: refs/heads/ignite-1093-3 b4cf8eba0 -> b46015cbf IGNITE-1747 Fix code style and Javadoc issues in MQTT streamer. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cb0d432f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cb0d432f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cb0d432f Branch: refs/heads/ignite-1093-3 Commit: cb0d432f358615b12a65eb62f4b03b70680b3575 Parents: 43b40f4 Author: Raul Kripalani Authored: Tue Oct 20 18:36:52 2015 +0100 Committer: Raul Kripalani Committed: Tue Oct 20 18:37:20 2015 +0100 ---------------------------------------------------------------------- .../apache/ignite/stream/mqtt/MqttStreamer.java | 252 +++++++++++++------ .../stream/mqtt/IgniteMqttStreamerTest.java | 35 ++- .../mqtt/IgniteMqttStreamerTestSuite.java | 2 +- 3 files changed, 206 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cb0d432f/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java ---------------------------------------------------------------------- diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java index f18ae42..39d8d6e 100644 --- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java +++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java @@ -18,6 +18,7 @@ package org.apache.ignite.stream.mqtt; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -70,10 +71,10 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; * * * Note: features like durable subscriptions, last will testament, etc. can be configured via the - * {@link #connectOptions} property. + * {@link #setConnectOptions(MqttConnectOptions)} setter. * * @see guava-retrying library - * @author Raul Kripalani + * */ public class MqttStreamer extends StreamAdapter implements MqttCallback { @@ -137,7 +138,7 @@ public class MqttStreamer extends StreamAdapter impleme private volatile boolean connected; /** Cached log prefix for cache messages. */ - private String cachedLogPrefix; + private String cachedLogValues; /** * Starts streamer. @@ -150,9 +151,12 @@ public class MqttStreamer extends StreamAdapter impleme // for simplicity, if these are null initialize to empty lists topics = topics == null ? new ArrayList() : topics; + qualitiesOfService = qualitiesOfService == null ? new ArrayList() : qualitiesOfService; try { + Map logValues = new HashMap<>(); + // parameter validations A.notNull(getStreamer(), "streamer"); A.notNull(getIgnite(), "ignite"); @@ -162,9 +166,8 @@ public class MqttStreamer extends StreamAdapter impleme A.notNullOrEmpty(brokerUrl, "broker URL"); // if the client ID is empty, generate one - if (clientId == null || clientId.length() == 0) { + if (clientId == null || clientId.length() == 0) clientId = MqttClient.generateClientId(); - } // if we have both a single topic and a list of topics (but the list of topic is not of // size 1 and == topic, as this would be a case of re-initialization), fail @@ -174,15 +177,13 @@ public class MqttStreamer extends StreamAdapter impleme // same as above but for QoS if (qualityOfService != null && !qualitiesOfService.isEmpty() && qualitiesOfService.size() != 1 && - !qualitiesOfService.get(0).equals(qualityOfService)) { + !qualitiesOfService.get(0).equals(qualityOfService)) throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time"); - } // Paho API requires disconnect timeout if providing a quiesce timeout and disconnecting forcibly - if (disconnectForcibly && disconnectQuiesceTimeout != null) { + if (disconnectForcibly && disconnectQuiesceTimeout != null) A.notNull(disconnectForciblyTimeout, "disconnect timeout cannot be null when disconnecting forcibly " + "with quiesce"); - } // if we have multiple topics if (!topics.isEmpty()) { @@ -192,7 +193,7 @@ public class MqttStreamer extends StreamAdapter impleme A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(), "qualities of " + "service must be either empty or have the same size as topics list"); - cachedLogPrefix = "[" + Joiner.on(",").join(topics) + "]"; + logValues.put("topics", topics); } else { // just the single topic topics.add(topic); @@ -200,9 +201,16 @@ public class MqttStreamer extends StreamAdapter impleme if (qualityOfService != null) qualitiesOfService.add(qualityOfService); - cachedLogPrefix = "[" + topic + "]"; + logValues.put("topic", topic); } + // finish building log values + logValues.put("brokerUrl", brokerUrl); + logValues.put("clientId", clientId); + + // cache log values + cachedLogValues = "[" + Joiner.on(", ").withKeyValueSeparator("=").join(logValues) + "]"; + // create logger log = getIgnite().log(); @@ -232,6 +240,10 @@ public class MqttStreamer extends StreamAdapter impleme // create the connection retrier connectionRetrier = new MqttConnectionRetrier(retrier); + + log.info("Starting MQTT Streamer " + cachedLogValues); + + // connect connectionRetrier.connect(); } @@ -286,6 +298,9 @@ public class MqttStreamer extends StreamAdapter impleme // ------------------------------- /** + * Implements the {@link MqttCallback#connectionLost(Throwable)} callback method for the MQTT client to inform the + * streamer that the connection has been lost. + * * {@inheritDoc} */ @Override public void connectionLost(Throwable throwable) { @@ -295,31 +310,41 @@ public class MqttStreamer extends StreamAdapter impleme if (stopped) return; - log.warning(String.format("MQTT Connection to server %s was lost.", brokerUrl), throwable); + log.warning(String.format("MQTT Connection to broker was lost [brokerUrl=%s, type=%s, err=%s]", brokerUrl, + throwable.getClass(), throwable.getMessage())); + connectionRetrier.connect(); } /** + * Implements the {@link MqttCallback#messageArrived(String, MqttMessage)} to receive an MQTT message. + * * {@inheritDoc} */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { if (getMultipleTupleExtractor() != null) { Map entries = getMultipleTupleExtractor().extract(message); - if (log.isTraceEnabled()) { + + if (log.isTraceEnabled()) log.trace("Adding cache entries: " + entries); - } + getStreamer().addData(entries); } else { Map.Entry entry = getSingleTupleExtractor().extract(message); - if (log.isTraceEnabled()) { + + if (log.isTraceEnabled()) log.trace("Adding cache entry: " + entry); - } + getStreamer().addData(entry); } } /** + * Empty implementation of {@link MqttCallback#deliveryComplete(IMqttDeliveryToken)}. + * + * Not required by the streamer as it doesn't produce messages. + * * {@inheritDoc} */ @Override public void deliveryComplete(IMqttDeliveryToken token) { @@ -331,13 +356,8 @@ public class MqttStreamer extends StreamAdapter impleme // ------------------------------- /** - * @return - */ - public String getBrokerUrl() { - return brokerUrl; - } - - /** + * Sets the broker URL (compulsory). + * * @param brokerUrl The Broker URL (compulsory). */ public void setBrokerUrl(String brokerUrl) { @@ -345,189 +365,262 @@ public class MqttStreamer extends StreamAdapter impleme } /** - * @return + * Gets the broker URL. + * + * @return The Broker URL. */ - public String getTopic() { - return topic; + public String getBrokerUrl() { + return brokerUrl; } /** - * @param topic The topic to subscribe to, if a single topic. + * Sets the topic to subscribe to, if a single topic. + * + * @param topic The topic to subscribe to. */ public void setTopic(String topic) { this.topic = topic; } /** - * @return + * Gets the subscribed topic. + * + * @return The subscribed topic. */ - public Integer getQualityOfService() { - return qualityOfService; + public String getTopic() { + return topic; } /** - * @param qualityOfService The quality of service to use for a single topic subscription (optional). + * Sets the quality of service to use for a single topic subscription (optional). + * + * @param qualityOfService The quality of service. */ public void setQualityOfService(Integer qualityOfService) { this.qualityOfService = qualityOfService; } /** - * @return + * Gets the quality of service set by the user for a single topic consumption. + * + * @return The quality of service. */ - public List getTopics() { - return topics; + public Integer getQualityOfService() { + return qualityOfService; } /** - * @param topics The topics to subscribe to, if many. + * Sets the topics to subscribe to, if many. + * + * @param topics The topics. */ public void setTopics(List topics) { this.topics = topics; } /** - * @return + * Gets the topics subscribed to. + * + * @return The topics subscribed to. */ - public List getQualitiesOfService() { - return qualitiesOfService; + public List getTopics() { + return topics; } /** - * @param qualitiesOfService The qualities of service to use for multiple topic subscriptions. - * If specified, the list must contain the same number of elements as {@link #topics}. + * Sets the qualities of service to use for multiple topic subscriptions. If specified, the list must contain the + * same number of elements as {@link #topics}. + * + * @param qualitiesOfService The qualities of service. */ public void setQualitiesOfService(List qualitiesOfService) { this.qualitiesOfService = qualitiesOfService; } /** - * @return + * Gets the qualities of service for multiple topics. + * + * @return The qualities of service. */ - public String getClientId() { - return clientId; + public List getQualitiesOfService() { + return qualitiesOfService; } /** - * @param clientId The MQTT client ID (optional). If one is not provided, we'll create one for you and maintain + * Sets the MQTT client ID (optional). If one is not provided, the streamer will generate one and will maintain * it througout any reconnection attempts. + * + * @param clientId The client ID. */ public void setClientId(String clientId) { this.clientId = clientId; } /** - * @return + * Gets the client ID, either the one set by the user or the automatically generated one. + * + * @return The client ID. + */ + public String getClientId() { + return clientId; + } + + /** + * Gets the currently set persistence mechanism. + * + * @return The persistence mechanism. */ public MqttClientPersistence getPersistence() { return persistence; } /** - * @param persistence A configurable persistence mechanism. If not set, Paho will use its default. + * Sets the persistence mechanism. If not set, Paho will use its default. + * + * @param persistence A configurable persistence mechanism. */ public void setPersistence(MqttClientPersistence persistence) { this.persistence = persistence; } /** - * @return + * Gets the currently used MQTT client connect options. + * + * @return The MQTT client connect options. */ public MqttConnectOptions getConnectOptions() { return connectOptions; } /** - * @param connectOptions The MQTT client connect options, where users can configured the last will and testament, durability, etc. + * Sets the MQTT client connect options, where users can configured the last will and testament, durability, etc. + * + * @param connectOptions The MQTT client connect options. */ public void setConnectOptions(MqttConnectOptions connectOptions) { this.connectOptions = connectOptions; } /** - * @return + * Sets whether to disconnect forcibly or not when shutting down. By default, it's false. + * + * @param disconnectForcibly Whether to disconnect forcibly or not. By default, it's false. + */ + public void setDisconnectForcibly(boolean disconnectForcibly) { + this.disconnectForcibly = disconnectForcibly; + } + + /** + * Gets whether this MQTT client will disconnect forcibly when shutting down. + * + * @return Whether to disconnect forcibly or not. */ public boolean isDisconnectForcibly() { return disconnectForcibly; } /** - * @param disconnectForcibly Whether to disconnect forcibly or not. By default, it's false. + * Sets the quiesce timeout on disconnection. If not provided, this streamer won't use any. + * + * @param disconnectQuiesceTimeout The disconnect quiesce timeout. */ - public void setDisconnectForcibly(boolean disconnectForcibly) { - this.disconnectForcibly = disconnectForcibly; + public void setDisconnectQuiesceTimeout(Integer disconnectQuiesceTimeout) { + this.disconnectQuiesceTimeout = disconnectQuiesceTimeout; } /** - * @return + * Gets the disconnect quiesce timeout. + * + * @return The disconnect quiesce timeout. */ public Integer getDisconnectQuiesceTimeout() { return disconnectQuiesceTimeout; } /** - * @param disconnectQuiesceTimeout Quiesce timeout on disconnection. If not provided, this streamer won't use any. + * Sets the timeout if disconnecting forcibly. Compulsory in that case. + * + * @param disconnectForciblyTimeout The disconnect forcibly timeout. */ - public void setDisconnectQuiesceTimeout(Integer disconnectQuiesceTimeout) { - this.disconnectQuiesceTimeout = disconnectQuiesceTimeout; + public void setDisconnectForciblyTimeout(Integer disconnectForciblyTimeout) { + this.disconnectForciblyTimeout = disconnectForciblyTimeout; } /** - * @return + * Gets the timeout if disconnecting forcibly. + * + * @return Timeout. */ public Integer getDisconnectForciblyTimeout() { return disconnectForciblyTimeout; } /** - * @param disconnectForciblyTimeout If disconnecting forcibly, the timeout. Compulsory in that case. + * Sets the strategy to determine how long to wait between retry attempts. By default, this streamer uses a + * Fibonacci-based strategy. + * + * @param retryWaitStrategy The retry wait strategy. */ - public void setDisconnectForciblyTimeout(Integer disconnectForciblyTimeout) { - this.disconnectForciblyTimeout = disconnectForciblyTimeout; + public void setRetryWaitStrategy(WaitStrategy retryWaitStrategy) { + this.retryWaitStrategy = retryWaitStrategy; } /** - * @return + * Gets the retry wait strategy. + * + * @return The retry wait strategy. */ public WaitStrategy getRetryWaitStrategy() { return retryWaitStrategy; } /** - * @param retryWaitStrategy The strategy to determine how long to wait between retry attempts. - * By default, this streamer uses a Fibonacci-based strategy. + * Sets the strategy to determine when to stop retrying to (re-)connect. By default, we never stop. + * + * @param retryStopStrategy The retry stop strategy. */ - public void setRetryWaitStrategy(WaitStrategy retryWaitStrategy) { - this.retryWaitStrategy = retryWaitStrategy; + public void setRetryStopStrategy(StopStrategy retryStopStrategy) { + this.retryStopStrategy = retryStopStrategy; } /** - * @return + * Gets the retry stop strategy. + * + * @return The retry stop strategy. */ public StopStrategy getRetryStopStrategy() { return retryStopStrategy; } /** - * @param retryStopStrategy The strategy to determine when to stop retrying to (re-)connect. By default, we never stop. + * Sets whether to block the start() method until connected for the first time. By default, it's false. + * + * @param blockUntilConnected Whether to block or not. */ - public void setRetryStopStrategy(StopStrategy retryStopStrategy) { - this.retryStopStrategy = retryStopStrategy; + public void setBlockUntilConnected(boolean blockUntilConnected) { + this.blockUntilConnected = blockUntilConnected; } - /** - * @return - */ public boolean isBlockUntilConnected() { return blockUntilConnected; } /** - * @param blockUntilConnected Whether to block the start() method until connected for the first time. By default, - * false. + * Returns whether this streamer is stopped. + * + * @return true if stopped; false if not. */ - public void setBlockUntilConnected(boolean blockUntilConnected) { - this.blockUntilConnected = blockUntilConnected; + public boolean isStopped() { + return stopped; + } + + /** + * Returns whether this streamer is connected. + * + * @return true if connected; false if not. + */ + public boolean isConnected() { + return connected; } /** @@ -551,7 +644,7 @@ public class MqttStreamer extends StreamAdapter impleme } /** - * Method that is called by the streamer to ask us to (re-)connect. + * Method called by the streamer to ask us to (re-)connect. */ public void connect() { Callable callable = retrier.wrap(new Callable() { @@ -576,12 +669,15 @@ public class MqttStreamer extends StreamAdapter impleme else { int[] qoses = new int[qualitiesOfService.size()]; + for (int i = 0; i < qualitiesOfService.size(); i++) qoses[i] = qualitiesOfService.get(i); client.subscribe(topics.toArray(new String[0]), qoses); } + log.info("MQTT Streamer (re-)connected and subscribed " + cachedLogValues); + connected = true; return connected; } @@ -608,4 +704,4 @@ public class MqttStreamer extends StreamAdapter impleme } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cb0d432f/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java index 5ac7339..76404b8 100644 --- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java +++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java @@ -105,6 +105,10 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { super(true); } + /** + * + * @throws Exception + */ @Before @SuppressWarnings("unchecked") public void beforeTest() throws Exception { grid().getOrCreateCache(defaultCacheConfiguration()); @@ -123,7 +127,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { PolicyMap policyMap = new PolicyMap(); PolicyEntry policy = new PolicyEntry(); + policy.setQueuePrefetch(1); + broker.setDestinationPolicy(policyMap); broker.getDestinationPolicy().setDefaultEntry(policy); broker.setSchedulerSupport(false); @@ -138,13 +144,19 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { // create the client and connect client = new MqttClient(brokerUrl, UUID.randomUUID().toString(), new MemoryPersistence()); + client.connect(); // create mqtt streamer dataStreamer = grid().dataStreamer(null); + streamer = createMqttStreamer(dataStreamer); } + /** + * + * @throws Exception + */ @After public void afterTest() throws Exception { try { @@ -160,7 +172,6 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { broker.stop(); broker.deleteAllMessages(); - } /** @@ -327,7 +338,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { broker.stop(); broker.start(true); broker.waitUntilStarted(); + Thread.sleep(2000); + client.connect(); // let's ensure we have 2 connections: Ignite and our test @@ -370,14 +383,17 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { // now shutdown the broker, wait 2 seconds and start it again broker.stop(); broker.start(true); + broker.waitUntilStarted(); + client.connect(); // lets send messages and ensure they are not received, because our retrier desisted sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false); + Thread.sleep(3000); - assertNull(grid().cache(null).get(50)); + assertNull(grid().cache(null).get(50)); } /** @@ -422,8 +438,8 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { catch (Exception e) { return; } - fail("Expected an exception reporting invalid parameters"); + fail("Expected an exception reporting invalid parameters"); } /** @@ -449,28 +465,34 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { private void sendMessages(final List topics, int fromIdx, int count, boolean singleMessage) throws MqttException { if (singleMessage) { final List sbs = new ArrayList<>(topics.size()); + // initialize String Builders for each topic F.forEach(topics, new IgniteInClosure() { @Override public void apply(String s) { sbs.add(new StringBuilder()); } }); + // fill String Builders for each topic F.forEach(F.range(fromIdx, fromIdx + count), new IgniteInClosure() { @Override public void apply(Integer integer) { sbs.get(integer % topics.size()).append(integer.toString() + "," + TEST_DATA.get(integer) + "\n"); } }); + // send each buffer out for (int i = 0; i < topics.size(); i++) { MqttMessage msg = new MqttMessage(sbs.get(i).toString().getBytes()); + client.publish(topics.get(i % topics.size()), msg); } } else { for (int i = fromIdx; i < fromIdx + count; i++) { byte[] payload = (i + "," + TEST_DATA.get(i)).getBytes(); + MqttMessage msg = new MqttMessage(payload); + client.publish(topics.get(i % topics.size()), msg); } } @@ -487,6 +509,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { @SuppressWarnings("serial") IgniteBiPredicate callback = new IgniteBiPredicate() { @Override public boolean apply(UUID uuid, CacheEvent evt) { latch.countDown(); + return true; } }; @@ -522,6 +545,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { return new StreamSingleTupleExtractor() { @Override public Map.Entry extract(MqttMessage msg) { List s = Splitter.on(",").splitToList(new String(msg.getPayload())); + return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1)); } }; @@ -539,15 +563,18 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest { .omitEmptyStrings() .withKeyValueSeparator(",") .split(new String(msg.getPayload())); + final Map answer = new HashMap<>(); + F.forEach(map.keySet(), new IgniteInClosure() { @Override public void apply(String s) { answer.put(Integer.parseInt(s), map.get(s)); } }); + return answer; } }; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cb0d432f/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java index ff25145..413eaab 100644 --- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java +++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java @@ -31,4 +31,4 @@ import org.junit.runners.Suite; }) public class IgniteMqttStreamerTestSuite { -} \ No newline at end of file +}