Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id CF3DF200C1C for ; Wed, 15 Feb 2017 11:52:30 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id CDB6F160B5E; Wed, 15 Feb 2017 10:52:30 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DFE33160B46 for ; Wed, 15 Feb 2017 11:52:28 +0100 (CET) Received: (qmail 36902 invoked by uid 500); 15 Feb 2017 10:52:28 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 36892 invoked by uid 99); 15 Feb 2017 10:52:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Feb 2017 10:52:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E60E4DFC31; Wed, 15 Feb 2017 10:52:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: <09a378fa649d42ce878ff07306c4588b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: ignite-3727 Support local listeners async execution for IgniteMessage.send Date: Wed, 15 Feb 2017 10:52:27 +0000 (UTC) archived-at: Wed, 15 Feb 2017 10:52:31 -0000 Repository: ignite Updated Branches: refs/heads/master 2cfd55dcf -> e8f8e0acf ignite-3727 Support local listeners async execution for IgniteMessage.send Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: 
http://git-wip-us.apache.org/repos/asf/ignite/commit/e8f8e0ac Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e8f8e0ac Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e8f8e0ac Branch: refs/heads/master Commit: e8f8e0acf254133dd978d72adb73c5362752f706 Parents: 2cfd55d Author: Dmitriy Govorukhin Authored: Wed Feb 15 13:51:33 2017 +0300 Committer: sboikov Committed: Wed Feb 15 13:52:22 2017 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteMessaging.java | 11 +- .../ignite/internal/IgniteMessagingImpl.java | 6 +- .../internal/managers/GridManagerAdapter.java | 2 +- .../managers/communication/GridIoManager.java | 55 +- .../communication/GridIoManagerSelfTest.java | 6 +- ...niteMessagingConfigVariationFullApiTest.java | 195 +++++-- .../ignite/messaging/GridMessagingSelfTest.java | 114 +++- .../messaging/IgniteMessagingSendAsyncTest.java | 544 +++++++++++++++++++ .../ignite/testsuites/IgniteBasicTestSuite.java | 2 + .../hadoop/shuffle/HadoopShuffle.java | 2 +- 10 files changed, 865 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e8f8e0ac/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java index ab554af..e64ded5 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java @@ -77,6 +77,10 @@ public interface IgniteMessaging extends IgniteAsyncSupport { /** * Sends given message with specified topic to the nodes in the underlying cluster group. + *

