ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ra...@apache.org
Subject [1/6] ignite git commit: IGNITE-1747 Fix code style and Javadoc issues in MQTT streamer.
Date Tue, 27 Oct 2015 16:06:20 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 3ec52f36b -> db8886079


IGNITE-1747 Fix code style and Javadoc issues in MQTT streamer.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cb0d432f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cb0d432f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cb0d432f

Branch: refs/heads/master
Commit: cb0d432f358615b12a65eb62f4b03b70680b3575
Parents: 43b40f4
Author: Raul Kripalani <raulk@apache.org>
Authored: Tue Oct 20 18:36:52 2015 +0100
Committer: Raul Kripalani <raulk@apache.org>
Committed: Tue Oct 20 18:37:20 2015 +0100

----------------------------------------------------------------------
 .../apache/ignite/stream/mqtt/MqttStreamer.java | 252 +++++++++++++------
 .../stream/mqtt/IgniteMqttStreamerTest.java     |  35 ++-
 .../mqtt/IgniteMqttStreamerTestSuite.java       |   2 +-
 3 files changed, 206 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cb0d432f/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
index f18ae42..39d8d6e 100644
--- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.stream.mqtt;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -70,10 +71,10 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
  * </ul>
  *
  * Note: features like durable subscriptions, last will testament, etc. can be configured
