ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [34/34] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-3727-2
Date Tue, 14 Feb 2017 13:11:53 GMT
Merge remote-tracking branch 'remotes/origin/master' into ignite-3727-2

# Conflicts:
#	modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/68f2d38e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/68f2d38e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/68f2d38e

Branch: refs/heads/ignite-3727-2
Commit: 68f2d38e3fda3adedc522d363bb02c2100a9a084
Parents: 91e8340
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Feb 14 16:08:26 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Feb 14 16:08:26 2017 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteMessaging.java |  10 +-
 .../ignite/internal/IgniteMessagingImpl.java    |   2 +-
 .../managers/communication/GridIoManager.java   |   3 +-
 ...niteMessagingConfigVariationFullApiTest.java |  20 +-
 .../messaging/IgniteMessagingSendAsyncTest.java | 230 +++++++++++--------
 5 files changed, 164 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/68f2d38e/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
index b0cbe1d..d769eb2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
@@ -77,8 +77,9 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
 
     /**
      * Sends given message with specified topic to the nodes in the underlying cluster group.
-     * When you invoke method, all listeners who were registered on topic in the local node, will executing in the same thread
-     * by default, or if you use {@link #withAsync()}, listeners will execute through thread pool, and current thread will not be block.
+     * When you invoke method, all listeners who were registered on topic in the local node, will executing in
+     * the same thread by default, or if you use {@link #withAsync()}, listeners will execute
+     * through thread pool, and current thread will not be block.
      *
      * @param topic Topic to send to, {@code null} for default topic.
      * @param msg Message to send.
@@ -89,8 +90,9 @@ public interface IgniteMessaging extends IgniteAsyncSupport {
 
     /**
      * Sends given messages with the specified topic to the nodes in the underlying cluster group.
-     * When you invoke method, all listeners who were registered on topic in the local node, will executing in the same thread
-     * by default, or if you use {@link #withAsync()}, listeners will execute through thread pool, and current thread will not be block.
+     * When you invoke method, all listeners who were registered on topic in the local node, will executing
+     * in the same thread by default, or if you use {@link #withAsync()}, listeners will execute
+     * through thread pool, and current thread will not be block.
      *
      * @param topic Topic to send to, {@code null} for default topic.
      * @param msgs Messages to send. Order of the sending is undefined. If the method produces

http://git-wip-us.apache.org/repos/asf/ignite/blob/68f2d38e/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
index e586aa2..541fad4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
@@ -137,7 +137,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging>
             if (timeout == 0)
                 timeout = ctx.config().getNetworkTimeout();
 
-            ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout, isAsync());
+            ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout, false);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/68f2d38e/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index cda1321..50a4efe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -685,7 +685,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 case UTILITY_CACHE_POOL:
                 case MARSH_CACHE_POOL:
                 case IDX_POOL:
-                case IGFS_POOL: {
+                case IGFS_POOL:
+                {
                     if (msg.isOrdered())
                         processOrderedMessage(nodeId, msg, plc, msgC);
                     else

http://git-wip-us.apache.org/repos/asf/ignite/blob/68f2d38e/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
index f66535a..c6b46d2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/messaging/IgniteMessagingConfigVariationFullApiTest.java
@@ -289,6 +289,8 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
     /**
      * Single server test.
+     *
+     * @param async Async message send flag.
      * @throws Exception If failed.
      */
     private void localServerInternal(boolean async) throws Exception {
@@ -345,6 +347,8 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
     /**
      * Server sends a message and client receives it.
+     *
+     * @param async Async message send flag.
      * @throws Exception If failed.
      */
     private void serverClientMessage(boolean async) throws Exception {
@@ -359,6 +363,8 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
     /**
      * Client sends a message and client receives it.
+     *
+     * @param async Async message send flag.
      * @throws Exception If failed.
      */
     private void clientClientMessage(boolean async) throws Exception {
@@ -373,7 +379,8 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
 
     /**
      * Client sends a message and client receives it.
-     * @param async flag async mode
+     *
+     * @param async Async message send flag.
      * @throws Exception If failed.
      */
     private void clientServerMessage(boolean async) throws Exception {
@@ -389,6 +396,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
     /**
      * @param ignite Ignite.
      * @param grp Cluster group.
+     * @param async Async message send flag.
      * @throws Exception If fail.
      */
     private void registerListenerAndSendMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception {
@@ -443,7 +451,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
     }
 
     /**
-     *
+     * @param async Async message send flag.
      * @throws Exception If fail.
      */
     private void orderedMessage(boolean async) throws Exception {
@@ -457,7 +465,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
     }
 
     /**
-     *
+     * @param async Async message send flag.
      * @throws Exception If fail.
      */
     private void clientServerOrderedMessage(boolean async) throws Exception {
@@ -471,7 +479,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
     }
 
     /**
-     *
+     * @param async Async message send flag.
      * @throws Exception If fail.
      */
     private void clientClientOrderedMessage(boolean async) throws Exception {
@@ -485,7 +493,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
     }
 
     /**
-     *
+     * @param async Async message send flag.
      * @throws Exception If fail.
      */
     private void serverClientOrderedMessage(boolean async) throws Exception {
@@ -501,6 +509,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
     /**
      * @param ignite Ignite.
      * @param grp Cluster group.
+     * @param async Async message send flag.
      * @throws Exception If fail.
      */
     private void registerListenerAndSendOrderedMessages(Ignite ignite, ClusterGroup grp, boolean async) throws Exception {
@@ -531,6 +540,7 @@ public class IgniteMessagingConfigVariationFullApiTest extends IgniteConfigVaria
      * @param nodeSnd Sender Ignite node.
      * @param grp Cluster group.
      * @param msg Message.
+     * @param async Async message send flag.
      */
     private void sendMessage(Ignite nodeSnd, ClusterGroup grp, Object msg, boolean async) {
         if (async)

http://git-wip-us.apache.org/repos/asf/ignite/blob/68f2d38e/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
index 73baeba..75e7d22 100644
--- a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
@@ -19,14 +19,6 @@ package org.apache.ignite.messaging;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteMessaging;
-import org.apache.ignite.lang.IgniteBiInClosure;
-import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jsr166.ThreadLocalRandom8;
-import org.junit.Assert;
-
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
@@ -34,36 +26,54 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteMessaging;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.ThreadLocalRandom8;
+import org.junit.Assert;
 
 /**
  *
  */
 public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest implements Serializable {
-    /**
-     * Topic name.
-     */
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Threads number for multi-thread tests. */
+    private static final int THREADS = 10;
+
+    /** */
     private final String TOPIC = "topic";
 
-    /**
-     * Message string.
-     */
+    /** */
     private final String msgStr = "message";
 
-    /** Count threads for multi-thread test */
-    private final int threads = 10;
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override protected void afterTest() throws Exception {
-        super.afterTest();
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        return cfg;
+    }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
         stopAllGrids();
+
+        super.afterTest();
     }
 
     /**
-     * Test for check, that if use default mode, local listeners execute
-     * in the same thread. 1 node in topology.
+     * Checks if use default mode, local listeners execute in the same thread, 1 node in topology.
+     *
+     * @throws Exception If failed.
      */
     public void testSendDefaultMode() throws Exception {
         Ignite ignite1 = startGrid(1);
@@ -77,8 +87,9 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
     }
 
     /**
-     * Test for check, that if use async mode, local listeners execute
-     * in another thread(through pool). 1 node in topology.
+     * Checks if use async mode, local listeners execute in another thread, 1 node in topology.
+     *
+     * @throws Exception If failed.
      */
     public void testSendAsyncMode() throws Exception {
         Ignite ignite1 = startGrid(1);
@@ -92,14 +103,15 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
     }
 
     /**
-     * Test for check, that if use default mode, local listeners execute
-     * in the same thread. 2 node in topology.
+     * Checks if use default mode, local listeners execute in the same thread, 2 nodes in topology.
+     *
+     * @throws Exception If failed.
      */
-    public void testSendDefaultMode2Node() throws Exception {
+    public void testSendDefaultMode2Nodes() throws Exception {
         Ignite ignite1 = startGrid(1);
         Ignite ignite2 = startGrid(2);
 
-        sendWith2Node(ignite2, ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread> () {
+        sendWith2Nodes(ignite2, ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread> () {
             @Override public  void apply(String msg, Thread thread) {
                 Assert.assertEquals(Thread.currentThread(), thread);
                 Assert.assertEquals(msgStr, msg);
@@ -108,14 +120,15 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
     }
 
     /**
-     * Test for check, that if use async mode, local listeners execute
-     * in another thread(through pool). 2 node in topology.
+     * Checks if use async mode, local listeners execute in another thread, 2 nodes in topology.
+     *
+     * @throws Exception If failed.
      */
     public void testSendAsyncMode2Node() throws Exception {
         Ignite ignite1 = startGrid(1);
         Ignite ignite2 = startGrid(2);
 
-        sendWith2Node(ignite2, ignite1.message().withAsync(), msgStr,  new IgniteBiInClosure<String, Thread> () {
+        sendWith2Nodes(ignite2, ignite1.message().withAsync(), msgStr,  new IgniteBiInClosure<String, Thread> () {
             @Override public  void apply(String msg, Thread thread) {
                 Assert.assertTrue(!Thread.currentThread().equals(thread));
                 Assert.assertEquals(msgStr, msg);
@@ -124,12 +137,14 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
     }
 
     /**
-     * Test for check, that SendOrdered work in our thread pool. 1 node in topology.
+     * Checks that sendOrdered works in thread pool, 1 node in topology.
+     *
+     * @throws Exception If failed.
      */
     public void testSendOrderedDefaultMode() throws Exception {
         Ignite ignite1 = startGrid(1);
 
-        final List<String> msgs = orderedMsg();
+        final List<String> msgs = orderedMessages();
 
         sendOrdered(ignite1.message(), msgs, new IgniteBiInClosure< List<String>,
 List<Thread>> () {
             @Override public void apply(List<String> received, List<Thread> threads)
{
@@ -140,12 +155,14 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
     }
 
     /**
-     * Test for check, that SendOrdered work in our thread pool. 1 node in topology.
+     * Checks that sendOrdered work in thread pool, 1 node in topology.
+     *
+     * @throws Exception If failed.
      */
     public void testSendOrderedAsyncMode() throws Exception {
         Ignite ignite1 = startGrid(1);
 
-        final List<String> msgs = orderedMsg();
+        final List<String> msgs = orderedMessages();
 
         sendOrdered(ignite1.message().withAsync(), msgs, new IgniteBiInClosure< List<String>,
 List<Thread>> () {
             @Override public void apply(List<String> received, List<Thread> threads)
{
@@ -156,13 +173,15 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
     }
 
     /**
-     * Test for check, that SendOrdered work in our thread pool. 2 node in topology.
+     * Checks that sendOrdered work in thread pool, 2 nodes in topology.
+     *
+     * @throws Exception If failed.
      */
     public void testSendOrderedDefaultMode2Node() throws Exception {
         Ignite ignite1 = startGrid(1);
         Ignite ignite2 = startGrid(2);
 
-        final List<String> msgs = orderedMsg();
+        final List<String> msgs = orderedMessages();
 
         sendOrderedWith2Node(ignite2, ignite1.message(), msgs, new IgniteBiInClosure<List<String>,
List<Thread>>() {
             @Override public void apply(List<String> received, List<Thread> threads)
{
@@ -173,13 +192,15 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
     }
 
     /**
-     * Test for check, that SendOrdered work in our thread pool. 2 node in topology.
+     * Checks that sendOrdered work in thread pool, 2 nodes in topology.
+     *
+     * @throws Exception If failed.
      */
     public void testSendOrderedAsyncMode2Node() throws Exception {
         Ignite ignite1 = startGrid(1);
         Ignite ignite2 = startGrid(2);
 
-        final List<String> msgs = orderedMsg();
+        final List<String> msgs = orderedMessages();
 
         sendOrderedWith2Node(ignite2, ignite1.message().withAsync(), msgs, new IgniteBiInClosure<List<String>,
List<Thread>>() {
             @Override public void apply(List<String> received, List<Thread> threads)
{
@@ -190,6 +211,7 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
     }
 
     /**
+     * @throws Exception If failed.
      */
     public void testSendOrderedDefaultModeMultiThreads() throws Exception {
         Ignite ignite = startGrid(1);
@@ -198,6 +220,7 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
     }
 
     /**
+     * @throws Exception If failed.
      */
     public void testSendOrderedAsyncModeMultiThreads() throws Exception {
         Ignite ignite = startGrid(1);
@@ -206,6 +229,7 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
     }
 
     /**
+     * @throws Exception If failed.
      */
     public void testSendOrderedDefaultModeMultiThreadsWith2Node() throws Exception {
         Ignite ignite1 = startGrid(1);
@@ -215,6 +239,7 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
     }
 
     /**
+     * @throws Exception If failed.
      */
     public void testSendOrderedAsyncModeMultiThreadsWith2Node() throws Exception {
         Ignite ignite1 = startGrid(1);
@@ -224,44 +249,45 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
     }
 
     /**
-     * @param ignite2 Ignite 2.
+     * @param ignite2 Second node.
      * @param ignMsg IgniteMessage.
+     * @throws Exception If failed.
      */
     private void sendOrderedMultiThreadsWith2Node(
             final Ignite ignite2,
             final IgniteMessaging ignMsg
-    ) throws InterruptedException {
+    ) throws Exception {
         final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap();
         final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap();
 
-        final List<String> msgs = orderedMsg();
+        final List<String> msgs = orderedMessages();
 
         sendOrderedMultiThreadsWith2Node(ignite2, ignMsg, expMsg, actlMsg, msgs);
 
     }
 
-
     /**
-     * @param ignMsg IgniteMessage.
+     * @param ignMsg IgniteMessaging.
+     * @throws Exception If failed.
      */
     private void sendOrderedMultiThreads(
             final IgniteMessaging ignMsg
-    ) throws InterruptedException {
+    ) throws Exception {
         final ConcurrentMap<String, List<String>> expMsg = Maps.newConcurrentMap();
         final ConcurrentMap<String, List<String>> actlMsg = Maps.newConcurrentMap();
 
-        final List<String> msgs = orderedMsg();
+        final List<String> msgs = orderedMessages();
 
         sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs);
-
     }
 
     /**
-     * @param ignite2 Ignite 2.
+     * @param ignite2 Second node.
      * @param ignMsg Ignite for send message.
      * @param expMsg Expected messages map.
      * @param actlMsg Actual message map.
-     * @param msgs List msgs.
+     * @param msgs List of messages.
+     * @throws Exception If failed.
      */
     private void sendOrderedMultiThreadsWith2Node(
             final Ignite ignite2,
@@ -269,16 +295,19 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
             final ConcurrentMap<String, List<String>> expMsg,
             final ConcurrentMap<String, List<String>> actlMsg,
             final List<String> msgs
-    ) throws InterruptedException {
-        final CountDownLatch latch = new CountDownLatch(threads * msgs.size());
+    ) throws Exception {
+        final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size());
 
         final ConcurrentMap<String, List<String>> actlMsgNode2 = Maps.newConcurrentMap();
 
         ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, Message>()
{
             @Override public boolean apply(UUID uuid, Message msg) {
                 actlMsgNode2.putIfAbsent(msg.threadName, Lists.<String>newArrayList());
-                actlMsgNode2.get(msg.threadName).add(msg.message);
+
+                actlMsgNode2.get(msg.threadName).add(msg.msg);
+
                 latch.countDown();
+
                 return true;
             }
         });
@@ -297,34 +326,41 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
      * @param ignMsg Ignite for send message.
      * @param expMsg Expected messages map.
      * @param actlMsg Actual message map.
-     * @param msgs List msgs.
+     * @param msgs List of messages.
+     * @throws Exception If failed.
      */
     private void sendOrderedMultiThreads(
             final IgniteMessaging ignMsg,
             final ConcurrentMap<String, List<String>> expMsg,
             final ConcurrentMap<String, List<String>> actlMsg,
             final List<String> msgs
-    ) throws InterruptedException {
-        final CountDownLatch latch = new CountDownLatch(threads * msgs.size());
+    ) throws Exception {
+        final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size());
 
         ignMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, Message>() {
             @Override public boolean apply(UUID uuid, Message msg) {
                 actlMsg.putIfAbsent(msg.threadName, Lists.<String>newArrayList());
-                actlMsg.get(msg.threadName).add(msg.message);
+
+                actlMsg.get(msg.threadName).add(msg.msg);
+
                 latch.countDown();
+
                 return true;
             }
         });
 
-        for (int i = 0; i < threads; i++)
+        for (int i = 0; i < THREADS; i++)
             new Thread(new Runnable() {
                 @Override public void run() {
                     String thdName = Thread.currentThread().getName();
+
                     List<String> exp = Lists.newArrayList();
+
                     expMsg.put(thdName, exp);
 
                     for (String msg : msgs) {
                         exp.add(msg);
+
                         ignMsg.sendOrdered(TOPIC, new Message(thdName, msg), 1000);
                     }
 
@@ -340,40 +376,26 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
     }
 
     /**
-     */
-    private class Message implements Serializable{
-        /** Thread name. */
-        private String threadName;
-        /** Message. */
-        private String message;
-
-        /**
-         * @param threadName Thread name.
-         * @param msg Message.
-         */
-        private Message(String threadName, String msg) {
-            this.threadName = threadName;
-            this.message = msg;
-        }
-    }
-
-    /**
+     * @param ignite2 Second node.
      * @param igniteMsg Ignite message.
      * @param msgStr    Message string.
      * @param cls       Callback for compare result.
+     * @throws Exception If failed.
      */
-    private void sendWith2Node(
+    private void sendWith2Nodes(
             final Ignite ignite2,
             final IgniteMessaging igniteMsg,
             final String msgStr,
-            final IgniteBiInClosure<String,Thread>  cls
+            final IgniteBiInClosure<String, Thread>  cls
     ) throws Exception {
         final CountDownLatch latch = new CountDownLatch(1);
 
         ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>()
{
             @Override public boolean apply(UUID uuid, String msg) {
                 Assert.assertEquals(msgStr, msg);
+
                 latch.countDown();
+
                 return true;
             }
         });
@@ -384,15 +406,16 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
     }
 
     /**
-     * @param igniteMsg Ignite message.
+     * @param igniteMsg Ignite messaging.
      * @param msgStr    Message string.
      * @param cls       Callback for compare result.
+     * @throws Exception If failed.
      */
     private void send(
            final IgniteMessaging igniteMsg,
            final String msgStr,
-           final IgniteBiInClosure<String,Thread> cls
-    ) throws InterruptedException {
+           final IgniteBiInClosure<String, Thread> cls
+    ) throws Exception {
         final CountDownLatch latch = new CountDownLatch(1);
 
         final AtomicReference<Thread> thread = new AtomicReference<>();
@@ -401,8 +424,11 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
         igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
             @Override public boolean apply(UUID uuid, String msgStr) {
                 thread.set(Thread.currentThread());
+
                 val.set(msgStr);
+
                 latch.countDown();
+
                 return true;
             }
         });
@@ -415,18 +441,18 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
     }
 
     /**
-     * @param ignite2 Ignite 2.
+     * @param ignite2 Second node.
      * @param igniteMsg Ignite message.
      * @param msgs messages for send.
      * @param cls  Callback for compare result.
+     * @throws Exception If failed.
      */
     private void sendOrderedWith2Node(
             final Ignite ignite2,
             final IgniteMessaging igniteMsg,
             final List<String> msgs,
-            final IgniteBiInClosure<List<String>,List<Thread>> cls
+            final IgniteBiInClosure<List<String>, List<Thread>> cls
     ) throws Exception {
-
         final CountDownLatch latch = new CountDownLatch(msgs.size());
 
         final List<String> received = Lists.newArrayList();
@@ -434,7 +460,9 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
         ignite2.message().localListen(TOPIC, new IgniteBiPredicate<UUID, String>()
{
             @Override public boolean apply(UUID uuid, String msg) {
                 received.add(msg);
+
                 latch.countDown();
+
                 return true;
             }
         });
@@ -444,20 +472,19 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
         latch.await();
 
         assertTrue(msgs.equals(received));
-
     }
 
     /**
      * @param igniteMsg Ignite message.
      * @param msgs  messages for send.
      * @param cls Callback for compare result.
+     * @throws Exception If failed.
      */
     private<T> void sendOrdered(
             final IgniteMessaging igniteMsg,
             final List<T> msgs,
             final IgniteBiInClosure<List<T>,List<Thread>> cls
-    ) throws InterruptedException {
-
+    ) throws Exception {
         final CountDownLatch latch = new CountDownLatch(msgs.size());
 
         final List<T> received = Lists.newArrayList();
@@ -469,8 +496,11 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
         igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, T>() {
             @Override public boolean apply(UUID uuid, T s) {
                 received.add(s);
+
                 threads.add(Thread.currentThread());
+
                 latch.countDown();
+
                 return true;
             }
         });
@@ -481,14 +511,34 @@ public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest
impleme
     }
 
     /**
-     * @return List ordered messages
+     * @return List of ordered messages
      */
-    private List<String> orderedMsg() {
+    private List<String> orderedMessages() {
         final List<String> msgs = Lists.newArrayList();
 
         for (int i = 0; i < 1000; i++)
-            msgs.add("" + ThreadLocalRandom8.current().nextInt());
+            msgs.add(String.valueOf(ThreadLocalRandom8.current().nextInt()));
 
         return msgs;
     }
+
+    /**
+     *
+     */
+    private static class Message implements Serializable{
+        /** Thread name. */
+        private final String threadName;
+
+        /** Message. */
+        private final String msg;
+
+        /**
+         * @param threadName Thread name.
+         * @param msg Message.
+         */
+        private Message(String threadName, String msg) {
+            this.threadName = threadName;
+            this.msg = msg;
+        }
+    }
 }


Mime
View raw message