+ * By default all local listeners will be executed in the calling thread, or if you use + * {@link #withAsync()}, listeners will execute in public thread pool (in this case it is user's + * responsibility to implement back-pressure and limit number of concurrently executed async messages). * * @param topic Topic to send to, {@code null} for default topic. * @param msg Message to send. @@ -87,6 +91,10 @@ public interface IgniteMessaging extends IgniteAsyncSupport { /** * Sends given messages with the specified topic to the nodes in the underlying cluster group. + *

+ * By default all local listeners will be executed in the calling thread, or if you use + * {@link #withAsync()}, listeners will execute in public thread pool (in this case it is user's + * responsibility to implement back-pressure and limit number of concurrently executed async messages). * * @param topic Topic to send to, {@code null} for default topic. * @param msgs Messages to send. Order of the sending is undefined. If the method produces @@ -99,7 +107,8 @@ public interface IgniteMessaging extends IgniteAsyncSupport { /** * Sends given message with specified topic to the nodes in the underlying cluster group. Messages sent with * this method will arrive in the same order they were sent. Note that if a topic is used - * for ordered messages, then it cannot be reused for non-ordered messages. + * for ordered messages, then it cannot be reused for non-ordered messages. Note that local listeners + * are always executed in public thread pool, no matter default or {@link #withAsync()} mode is used. *

* The {@code timeout} parameter specifies how long an out-of-order message will stay in a queue, * waiting for messages that are ordered ahead of it to arrive. If timeout expires, then all ordered http://git-wip-us.apache.org/repos/asf/ignite/blob/e8f8e0ac/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java index 2800777..541fad4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java @@ -86,7 +86,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter if (snapshot.isEmpty()) throw U.emptyTopologyException(); - ctx.io().sendUserMessage(snapshot, msg, topic, false, 0); + ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, isAsync()); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -111,7 +111,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter for (Object msg : msgs) { A.notNull(msg, "msg"); - ctx.io().sendUserMessage(snapshot, msg, topic, false, 0); + ctx.io().sendUserMessage(snapshot, msg, topic, false, 0, isAsync()); } } catch (IgniteCheckedException e) { @@ -137,7 +137,7 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter if (timeout == 0) timeout = ctx.config().getNetworkTimeout(); - ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout); + ctx.io().sendUserMessage(snapshot, msg, topic, true, timeout, false); } catch (IgniteCheckedException e) { throw U.convertException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8f8e0ac/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index 584cc56..5992eda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -390,7 +390,7 @@ public abstract class GridManagerAdapter implements GridMan if (msg instanceof Message) ctx.io().send(node, topic, (Message)msg, SYSTEM_POOL); else - ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0); + ctx.io().sendUserMessage(Collections.singletonList(node), msg, topic, false, 0, false); } catch (IgniteCheckedException e) { throw unwrapException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8f8e0ac/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 7ef7bc0..84b4543 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -785,7 +785,8 @@ public class GridIoManager extends GridManagerAdapter ackC + IgniteInClosure ackC, + boolean async ) throws IgniteCheckedException { assert node != null; assert topic != null; @@ -1266,6 +1269,11 @@ public class GridIoManager extends GridManagerAdapter 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null, false); } /** @@ -1409,7 +1418,7 @@ public class GridIoManager extends 
GridManagerAdapter ackC) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackC, false); } /** @@ -1458,7 +1467,7 @@ public class GridIoManager extends GridManagerAdapter ackC) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, false, 0, false, ackC); + send(node, topic, -1, msg, plc, false, 0, false, ackC, false); } /** @@ -1514,10 +1523,10 @@ public class GridIoManager extends GridManagerAdapter 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackC, false); } - /** + /** * Sends a peer deployable user message. * * @param nodes Destination nodes. @@ -1525,7 +1534,7 @@ public class GridIoManager extends GridManagerAdapter nodes, Object msg) throws IgniteCheckedException { - sendUserMessage(nodes, msg, null, false, 0); + sendUserMessage(nodes, msg, null, false, 0, false); } /** @@ -1536,11 +1545,12 @@ public class GridIoManager extends GridManagerAdapter nodes, Object msg, - @Nullable Object topic, boolean ordered, long timeout) throws IgniteCheckedException { + @Nullable Object topic, boolean ordered, long timeout, boolean async) throws IgniteCheckedException { boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(locNodeId); byte[] serMsg = null; @@ -1585,7 +1595,7 @@ public class GridIoManager extends GridManagerAdapter() { @@ -1059,9 +1069,15 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser Assert.assertNull(id); - IgniteFuture fut = msg.future(); + IgniteFuture starFut = msg.future(); + + Assert.assertNotNull(starFut); + + U.sleep(500); - Assert.assertNotNull(fut); + Assert.assertFalse(starFut.isDone()); + + discoSpi.stopBlock(); GridTestUtils.assertThrows(log, new Callable() { @Override public Void call() throws Exception { @@ -1071,10 +1087,14 @@ public class 
GridMessagingSelfTest extends GridCommonAbstractTest implements Ser } }, IllegalStateException.class, null); - id = fut.get(); + id = starFut.get(); Assert.assertNotNull(id); + Assert.assertTrue(starFut.isDone()); + + discoSpi.blockCustomEvent(); + message(ignite1.cluster().forRemotes()).send(topic, "msg1"); GridTestUtils.waitForCondition(new PA() { @@ -1099,8 +1119,16 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser } }, IllegalStateException.class, null); + U.sleep(500); + + Assert.assertFalse(stopFut.isDone()); + + discoSpi.stopBlock(); + stopFut.get(); + Assert.assertTrue(stopFut.isDone()); + message(ignite1.cluster().forRemotes()).send(topic, "msg2"); U.sleep(1000); @@ -1109,6 +1137,80 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest implements Ser } /** + * + */ + static class TestTcpDiscoverySpi extends TcpDiscoverySpi { + /** */ + private boolean blockCustomEvt; + + /** */ + private final Object mux = new Object(); + + /** */ + private List blockedMsgs = new ArrayList<>(); + + /** {@inheritDoc} */ + @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + synchronized (mux) { + if (blockCustomEvt) { + DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg, "delegate"); + if (msg0 instanceof StopRoutineDiscoveryMessage || msg0 instanceof StartRoutineDiscoveryMessage) { + log.info("Block custom message: " + msg0); + blockedMsgs.add(msg); + + mux.notifyAll(); + } + return; + } + } + + super.sendCustomEvent(msg); + } + + /** + * + */ + public void blockCustomEvent() { + synchronized (mux) { + assert blockedMsgs.isEmpty() : blockedMsgs; + + blockCustomEvt = true; + } + } + + /** + * @throws InterruptedException If interrupted. 
+ */ + public void waitCustomEvent() throws InterruptedException { + synchronized (mux) { + while (blockedMsgs.isEmpty()) + mux.wait(); + } + } + + /** + * + */ + public void stopBlock() { + List msgs; + + synchronized (this) { + msgs = new ArrayList<>(blockedMsgs); + + blockCustomEvt = false; + + blockedMsgs.clear(); + } + + for (DiscoverySpiCustomMessage msg : msgs) { + log.info("Resend blocked message: " + msg); + + super.sendCustomEvent(msg); + } + } + } + + /** * Tests that message listener registers only for one oldest node. * * @throws Exception If an error occurred. http://git-wip-us.apache.org/repos/asf/ignite/blob/e8f8e0ac/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java new file mode 100644 index 0000000..75e7d22 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.messaging; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteMessaging; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jsr166.ThreadLocalRandom8; +import org.junit.Assert; + +/** + * + */ +public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest implements Serializable { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Threads number for multi-thread tests. */ + private static final int THREADS = 10; + + /** */ + private final String TOPIC = "topic"; + + /** */ + private final String msgStr = "message"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * Checks if use default mode, local listeners execute in the same thread, 1 node in topology. + * + * @throws Exception If failed. 
+ */ + public void testSendDefaultMode() throws Exception { + Ignite ignite1 = startGrid(1); + + send(ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread> () { + @Override public void apply(String msg, Thread thread) { + Assert.assertEquals(Thread.currentThread(), thread); + Assert.assertEquals(msgStr, msg); + } + }); + } + + /** + * Checks if use async mode, local listeners execute in another thread, 1 node in topology. + * + * @throws Exception If failed. + */ + public void testSendAsyncMode() throws Exception { + Ignite ignite1 = startGrid(1); + + send(ignite1.message().withAsync(), msgStr, new IgniteBiInClosure<String, Thread> () { + @Override public void apply(String msg, Thread thread) { + Assert.assertTrue(!Thread.currentThread().equals(thread)); + Assert.assertEquals(msgStr, msg); + } + }); + } + + /** + * Checks if use default mode, local listeners execute in the same thread, 2 nodes in topology. + * + * @throws Exception If failed. + */ + public void testSendDefaultMode2Nodes() throws Exception { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + sendWith2Nodes(ignite2, ignite1.message(), msgStr, new IgniteBiInClosure<String, Thread> () { + @Override public void apply(String msg, Thread thread) { + Assert.assertEquals(Thread.currentThread(), thread); + Assert.assertEquals(msgStr, msg); + } + }); + } + + /** + * Checks if use async mode, local listeners execute in another thread, 2 nodes in topology. + * + * @throws Exception If failed. + */ + public void testSendAsyncMode2Node() throws Exception { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + sendWith2Nodes(ignite2, ignite1.message().withAsync(), msgStr, new IgniteBiInClosure<String, Thread> () { + @Override public void apply(String msg, Thread thread) { + Assert.assertTrue(!Thread.currentThread().equals(thread)); + Assert.assertEquals(msgStr, msg); + } + }); + } + + /** + * Checks that sendOrdered works in thread pool, 1 node in topology. + * + * @throws Exception If failed. 
+ */ + public void testSendOrderedDefaultMode() throws Exception { + Ignite ignite1 = startGrid(1); + + final List<String> msgs = orderedMessages(); + + sendOrdered(ignite1.message(), msgs, new IgniteBiInClosure< List<String>, List<Thread>> () { + @Override public void apply(List<String> received, List<Thread> threads) { + assertFalse(threads.contains(Thread.currentThread())); + assertTrue(msgs.equals(received)); + } + }); + } + + /** + * Checks that sendOrdered work in thread pool, 1 node in topology. + * + * @throws Exception If failed. + */ + public void testSendOrderedAsyncMode() throws Exception { + Ignite ignite1 = startGrid(1); + + final List<String> msgs = orderedMessages(); + + sendOrdered(ignite1.message().withAsync(), msgs, new IgniteBiInClosure< List<String>, List<Thread>> () { + @Override public void apply(List<String> received, List<Thread> threads) { + assertFalse(threads.contains(Thread.currentThread())); + assertTrue(msgs.equals(received)); + } + }); + } + + /** + * Checks that sendOrdered work in thread pool, 2 nodes in topology. + * + * @throws Exception If failed. + */ + public void testSendOrderedDefaultMode2Node() throws Exception { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + final List<String> msgs = orderedMessages(); + + sendOrderedWith2Node(ignite2, ignite1.message(), msgs, new IgniteBiInClosure<List<String>, List<Thread>>() { + @Override public void apply(List<String> received, List<Thread> threads) { + assertFalse(threads.contains(Thread.currentThread())); + assertTrue(msgs.equals(received)); + } + }); + } + + /** + * Checks that sendOrdered work in thread pool, 2 nodes in topology. + * + * @throws Exception If failed. 
+ */ + public void testSendOrderedAsyncMode2Node() throws Exception { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + final List msgs = orderedMessages(); + + sendOrderedWith2Node(ignite2, ignite1.message().withAsync(), msgs, new IgniteBiInClosure, List>() { + @Override public void apply(List received, List threads) { + assertFalse(threads.contains(Thread.currentThread())); + assertTrue(msgs.equals(received)); + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testSendOrderedDefaultModeMultiThreads() throws Exception { + Ignite ignite = startGrid(1); + + sendOrderedMultiThreads(ignite.message()); + } + + /** + * @throws Exception If failed. + */ + public void testSendOrderedAsyncModeMultiThreads() throws Exception { + Ignite ignite = startGrid(1); + + sendOrderedMultiThreads(ignite.message().withAsync()); + } + + /** + * @throws Exception If failed. + */ + public void testSendOrderedDefaultModeMultiThreadsWith2Node() throws Exception { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message()); + } + + /** + * @throws Exception If failed. + */ + public void testSendOrderedAsyncModeMultiThreadsWith2Node() throws Exception { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + sendOrderedMultiThreadsWith2Node(ignite2, ignite1.message().withAsync()); + } + + /** + * @param ignite2 Second node. + * @param ignMsg IgniteMessage. + * @throws Exception If failed. + */ + private void sendOrderedMultiThreadsWith2Node( + final Ignite ignite2, + final IgniteMessaging ignMsg + ) throws Exception { + final ConcurrentMap> expMsg = Maps.newConcurrentMap(); + final ConcurrentMap> actlMsg = Maps.newConcurrentMap(); + + final List msgs = orderedMessages(); + + sendOrderedMultiThreadsWith2Node(ignite2, ignMsg, expMsg, actlMsg, msgs); + + } + + /** + * @param ignMsg IgniteMessaging. + * @throws Exception If failed. 
+ */ + private void sendOrderedMultiThreads( + final IgniteMessaging ignMsg + ) throws Exception { + final ConcurrentMap> expMsg = Maps.newConcurrentMap(); + final ConcurrentMap> actlMsg = Maps.newConcurrentMap(); + + final List msgs = orderedMessages(); + + sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs); + } + + /** + * @param ignite2 Second node. + * @param ignMsg Ignite for send message. + * @param expMsg Expected messages map. + * @param actlMsg Actual message map. + * @param msgs List of messages. + * @throws Exception If failed. + */ + private void sendOrderedMultiThreadsWith2Node( + final Ignite ignite2, + final IgniteMessaging ignMsg, + final ConcurrentMap> expMsg, + final ConcurrentMap> actlMsg, + final List msgs + ) throws Exception { + final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size()); + + final ConcurrentMap> actlMsgNode2 = Maps.newConcurrentMap(); + + ignite2.message().localListen(TOPIC, new IgniteBiPredicate() { + @Override public boolean apply(UUID uuid, Message msg) { + actlMsgNode2.putIfAbsent(msg.threadName, Lists.newArrayList()); + + actlMsgNode2.get(msg.threadName).add(msg.msg); + + latch.countDown(); + + return true; + } + }); + + sendOrderedMultiThreads(ignMsg, expMsg, actlMsg, msgs); + + latch.await(); + + assertEquals(expMsg.size(), actlMsgNode2.size()); + + for (Map.Entry> entry : expMsg.entrySet()) + assertTrue(actlMsgNode2.get(entry.getKey()).equals(entry.getValue())); + } + + /** + * @param ignMsg Ignite for send message. + * @param expMsg Expected messages map. + * @param actlMsg Actual message map. + * @param msgs List of messages. + * @throws Exception If failed. 
+ */ + private void sendOrderedMultiThreads( + final IgniteMessaging ignMsg, + final ConcurrentMap> expMsg, + final ConcurrentMap> actlMsg, + final List msgs + ) throws Exception { + final CountDownLatch latch = new CountDownLatch(THREADS * msgs.size()); + + ignMsg.localListen(TOPIC, new IgniteBiPredicate() { + @Override public boolean apply(UUID uuid, Message msg) { + actlMsg.putIfAbsent(msg.threadName, Lists.newArrayList()); + + actlMsg.get(msg.threadName).add(msg.msg); + + latch.countDown(); + + return true; + } + }); + + for (int i = 0; i < THREADS; i++) + new Thread(new Runnable() { + @Override public void run() { + String thdName = Thread.currentThread().getName(); + + List exp = Lists.newArrayList(); + + expMsg.put(thdName, exp); + + for (String msg : msgs) { + exp.add(msg); + + ignMsg.sendOrdered(TOPIC, new Message(thdName, msg), 1000); + } + + } + }).start(); + + latch.await(); + + assertEquals(expMsg.size(), actlMsg.size()); + + for (Map.Entry> entry : expMsg.entrySet()) + assertTrue(actlMsg.get(entry.getKey()).equals(entry.getValue())); + } + + /** + * @param ignite2 Second node. + * @param igniteMsg Ignite message. + * @param msgStr Message string. + * @param cls Callback for compare result. + * @throws Exception If failed. + */ + private void sendWith2Nodes( + final Ignite ignite2, + final IgniteMessaging igniteMsg, + final String msgStr, + final IgniteBiInClosure cls + ) throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + + ignite2.message().localListen(TOPIC, new IgniteBiPredicate() { + @Override public boolean apply(UUID uuid, String msg) { + Assert.assertEquals(msgStr, msg); + + latch.countDown(); + + return true; + } + }); + + send(igniteMsg, msgStr, cls); + + latch.await(); + } + + /** + * @param igniteMsg Ignite messaging. + * @param msgStr Message string. + * @param cls Callback for compare result. + * @throws Exception If failed. 
+ */ + private void send( + final IgniteMessaging igniteMsg, + final String msgStr, + final IgniteBiInClosure cls + ) throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + + final AtomicReference thread = new AtomicReference<>(); + final AtomicReference val = new AtomicReference<>(); + + igniteMsg.localListen(TOPIC, new IgniteBiPredicate() { + @Override public boolean apply(UUID uuid, String msgStr) { + thread.set(Thread.currentThread()); + + val.set(msgStr); + + latch.countDown(); + + return true; + } + }); + + igniteMsg.send(TOPIC, msgStr); + + latch.await(); + + cls.apply(val.get(), thread.get()); + } + + /** + * @param ignite2 Second node. + * @param igniteMsg Ignite message. + * @param msgs messages for send. + * @param cls Callback for compare result. + * @throws Exception If failed. + */ + private void sendOrderedWith2Node( + final Ignite ignite2, + final IgniteMessaging igniteMsg, + final List msgs, + final IgniteBiInClosure, List> cls + ) throws Exception { + final CountDownLatch latch = new CountDownLatch(msgs.size()); + + final List received = Lists.newArrayList(); + + ignite2.message().localListen(TOPIC, new IgniteBiPredicate() { + @Override public boolean apply(UUID uuid, String msg) { + received.add(msg); + + latch.countDown(); + + return true; + } + }); + + sendOrdered(igniteMsg, msgs, cls); + + latch.await(); + + assertTrue(msgs.equals(received)); + } + + /** + * @param igniteMsg Ignite message. + * @param msgs messages for send. + * @param cls Callback for compare result. + * @throws Exception If failed. 
+ */ + private void sendOrdered( + final IgniteMessaging igniteMsg, + final List msgs, + final IgniteBiInClosure,List> cls + ) throws Exception { + final CountDownLatch latch = new CountDownLatch(msgs.size()); + + final List received = Lists.newArrayList(); + final List threads = Lists.newArrayList(); + + for (T msg : msgs) + igniteMsg.sendOrdered(TOPIC, msg, 1000); + + igniteMsg.localListen(TOPIC, new IgniteBiPredicate() { + @Override public boolean apply(UUID uuid, T s) { + received.add(s); + + threads.add(Thread.currentThread()); + + latch.countDown(); + + return true; + } + }); + + latch.await(); + + cls.apply(received, threads); + } + + /** + * @return List of ordered messages + */ + private List orderedMessages() { + final List msgs = Lists.newArrayList(); + + for (int i = 0; i < 1000; i++) + msgs.add(String.valueOf(ThreadLocalRandom8.current().nextInt())); + + return msgs; + } + + /** + * + */ + private static class Message implements Serializable{ + /** Thread name. */ + private final String threadName; + + /** Message. */ + private final String msg; + + /** + * @param threadName Thread name. + * @param msg Message. 
+ */ + private Message(String threadName, String msg) { + this.threadName = threadName; + this.msg = msg; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e8f8e0ac/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 9e20d2a..688edf7 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -56,6 +56,7 @@ import org.apache.ignite.marshaller.DynamicProxySerializationMultiJvmSelfTest; import org.apache.ignite.marshaller.MarshallerContextSelfTest; import org.apache.ignite.messaging.GridMessagingNoPeerClassLoadingSelfTest; import org.apache.ignite.messaging.GridMessagingSelfTest; +import org.apache.ignite.messaging.IgniteMessagingSendAsyncTest; import org.apache.ignite.messaging.IgniteMessagingWithClientTest; import org.apache.ignite.plugin.security.SecurityPermissionSetBuilderTest; import org.apache.ignite.spi.GridSpiLocalHostInjectionTest; @@ -101,6 +102,7 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTest(new TestSuite(GridSelfTest.class)); suite.addTest(new TestSuite(ClusterGroupHostsSelfTest.class)); suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class)); + suite.addTest(new TestSuite(IgniteMessagingSendAsyncTest.class)); GridTestUtils.addTestIfNeeded(suite, ClusterGroupSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridMessagingSelfTest.class, ignoredTests); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8f8e0ac/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java ---------------------------------------------------------------------- diff 
--git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java index 8ffea8c..3db68c4 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java @@ -147,7 +147,7 @@ public class HadoopShuffle extends HadoopComponent { if (msg instanceof Message) ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL); else - ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0); + ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0, false); } /**