via the
- * {@link #connectOptions} property.
+ * {@link #setConnectOptions(MqttConnectOptions)} setter.
  *
  * @see <a href="https://github.com/rholder/guava-retrying">guava-retrying library</a>
- * @author Raul Kripalani
+ *
  */
 public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> implements
MqttCallback {
 
@@ -137,7 +138,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage,
K, V> impleme
     private volatile boolean connected;
 
     /** Cached log prefix for cache messages. */
-    private String cachedLogPrefix;
+    private String cachedLogValues;
 
     /**
      * Starts streamer.
@@ -150,9 +151,12 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage,
K, V> impleme
 
         // for simplicity, if these are null initialize to empty lists
         topics = topics == null ? new ArrayList<String>() : topics;
+
         qualitiesOfService = qualitiesOfService == null ? new ArrayList<Integer>()
: qualitiesOfService;
 
         try {
+            Map<String, Object> logValues = new HashMap<>();
+
             // parameter validations
             A.notNull(getStreamer(), "streamer");
             A.notNull(getIgnite(), "ignite");
@@ -162,9 +166,8 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage,
K, V> impleme
             A.notNullOrEmpty(brokerUrl, "broker URL");
 
             // if the client ID is empty, generate one
-            if (clientId == null || clientId.length() == 0) {
+            if (clientId == null || clientId.length() == 0)
                 clientId = MqttClient.generateClientId();
-            }
 
             // if we have both a single topic and a list of topics (but the list of topics
is not of
             // size 1 and == topic, as this would be a case of re-initialization), fail
@@ -174,15 +177,13 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage,
K, V> impleme
 
             // same as above but for QoS
             if (qualityOfService != null && !qualitiesOfService.isEmpty() &&
qualitiesOfService.size() != 1 &&
-                !qualitiesOfService.get(0).equals(qualityOfService)) {
+                !qualitiesOfService.get(0).equals(qualityOfService))
                 throw new IllegalArgumentException("Cannot specify both a single QoS and
a list at the same time");
-            }
 
             // Paho API requires disconnect timeout if providing a quiesce timeout and disconnecting
forcibly
-            if (disconnectForcibly && disconnectQuiesceTimeout != null) {
+            if (disconnectForcibly && disconnectQuiesceTimeout != null)
                 A.notNull(disconnectForciblyTimeout, "disconnect timeout cannot be null when
disconnecting forcibly " +
                     "with quiesce");
-            }
 
             // if we have multiple topics
             if (!topics.isEmpty()) {
@@ -192,7 +193,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage,
K, V> impleme
                 A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(),
"qualities of " +
                     "service must be either empty or have the same size as topics list");
 
-                cachedLogPrefix = "[" + Joiner.on(",").join(topics) + "]";
+                logValues.put("topics", topics);
             }
             else {  // just the single topic
                 topics.add(topic);
@@ -200,9 +201,16 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage,
K, V> impleme
                 if (qualityOfService != null)
                     qualitiesOfService.add(qualityOfService);
 
-                cachedLogPrefix = "[" + topic + "]";
+                logValues.put("topic", topic);
             }
 
+            // finish building log values
+            logValues.put("brokerUrl", brokerUrl);
+            logValues.put("clientId", clientId);
+
+            // cache log values
+            cachedLogValues = "[" + Joiner.on(", ").withKeyValueSeparator("=").join(logValues)
+ "]";
+
             // create logger
             log = getIgnite().log();
 
@@ -232,6 +240,10 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage,
K, V> impleme
 
             // create the connection retrier
             connectionRetrier = new MqttConnectionRetrier(retrier);
+
+            log.info("Starting MQTT Streamer " + cachedLogValues);
+
+            // connect
             connectionRetrier.connect();
 
         }
@@ -286,6 +298,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage,
K, V> impleme
     // -------------------------------
 
     /**
+     * Implements the {@link MqttCallback#connectionLost(Throwable)} callback method for
the MQTT client to inform the
+     * streamer that the connection has been lost.
+     *
      * {@inheritDoc}
      */
     @Override public void connectionLost(Throwable throwable) {
@@ -295,31 +310,41 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage,
K, V> impleme
         if (stopped)
             return;
 
-        log.warning(String.format("MQTT Connection to server %s was lost.", brokerUrl), throwable);
+        log.warning(String.format("MQTT Connection to broker was lost [brokerUrl=%s, type=%s,
err=%s]", brokerUrl,
+            throwable.getClass(), throwable.getMessage()));
+
         connectionRetrier.connect();
     }
 
     /**
+     * Implements the {@link MqttCallback#messageArrived(String, MqttMessage)} to receive
an MQTT message.
+     *
      * {@inheritDoc}
      */
     @Override public void messageArrived(String topic, MqttMessage message) throws Exception
{
         if (getMultipleTupleExtractor() != null) {
             Map<K, V> entries = getMultipleTupleExtractor().extract(message);
-            if (log.isTraceEnabled()) {
+
+            if (log.isTraceEnabled())
                 log.trace("Adding cache entries: " + entries);
-            }
+
             getStreamer().addData(entries);
         }
         else {
             Map.Entry<K, V> entry = getSingleTupleExtractor().extract(message);
-            if (log.isTraceEnabled()) {
+
+            if (log.isTraceEnabled())
                 log.trace("Adding cache entry: " + entry);
-            }
+
             getStreamer().addData(entry);
         }
     }
 
     /**
+     * Empty implementation of {@link MqttCallback#deliveryComplete(IMqttDeliveryToken)}.
+     *
+     * Not required by the streamer as it doesn't produce messages.
+     *
      * {@inheritDoc}
      */
     @Override public void deliveryComplete(IMqttDeliveryToken token) {
@@ -331,13 +356,8 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage,
K, V> impleme
     // -------------------------------
 
     /**
-     * @return
-     */
-    public String getBrokerUrl() {
-        return brokerUrl;
-    }
-
-    /**
+     * Sets the broker URL (compulsory).
+     *
      * @param brokerUrl The Broker URL (compulsory).
      */
     public void setBrokerUrl(String brokerUrl) {
@@ -345,189 +365,262 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage,
K, V> impleme
     }
 
     /**
-     * @return
+     * Gets the broker URL.
+     *
+     * @return The Broker URL.
      */
-    public String getTopic() {
-        return topic;
+    public String getBrokerUrl() {
+        return brokerUrl;
     }
 
     /**
-     * @param topic The topic to subscribe to, if a single topic.
+     * Sets the topic to subscribe to, if a single topic.
+     *
+     * @param topic The topic to subscribe to.
      */
     public void setTopic(String topic) {
         this.topic = topic;
     }
 
     /**
-     * @return
+     * Gets the subscribed topic.
+     *
+     * @return The subscribed topic.
      */
-    public Integer getQualityOfService() {
-        return qualityOfService;
+    public String getTopic() {
+        return topic;
     }
 
     /**
-     * @param qualityOfService The quality of service to use for a single topic subscription
(optional).
+     * Sets the quality of service to use for a single topic subscription (optional).
+     *
+     * @param qualityOfService The quality of service.
      */
     public void setQualityOfService(Integer qualityOfService) {
         this.qualityOfService = qualityOfService;
     }
 
     /**
-     * @return
+     * Gets the quality of service set by the user for a single topic consumption.
+     *
+     * @return The quality of service.
      */
-    public List<String> getTopics() {
-        return topics;
+    public Integer getQualityOfService() {
+        return qualityOfService;
     }
 
     /**
-     * @param topics The topics to subscribe to, if many.
+     * Sets the topics to subscribe to, if many.
+     *
+     * @param topics The topics.
      */
     public void setTopics(List<String> topics) {
         this.topics = topics;
     }
 
     /**
-     * @return
+     * Gets the topics subscribed to.
+     *
+     * @return The topics subscribed to.
      */
-    public List<Integer> getQualitiesOfService() {
-        return qualitiesOfService;
+    public List<String> getTopics() {
+        return topics;
     }
 
     /**
-     * @param qualitiesOfService The qualities of service to use for multiple topic subscriptions.
-     * If specified, the list must contain the same number of elements as {@link #topics}.
+     * Sets the qualities of service to use for multiple topic subscriptions. If specified,
the list must contain the
+     * same number of elements as {@link #topics}.
+     *
+     * @param qualitiesOfService The qualities of service.
      */
     public void setQualitiesOfService(List<Integer> qualitiesOfService) {
         this.qualitiesOfService = qualitiesOfService;
     }
 
     /**
-     * @return
+     * Gets the qualities of service for multiple topics.
+     *
+     * @return The qualities of service.
      */
-    public String getClientId() {
-        return clientId;
+    public List<Integer> getQualitiesOfService() {
+        return qualitiesOfService;
     }
 
     /**
-     * @param clientId The MQTT client ID (optional). If one is not provided, we'll create
one for you and maintain
+     * Sets the MQTT client ID (optional). If one is not provided, the streamer will generate
one and will maintain
      * it throughout any reconnection attempts.
+     *
+     * @param clientId The client ID.
      */
     public void setClientId(String clientId) {
         this.clientId = clientId;
     }
 
     /**
-     * @return
+     * Gets the client ID, either the one set by the user or the automatically generated
one.
+     *
+     * @return The client ID.
+     */
+    public String getClientId() {
+        return clientId;
+    }
+
+    /**
+     * Gets the currently set persistence mechanism.
+     *
+     * @return The persistence mechanism.
      */
     public MqttClientPersistence getPersistence() {
         return persistence;
     }
 
     /**
-     * @param persistence A configurable persistence mechanism. If not set, Paho will use
its default.
+     * Sets the persistence mechanism. If not set, Paho will use its default.
+     *
+     * @param persistence A configurable persistence mechanism.
      */
     public void setPersistence(MqttClientPersistence persistence) {
         this.persistence = persistence;
     }
 
     /**
-     * @return
+     * Gets the currently used MQTT client connect options.
+     *
+     * @return The MQTT client connect options.
      */
     public MqttConnectOptions getConnectOptions() {
         return connectOptions;
     }
 
     /**
-     * @param connectOptions The MQTT client connect options, where users can configured
the last will and testament, durability, etc.
+     * Sets the MQTT client connect options, where users can configure the last will and
testament, durability, etc.
+     *
+     * @param connectOptions The MQTT client connect options.
      */
     public void setConnectOptions(MqttConnectOptions connectOptions) {
         this.connectOptions = connectOptions;
     }
 
     /**
-     * @return
+     * Sets whether to disconnect forcibly or not when shutting down. By default, it's <tt>false</tt>.
+     *
+     * @param disconnectForcibly Whether to disconnect forcibly or not. By default, it's
<tt>false</tt>.
+     */
+    public void setDisconnectForcibly(boolean disconnectForcibly) {
+        this.disconnectForcibly = disconnectForcibly;
+    }
+
+    /**
+     * Gets whether this MQTT client will disconnect forcibly when shutting down.
+     *
+     * @return Whether to disconnect forcibly or not.
      */
     public boolean isDisconnectForcibly() {
         return disconnectForcibly;
     }
 
     /**
-     * @param disconnectForcibly Whether to disconnect forcibly or not. By default, it's
false.
+     * Sets the quiesce timeout on disconnection. If not provided, this streamer won't use
any.
+     *
+     * @param disconnectQuiesceTimeout The disconnect quiesce timeout.
      */
-    public void setDisconnectForcibly(boolean disconnectForcibly) {
-        this.disconnectForcibly = disconnectForcibly;
+    public void setDisconnectQuiesceTimeout(Integer disconnectQuiesceTimeout) {
+        this.disconnectQuiesceTimeout = disconnectQuiesceTimeout;
     }
 
     /**
-     * @return
+     * Gets the disconnect quiesce timeout.
+     *
+     * @return The disconnect quiesce timeout.
      */
     public Integer getDisconnectQuiesceTimeout() {
         return disconnectQuiesceTimeout;
     }
 
     /**
-     * @param disconnectQuiesceTimeout Quiesce timeout on disconnection. If not provided,
this streamer won't use any.
+     * Sets the timeout if disconnecting forcibly. Compulsory in that case.
+     *
+     * @param disconnectForciblyTimeout The disconnect forcibly timeout.
      */
-    public void setDisconnectQuiesceTimeout(Integer disconnectQuiesceTimeout) {
-        this.disconnectQuiesceTimeout = disconnectQuiesceTimeout;
+    public void setDisconnectForciblyTimeout(Integer disconnectForciblyTimeout) {
+        this.disconnectForciblyTimeout = disconnectForciblyTimeout;
     }
 
     /**
-     * @return
+     * Gets the timeout if disconnecting forcibly.
+     *
+     * @return Timeout.
      */
     public Integer getDisconnectForciblyTimeout() {
         return disconnectForciblyTimeout;
     }
 
     /**
-     * @param disconnectForciblyTimeout If disconnecting forcibly, the timeout. Compulsory
in that case.
+     * Sets the strategy to determine how long to wait between retry attempts. By default,
this streamer uses a
+     * Fibonacci-based strategy.
+     *
+     * @param retryWaitStrategy The retry wait strategy.
      */
-    public void setDisconnectForciblyTimeout(Integer disconnectForciblyTimeout) {
-        this.disconnectForciblyTimeout = disconnectForciblyTimeout;
+    public void setRetryWaitStrategy(WaitStrategy retryWaitStrategy) {
+        this.retryWaitStrategy = retryWaitStrategy;
     }
 
     /**
-     * @return
+     * Gets the retry wait strategy.
+     *
+     * @return The retry wait strategy.
      */
     public WaitStrategy getRetryWaitStrategy() {
         return retryWaitStrategy;
     }
 
     /**
-     * @param retryWaitStrategy The strategy to determine how long to wait between retry
attempts.
-     * By default, this streamer uses a Fibonacci-based strategy.
+     * Sets the strategy to determine when to stop retrying to (re-)connect. By default,
we never stop.
+     *
+     * @param retryStopStrategy The retry stop strategy.
      */
-    public void setRetryWaitStrategy(WaitStrategy retryWaitStrategy) {
-        this.retryWaitStrategy = retryWaitStrategy;
+    public void setRetryStopStrategy(StopStrategy retryStopStrategy) {
+        this.retryStopStrategy = retryStopStrategy;
     }
 
     /**
-     * @return
+     * Gets the retry stop strategy.
+     *
+     * @return The retry stop strategy.
      */
     public StopStrategy getRetryStopStrategy() {
         return retryStopStrategy;
     }
 
     /**
-     * @param retryStopStrategy The strategy to determine when to stop retrying to (re-)connect.
By default, we never stop.
+     * Sets whether to block the start() method until connected for the first time. By default,
it's <tt>false</tt>.
+     *
+     * @param blockUntilConnected Whether to block or not.
      */
-    public void setRetryStopStrategy(StopStrategy retryStopStrategy) {
-        this.retryStopStrategy = retryStopStrategy;
+    public void setBlockUntilConnected(boolean blockUntilConnected) {
+        this.blockUntilConnected = blockUntilConnected;
     }
 
-    /**
-     * @return
-     */
     public boolean isBlockUntilConnected() {
         return blockUntilConnected;
     }
 
     /**
-     * @param blockUntilConnected Whether to block the start() method until connected for
the first time. By default,
-     * false.
+     * Returns whether this streamer is stopped.
+     *
+     * @return <tt>true</tt> if stopped; <tt>false</tt> if not.
      */
-    public void setBlockUntilConnected(boolean blockUntilConnected) {
-        this.blockUntilConnected = blockUntilConnected;
+    public boolean isStopped() {
+        return stopped;
+    }
+
+    /**
+     * Returns whether this streamer is connected.
+     *
+     * @return <tt>true</tt> if connected; <tt>false</tt> if not.
+     */
+    public boolean isConnected() {
+        return connected;
     }
 
     /**
@@ -551,7 +644,7 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage,
K, V> impleme
         }
 
         /**
-         * Method that is called by the streamer to ask us to (re-)connect.
+         * Method called by the streamer to ask us to (re-)connect.
          */
         public void connect() {
             Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>()
{
@@ -576,12 +669,15 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage,
K, V> impleme
 
                     else {
                         int[] qoses = new int[qualitiesOfService.size()];
+
                         for (int i = 0; i < qualitiesOfService.size(); i++)
                             qoses[i] = qualitiesOfService.get(i);
 
                         client.subscribe(topics.toArray(new String[0]), qoses);
                     }
 
+                    log.info("MQTT Streamer (re-)connected and subscribed " + cachedLogValues);
+
                     connected = true;
                     return connected;
                 }
@@ -608,4 +704,4 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage,
K, V> impleme
 
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb0d432f/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
index 5ac7339..76404b8 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -105,6 +105,10 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         super(true);
     }
 
+    /**
+     *
+     * @throws Exception
+     */
     @Before @SuppressWarnings("unchecked")
     public void beforeTest() throws Exception {
         grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
@@ -123,7 +127,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry policy = new PolicyEntry();
+
         policy.setQueuePrefetch(1);
+
         broker.setDestinationPolicy(policyMap);
         broker.getDestinationPolicy().setDefaultEntry(policy);
         broker.setSchedulerSupport(false);
@@ -138,13 +144,19 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
         // create the client and connect
         client = new MqttClient(brokerUrl, UUID.randomUUID().toString(), new MemoryPersistence());
+
         client.connect();
 
         // create mqtt streamer
         dataStreamer = grid().dataStreamer(null);
+
         streamer = createMqttStreamer(dataStreamer);
     }
 
+    /**
+     *
+     * @throws Exception
+     */
     @After
     public void afterTest() throws Exception {
         try {
@@ -160,7 +172,6 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
         broker.stop();
         broker.deleteAllMessages();
-
     }
 
     /**
@@ -327,7 +338,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         broker.stop();
         broker.start(true);
         broker.waitUntilStarted();
+
         Thread.sleep(2000);
+
         client.connect();
 
         // let's ensure we have 2 connections: Ignite and our test
@@ -370,14 +383,17 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         // now shutdown the broker, wait 2 seconds and start it again
         broker.stop();
         broker.start(true);
+
         broker.waitUntilStarted();
+
         client.connect();
 
         // lets send messages and ensure they are not received, because our retrier desisted
         sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false);
+
         Thread.sleep(3000);
-        assertNull(grid().cache(null).get(50));
 
+        assertNull(grid().cache(null).get(50));
     }
 
     /**
@@ -422,8 +438,8 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         catch (Exception e) {
             return;
         }
-        fail("Expected an exception reporting invalid parameters");
 
+        fail("Expected an exception reporting invalid parameters");
     }
 
     /**
@@ -449,28 +465,34 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     private void sendMessages(final List<String> topics, int fromIdx, int count, boolean
singleMessage) throws MqttException {
         if (singleMessage) {
             final List<StringBuilder> sbs = new ArrayList<>(topics.size());
+
             // initialize String Builders for each topic
             F.forEach(topics, new IgniteInClosure<String>() {
                 @Override public void apply(String s) {
                     sbs.add(new StringBuilder());
                 }
             });
+
             // fill String Builders for each topic
             F.forEach(F.range(fromIdx, fromIdx + count), new IgniteInClosure<Integer>()
{
                 @Override public void apply(Integer integer) {
                     sbs.get(integer % topics.size()).append(integer.toString() + "," + TEST_DATA.get(integer)
+ "\n");
                 }
             });
+
             // send each buffer out
             for (int i = 0; i < topics.size(); i++) {
                 MqttMessage msg = new MqttMessage(sbs.get(i).toString().getBytes());
+
                 client.publish(topics.get(i % topics.size()), msg);
             }
         }
         else {
             for (int i = fromIdx; i < fromIdx + count; i++) {
                 byte[] payload = (i + "," + TEST_DATA.get(i)).getBytes();
+
                 MqttMessage msg = new MqttMessage(payload);
+
                 client.publish(topics.get(i % topics.size()), msg);
             }
         }
@@ -487,6 +509,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> callback =
new IgniteBiPredicate<UUID, CacheEvent>() {
             @Override public boolean apply(UUID uuid, CacheEvent evt) {
                 latch.countDown();
+
                 return true;
             }
         };
@@ -522,6 +545,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         return new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
             @Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
                 List<String> s = Splitter.on(",").splitToList(new String(msg.getPayload()));
+
                 return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
             }
         };
@@ -539,15 +563,18 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
                     .omitEmptyStrings()
                     .withKeyValueSeparator(",")
                     .split(new String(msg.getPayload()));
+
                 final Map<Integer, String> answer = new HashMap<>();
+
                 F.forEach(map.keySet(), new IgniteInClosure<String>() {
                     @Override public void apply(String s) {
                         answer.put(Integer.parseInt(s), map.get(s));
                     }
                 });
+
                 return answer;
             }
         };
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb0d432f/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
index ff25145..413eaab 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
@@ -31,4 +31,4 @@ import org.junit.runners.Suite;
 })
 public class IgniteMqttStreamerTestSuite {
 
-}
\ No newline at end of file
+}


Mime
View raw message