Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DA3BD17E0E for ; Mon, 6 Apr 2015 07:30:29 +0000 (UTC) Received: (qmail 346 invoked by uid 500); 6 Apr 2015 07:30:29 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 317 invoked by uid 500); 6 Apr 2015 07:30:29 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 307 invoked by uid 99); 6 Apr 2015 07:30:29 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Apr 2015 07:30:29 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 06 Apr 2015 07:30:06 +0000 Received: (qmail 99210 invoked by uid 99); 6 Apr 2015 07:30:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Apr 2015 07:30:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BE91FE10A5; Mon, 6 Apr 2015 07:30:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Mon, 06 Apr 2015 07:30:17 -0000 Message-Id: <0190b0ddd1514e288ad05d51fa61f721@git.apache.org> In-Reply-To: <66a19487d6ae44e591b369602533bb44@git.apache.org> References: <66a19487d6ae44e591b369602533bb44@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/21] incubator-ignite git commit: ignite-328 IgniteMessaging.remoteListen ignores forOldest() projection X-Virus-Checked: Checked by ClamAV on apache.org ignite-328 IgniteMessaging.remoteListen ignores forOldest() projection Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6572efba Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6572efba Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6572efba Branch: refs/heads/ignite-560 Commit: 6572efba2304756a0abb9a5453d9e1e0899ff865 Parents: 580f051 Author: Andrey Gura Authored: Wed Apr 1 16:41:17 2015 +0300 Committer: agura Committed: Sun Apr 5 23:30:14 2015 +0300 ---------------------------------------------------------------------- .../internal/cluster/ClusterGroupAdapter.java | 3 +- .../ignite/messaging/GridMessagingSelfTest.java | 50 ++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6572efba/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java index c1d31af..0daffcc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java @@ -806,7 +806,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { * @param isOldest Oldest flag. */ private AgeClusterGroup(ClusterGroupAdapter parent, boolean isOldest) { - super(parent.ctx, parent.subjId, parent.p, parent.ids); + super(parent.ctx, parent.subjId, (IgnitePredicate) null); this.isOldest = isOldest; @@ -823,6 +823,7 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable { lastTopVer = ctx.discovery().topologyVersion(); this.node = isOldest ? U.oldest(super.nodes(), null) : U.youngest(super.nodes(), null); + this.p = F.nodeForNodes(node); } finally { unguard(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6572efba/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java index 4c844fc..c033750 100644 --- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java @@ -67,6 +67,9 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest { /** */ private static final Integer I_TOPIC_2 = 2; + /** Message count. */ + private static AtomicInteger MSG_CNT; + /** */ public static final String EXT_RESOURCE_CLS_NAME = "org.apache.ignite.tests.p2p.TestUserResource"; @@ -161,6 +164,8 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { + MSG_CNT = new AtomicInteger(); + ignite1 = startGrid(1); ignite2 = startGrid(2); } @@ -1088,4 +1093,49 @@ public class GridMessagingSelfTest extends GridCommonAbstractTest { assertEquals(1, msgCnt.get()); } + + /** + * Tests that message listener registers only for one oldest node. + * + * @throws Exception If an error occurred. + */ + public void testRemoteListenForOldest() throws Exception { + remoteListenForOldest(ignite1); + + // Restart oldest node. + stopGrid(1); + + ignite1 = startGrid(1); + + MSG_CNT.set(0); + + // Ignite2 is oldest now. + remoteListenForOldest(ignite2); + } + + /** + * @param expOldestIgnite Expected oldest ignite. + */ + private void remoteListenForOldest(Ignite expOldestIgnite) throws InterruptedException { + ClusterGroup grp = ignite1.cluster().forOldest(); + + assertEquals(1, grp.nodes().size()); + assertEquals(expOldestIgnite.cluster().localNode().id(), grp.node().id()); + + ignite1.message(grp).remoteListen(null, new P2() { + @Override public boolean apply(UUID nodeId, Object msg) { + System.out.println("Received new message [msg=" + msg + ", senderNodeId=" + nodeId + ']'); + + MSG_CNT.incrementAndGet(); + + return true; + } + }); + + ignite1.message().send(null, MSG_1); + + Thread.sleep(3000); + + assertEquals(1, MSG_CNT.get()); + } }