Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 22DC317E0D for ; Wed, 10 Jun 2015 16:27:39 +0000 (UTC) Received: (qmail 23977 invoked by uid 500); 10 Jun 2015 16:27:39 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 23946 invoked by uid 500); 10 Jun 2015 16:27:39 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 23937 invoked by uid 99); 10 Jun 2015 16:27:39 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Jun 2015 16:27:38 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 5834EC12D0 for ; Wed, 10 Jun 2015 16:27:38 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.77 X-Spam-Level: * X-Spam-Status: No, score=1.77 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id kq-_f1kCQeBZ for ; Wed, 10 Jun 2015 16:27:27 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 4B09B27636 for ; Wed, 10 Jun 2015 16:27:25 +0000 (UTC) Received: (qmail 23509 invoked by uid 99); 10 Jun 2015 16:27:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Jun 2015 16:27:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 07090E046A; Wed, 10 Jun 2015 16:27:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 10 Jun 2015 16:27:28 -0000 Message-Id: In-Reply-To: <8eb7a652192c4f07b1b9bed4629f3c65@git.apache.org> References: <8eb7a652192c4f07b1b9bed4629f3c65@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/31] incubator-ignite git commit: ignite-471-2: huge merge from sprint-6 http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java deleted file mode 100644 index 0c9f2f2..0000000 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java +++ /dev/null @@ -1,700 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.discovery.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.net.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.events.EventType.*; - -/** - * Client-based discovery tests. - */ -public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** */ - private static final AtomicInteger srvIdx = new AtomicInteger(); - - /** */ - private static final AtomicInteger clientIdx = new AtomicInteger(); - - /** */ - private static Collection srvNodeIds; - - /** */ - private static Collection clientNodeIds; - - /** */ - private static int clientsPerSrv; - - /** */ - private static CountDownLatch srvJoinedLatch; - - /** */ - private static CountDownLatch srvLeftLatch; - - /** */ - private static CountDownLatch srvFailedLatch; - - /** */ - private static CountDownLatch clientJoinedLatch; - - /** */ - private static CountDownLatch clientLeftLatch; - - /** */ - private static CountDownLatch clientFailedLatch; - - /** */ - private static CountDownLatch msgLatch; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setLocalHost("127.0.0.1"); - - if (gridName.startsWith("server")) { - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(disco); - } - else if (gridName.startsWith("client")) { - TcpClientDiscoverySpi disco = new TcpClientDiscoverySpi(); - - TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); - - String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()). - get((clientIdx.get() - 1) / clientsPerSrv).toString(); - - if (addr.startsWith("/")) - addr = addr.substring(1); - - ipFinder.setAddresses(Arrays.asList(addr)); - - disco.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(disco); - } - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - Collection addrs = IP_FINDER.getRegisteredAddresses(); - - if (!F.isEmpty(addrs)) - IP_FINDER.unregisterAddresses(addrs); - - srvIdx.set(0); - clientIdx.set(0); - - srvNodeIds = new GridConcurrentHashSet<>(); - clientNodeIds = new GridConcurrentHashSet<>(); - - clientsPerSrv = 2; - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllClients(true); - stopAllServers(true); - - assert G.allGrids().isEmpty(); - } - - /** - * @throws Exception If failed. - */ - public void testClientNodeJoin() throws Exception { - startServerNodes(3); - startClientNodes(3); - - checkNodes(3, 3); - - srvJoinedLatch = new CountDownLatch(3); - clientJoinedLatch = new CountDownLatch(3); - - attachListeners(3, 3); - - startClientNodes(1); - - await(srvJoinedLatch); - await(clientJoinedLatch); - - checkNodes(3, 4); - } - - /** - * @throws Exception If failed. - */ - public void testClientNodeLeave() throws Exception { - startServerNodes(3); - startClientNodes(3); - - checkNodes(3, 3); - - srvLeftLatch = new CountDownLatch(3); - clientLeftLatch = new CountDownLatch(2); - - attachListeners(3, 3); - - stopGrid("client-2"); - - await(srvLeftLatch); - await(clientLeftLatch); - - checkNodes(3, 2); - } - - /** - * @throws Exception If failed. - */ - public void testClientNodeFail() throws Exception { - startServerNodes(3); - startClientNodes(3); - - checkNodes(3, 3); - - srvFailedLatch = new CountDownLatch(3); - clientFailedLatch = new CountDownLatch(2); - - attachListeners(3, 3); - - failClient(2); - - await(srvFailedLatch); - await(clientFailedLatch); - - checkNodes(3, 2); - } - - /** - * @throws Exception If failed. - */ - public void testServerNodeJoin() throws Exception { - startServerNodes(3); - startClientNodes(3); - - checkNodes(3, 3); - - srvJoinedLatch = new CountDownLatch(3); - clientJoinedLatch = new CountDownLatch(3); - - attachListeners(3, 3); - - startServerNodes(1); - - await(srvJoinedLatch); - await(clientJoinedLatch); - - checkNodes(4, 3); - } - - /** - * @throws Exception If failed. - */ - public void testServerNodeLeave() throws Exception { - startServerNodes(3); - startClientNodes(3); - - checkNodes(3, 3); - - srvLeftLatch = new CountDownLatch(2); - clientLeftLatch = new CountDownLatch(3); - - attachListeners(3, 3); - - stopGrid("server-2"); - - await(srvLeftLatch); - await(clientLeftLatch); - - checkNodes(2, 3); - } - - /** - * @throws Exception If failed. - */ - public void testServerNodeFail() throws Exception { - startServerNodes(3); - startClientNodes(3); - - checkNodes(3, 3); - - srvFailedLatch = new CountDownLatch(2); - clientFailedLatch = new CountDownLatch(3); - - attachListeners(3, 3); - - assert U.field(G.ignite("server-2").configuration().getDiscoverySpi(), "clientMsgWorkers").isEmpty(); - - failServer(2); - - await(srvFailedLatch); - await(clientFailedLatch); - - checkNodes(2, 3); - } - - /** - * TODO: IGNITE-587. - * @throws Exception If failed. - */ - public void testClientReconnect() throws Exception { - fail("ignite-587"); - - clientsPerSrv = 1; - - startServerNodes(3); - startClientNodes(3); - - checkNodes(3, 3); - - resetClientIpFinder(2); - - srvFailedLatch = new CountDownLatch(2); - clientFailedLatch = new CountDownLatch(3); - - attachListeners(2, 3); - - failServer(2); - - await(srvFailedLatch); - await(clientFailedLatch); - - checkNodes(2, 3); - } - - /** - * @throws Exception If failed. - */ - public void testClientNodeJoinOneServer() throws Exception { - startServerNodes(1); - - srvJoinedLatch = new CountDownLatch(1); - - attachListeners(1, 0); - - startClientNodes(1); - - await(srvJoinedLatch); - - checkNodes(1, 1); - } - - /** - * TODO: IGNITE-587. - * @throws Exception If failed. - */ - public void testClientNodeLeaveOneServer() throws Exception { - fail("ignite-587"); - - startServerNodes(1); - startClientNodes(1); - - checkNodes(1, 1); - - srvLeftLatch = new CountDownLatch(1); - - attachListeners(1, 0); - - stopGrid("client-0"); - - await(srvLeftLatch); - - checkNodes(1, 0); - } - - /** - * @throws Exception If failed. - */ - public void testClientNodeFailOneServer() throws Exception { - startServerNodes(1); - startClientNodes(1); - - checkNodes(1, 1); - - srvFailedLatch = new CountDownLatch(1); - - attachListeners(1, 0); - - failClient(0); - - await(srvFailedLatch); - - checkNodes(1, 0); - } - - /** - * @throws Exception If failed. - */ - public void testMetrics() throws Exception { - startServerNodes(3); - startClientNodes(3); - - checkNodes(3, 3); - - attachListeners(3, 3); - - assertTrue(checkMetrics(3, 3, 0)); - - G.ignite("client-0").compute().broadcast(F.noop()); - - assertTrue(GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return checkMetrics(3, 3, 1); - } - }, 10000)); - - checkMetrics(3, 3, 1); - - G.ignite("server-0").compute().broadcast(F.noop()); - - assertTrue(GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return checkMetrics(3, 3, 2); - } - }, 10000)); - } - - /** - * @param srvCnt Number of Number of server nodes. - * @param clientCnt Number of client nodes. - * @param execJobsCnt Expected number of executed jobs. - * @return Whether metrics are correct. - */ - private boolean checkMetrics(int srvCnt, int clientCnt, int execJobsCnt) { - for (int i = 0; i < srvCnt; i++) { - Ignite g = G.ignite("server-" + i); - - for (ClusterNode n : g.cluster().nodes()) { - if (n.metrics().getTotalExecutedJobs() != execJobsCnt) - return false; - } - } - - for (int i = 0; i < clientCnt; i++) { - Ignite g = G.ignite("client-" + i); - - for (ClusterNode n : g.cluster().nodes()) { - if (n.metrics().getTotalExecutedJobs() != execJobsCnt) - return false; - } - } - - return true; - } - - /** - * @throws Exception If failed. - */ - public void testDataExchangeFromServer() throws Exception { - testDataExchange("server-0"); - } - - /** - * TODO: IGNITE-587. - * - * @throws Exception If failed. - */ - public void testDataExchangeFromClient() throws Exception { - fail("ignite-587"); - - testDataExchange("client-0"); - } - - /** - * @throws Exception If failed. - */ - private void testDataExchange(String masterName) throws Exception { - startServerNodes(2); - startClientNodes(2); - - checkNodes(2, 2); - - IgniteMessaging msg = grid(masterName).message(); - - UUID id = null; - - try { - id = msg.remoteListen(null, new MessageListener()); - - msgLatch = new CountDownLatch(4); - - msg.send(null, "Message 1"); - - await(msgLatch); - - startServerNodes(1); - startClientNodes(1); - - checkNodes(3, 3); - - msgLatch = new CountDownLatch(6); - - msg.send(null, "Message 2"); - - await(msgLatch); - } - finally { - if (id != null) - msg.stopRemoteListen(id); - } - } - - /** - * @param idx Index. - * @throws Exception In case of error. - */ - private void resetClientIpFinder(int idx) throws Exception { - TcpClientDiscoverySpi disco = - (TcpClientDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi(); - - TcpDiscoveryVmIpFinder ipFinder = (TcpDiscoveryVmIpFinder)disco.getIpFinder(); - - String addr = IP_FINDER.getRegisteredAddresses().iterator().next().toString(); - - if (addr.startsWith("/")) - addr = addr.substring(1); - - ipFinder.setAddresses(Arrays.asList(addr)); - } - - /** - * @param cnt Number of nodes. - * @throws Exception In case of error. - */ - private void startServerNodes(int cnt) throws Exception { - for (int i = 0; i < cnt; i++) { - Ignite g = startGrid("server-" + srvIdx.getAndIncrement()); - - srvNodeIds.add(g.cluster().localNode().id()); - } - } - - /** - * @param cnt Number of nodes. - * @throws Exception In case of error. - */ - private void startClientNodes(int cnt) throws Exception { - for (int i = 0; i < cnt; i++) { - Ignite g = startGrid("client-" + clientIdx.getAndIncrement()); - - clientNodeIds.add(g.cluster().localNode().id()); - } - } - - /** - * @param idx Index. - */ - private void failServer(int idx) { - ((TcpDiscoverySpi)G.ignite("server-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure(); - } - - /** - * @param idx Index. - */ - private void failClient(int idx) { - ((TcpClientDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure(); - } - - /** - * @param srvCnt Number of server nodes. - * @param clientCnt Number of client nodes. - */ - private void attachListeners(int srvCnt, int clientCnt) throws Exception { - if (srvJoinedLatch != null) { - for (int i = 0; i < srvCnt; i++) { - G.ignite("server-" + i).events().localListen(new IgnitePredicate() { - @Override public boolean apply(Event evt) { - info("Joined event fired on server: " + evt); - - srvJoinedLatch.countDown(); - - return true; - } - }, EVT_NODE_JOINED); - } - } - - if (srvLeftLatch != null) { - for (int i = 0; i < srvCnt; i++) { - G.ignite("server-" + i).events().localListen(new IgnitePredicate() { - @Override public boolean apply(Event evt) { - info("Left event fired on server: " + evt); - - srvLeftLatch.countDown(); - - return true; - } - }, EVT_NODE_LEFT); - } - } - - if (srvFailedLatch != null) { - for (int i = 0; i < srvCnt; i++) { - G.ignite("server-" + i).events().localListen(new IgnitePredicate() { - @Override public boolean apply(Event evt) { - info("Failed event fired on server: " + evt); - - srvFailedLatch.countDown(); - - return true; - } - }, EVT_NODE_FAILED); - } - } - - if (clientJoinedLatch != null) { - for (int i = 0; i < clientCnt; i++) { - G.ignite("client-" + i).events().localListen(new IgnitePredicate() { - @Override public boolean apply(Event evt) { - info("Joined event fired on client: " + evt); - - clientJoinedLatch.countDown(); - - return true; - } - }, EVT_NODE_JOINED); - } - } - - if (clientLeftLatch != null) { - for (int i = 0; i < clientCnt; i++) { - G.ignite("client-" + i).events().localListen(new IgnitePredicate() { - @Override public boolean apply(Event evt) { - info("Left event fired on client: " + evt); - - clientLeftLatch.countDown(); - - return true; - } - }, EVT_NODE_LEFT); - } - } - - if (clientFailedLatch != null) { - for (int i = 0; i < clientCnt; i++) { - G.ignite("client-" + i).events().localListen(new IgnitePredicate() { - @Override public boolean apply(Event evt) { - info("Failed event fired on client: " + evt); - - clientFailedLatch.countDown(); - - return true; - } - }, EVT_NODE_FAILED); - } - } - } - - /** - * @param srvCnt Number of server nodes. - * @param clientCnt Number of client nodes. - */ - private void checkNodes(int srvCnt, int clientCnt) { - for (int i = 0; i < srvCnt; i++) { - Ignite g = G.ignite("server-" + i); - - assertTrue(srvNodeIds.contains(g.cluster().localNode().id())); - - assertFalse(g.cluster().localNode().isClient()); - - checkRemoteNodes(g, srvCnt + clientCnt - 1); - } - - for (int i = 0; i < clientCnt; i++) { - Ignite g = G.ignite("client-" + i); - - assertTrue(clientNodeIds.contains(g.cluster().localNode().id())); - - assertTrue(g.cluster().localNode().isClient()); - - checkRemoteNodes(g, srvCnt + clientCnt - 1); - } - } - - /** - * @param ignite Grid. - * @param expCnt Expected nodes count. - */ - @SuppressWarnings("TypeMayBeWeakened") - private void checkRemoteNodes(Ignite ignite, int expCnt) { - Collection nodes = ignite.cluster().forRemotes().nodes(); - - assertEquals(expCnt, nodes.size()); - - for (ClusterNode node : nodes) { - UUID id = node.id(); - - if (clientNodeIds.contains(id)) - assertTrue(node.isClient()); - else if (srvNodeIds.contains(id)) - assertFalse(node.isClient()); - else - assert false : "Unexpected node ID: " + id; - } - } - - /** - * @param latch Latch. - * @throws InterruptedException If interrupted. - */ - private void await(CountDownLatch latch) throws InterruptedException { - assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS)); - } - - /** - */ - private static class MessageListener implements IgniteBiPredicate { - @IgniteInstanceResource - private Ignite ignite; - - /** {@inheritDoc} */ - @Override public boolean apply(UUID uuid, Object msg) { - X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']'); - - msgLatch.countDown(); - - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java new file mode 100644 index 0000000..d1b6232 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiMulticastTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +/** + * + */ +public class TcpClientDiscoverySpiMulticastTest extends GridCommonAbstractTest { + /** */ + private boolean forceSrv; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setLocalHost("127.0.0.1"); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(new TcpDiscoveryMulticastIpFinder()); + + if (getTestGridName(1).equals(gridName)) { + cfg.setClientMode(true); + + spi.setForceServerMode(forceSrv); + } + + cfg.setDiscoverySpi(spi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + /** + * @throws Exception If failed. + */ + public void testJoinWithMulticast() throws Exception { + joinWithMulticast(); + } + + /** + * @throws Exception If failed. + */ + public void testJoinWithMulticastForceServer() throws Exception { + forceSrv = true; + + joinWithMulticast(); + } + + /** + * @throws Exception If failed. + */ + private void joinWithMulticast() throws Exception { + Ignite ignite0 = startGrid(0); + + assertSpi(ignite0, false); + + Ignite ignite1 = startGrid(1); + + assertSpi(ignite1, !forceSrv); + + assertTrue(ignite1.configuration().isClientMode()); + + assertEquals(2, ignite0.cluster().nodes().size()); + assertEquals(2, ignite1.cluster().nodes().size()); + + Ignite ignite2 = startGrid(2); + + assertSpi(ignite2, false); + + assertEquals(3, ignite0.cluster().nodes().size()); + assertEquals(3, ignite1.cluster().nodes().size()); + assertEquals(3, ignite2.cluster().nodes().size()); + } + + /** + * @param ignite Ignite. + * @param client Expected client mode flag. + */ + private void assertSpi(Ignite ignite, boolean client) { + DiscoverySpi spi = ignite.configuration().getDiscoverySpi(); + + assertSame(TcpDiscoverySpi.class, spi.getClass()); + + TcpDiscoverySpi spi0 = (TcpDiscoverySpi)spi; + + assertSame(TcpDiscoveryMulticastIpFinder.class, spi0.getIpFinder().getClass()); + + assertEquals(client, spi0.isClientMode()); + + Collection addrSnds = GridTestUtils.getFieldValue(spi0.getIpFinder(), "addrSnds"); + + assertNotNull(addrSnds); + + if (client) + assertTrue(addrSnds.isEmpty()); // Check client does not send its address. + else + assertFalse(addrSnds.isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java new file mode 100644 index 0000000..7333020 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -0,0 +1,1196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.io.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.discovery.tcp.messages.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.events.EventType.*; + +/** + * Client-based discovery tests. + */ +public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final AtomicInteger srvIdx = new AtomicInteger(); + + /** */ + private static final AtomicInteger clientIdx = new AtomicInteger(); + + /** */ + private static Collection srvNodeIds; + + /** */ + private static Collection clientNodeIds; + + /** */ + private static int clientsPerSrv; + + /** */ + private static CountDownLatch srvJoinedLatch; + + /** */ + private static CountDownLatch srvLeftLatch; + + /** */ + private static CountDownLatch srvFailedLatch; + + /** */ + private static CountDownLatch clientJoinedLatch; + + /** */ + private static CountDownLatch clientLeftLatch; + + /** */ + private static CountDownLatch clientFailedLatch; + + /** */ + private static CountDownLatch msgLatch; + + /** */ + private UUID nodeId; + + /** */ + private TcpDiscoveryVmIpFinder clientIpFinder; + + /** */ + private long joinTimeout = TcpDiscoverySpi.DFLT_JOIN_TIMEOUT; + + /** */ + private long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT; + + /** */ + private boolean longSockTimeouts; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TestTcpDiscoverySpi(); + + if (gridName.startsWith("server")) + disco.setIpFinder(IP_FINDER); + else if (gridName.startsWith("client")) { + cfg.setClientMode(true); + + TcpDiscoveryVmIpFinder ipFinder; + + if (clientIpFinder != null) + ipFinder = clientIpFinder; + else { + ipFinder = new TcpDiscoveryVmIpFinder(); + + String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()). + get((clientIdx.get() - 1) / clientsPerSrv).toString(); + + if (addr.startsWith("/")) + addr = addr.substring(1); + + ipFinder.setAddresses(Collections.singletonList(addr)); + } + + disco.setIpFinder(ipFinder); + + String nodeId = cfg.getNodeId().toString(); + + nodeId = "cc" + nodeId.substring(2); + + cfg.setNodeId(UUID.fromString(nodeId)); + } + else + throw new IllegalArgumentException(); + + if (longSockTimeouts) { + disco.setAckTimeout(2000); + disco.setSocketTimeout(2000); + } + + disco.setJoinTimeout(joinTimeout); + disco.setNetworkTimeout(netTimeout); + + cfg.setDiscoverySpi(disco); + + if (nodeId != null) + cfg.setNodeId(nodeId); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + Collection addrs = IP_FINDER.getRegisteredAddresses(); + + if (!F.isEmpty(addrs)) + IP_FINDER.unregisterAddresses(addrs); + + srvIdx.set(0); + clientIdx.set(0); + + srvNodeIds = new GridConcurrentHashSet<>(); + clientNodeIds = new GridConcurrentHashSet<>(); + + clientsPerSrv = 2; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllClients(true); + stopAllServers(true); + + nodeId = null; + clientIpFinder = null; + joinTimeout = TcpDiscoverySpi.DFLT_JOIN_TIMEOUT; + netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT; + longSockTimeouts = false; + + assert G.allGrids().isEmpty(); + } + + /** + * + * @throws Exception + */ + public void testJoinTimeout() throws Exception { + clientIpFinder = new TcpDiscoveryVmIpFinder(); + joinTimeout = 1000; + + try { + startClientNodes(1); + + fail("Client cannot be start because no server nodes run"); + } + catch (IgniteCheckedException e) { + IgniteSpiException spiEx = e.getCause(IgniteSpiException.class); + + assert spiEx != null : e; + + assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage(); + } + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeJoin() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvJoinedLatch = new CountDownLatch(3); + clientJoinedLatch = new CountDownLatch(3); + + attachListeners(3, 3); + + startClientNodes(1); + + await(srvJoinedLatch); + await(clientJoinedLatch); + + checkNodes(3, 4); + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeLeave() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvLeftLatch = new CountDownLatch(3); + clientLeftLatch = new CountDownLatch(2); + + attachListeners(3, 3); + + stopGrid("client-2"); + + await(srvLeftLatch); + await(clientLeftLatch); + + checkNodes(3, 2); + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeFail() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvFailedLatch = new CountDownLatch(3); + clientFailedLatch = new CountDownLatch(2); + + attachListeners(3, 3); + + failClient(2); + + await(srvFailedLatch); + await(clientFailedLatch); + + checkNodes(3, 2); + } + + /** + * @throws Exception If failed. + */ + public void testServerNodeJoin() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvJoinedLatch = new CountDownLatch(3); + clientJoinedLatch = new CountDownLatch(3); + + attachListeners(3, 3); + + startServerNodes(1); + + await(srvJoinedLatch); + await(clientJoinedLatch); + + checkNodes(4, 3); + } + + /** + * @throws Exception If failed. + */ + public void testServerNodeLeave() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvLeftLatch = new CountDownLatch(2); + clientLeftLatch = new CountDownLatch(3); + + attachListeners(3, 3); + + stopGrid("server-2"); + + await(srvLeftLatch); + await(clientLeftLatch); + + checkNodes(2, 3); + } + + /** + * @throws Exception If failed. + */ + public void testServerNodeFail() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvFailedLatch = new CountDownLatch(2); + clientFailedLatch = new CountDownLatch(3); + + attachListeners(3, 3); + + assert ((TcpDiscoverySpi)G.ignite("server-2").configuration().getDiscoverySpi()).clientWorkerCount() == 0; + + failServer(2); + + await(srvFailedLatch); + await(clientFailedLatch); + + checkNodes(2, 3); + } + + /** + * @throws Exception If failed. + */ + public void testPing() throws Exception { + startServerNodes(2); + startClientNodes(1); + + Ignite srv0 = G.ignite("server-0"); + Ignite srv1 = G.ignite("server-1"); + Ignite client = G.ignite("client-0"); + + assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id()); + assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id()); + + assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id()); + assert ((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id()); + } + + /** + * @throws Exception If failed. + */ + public void testPingFailedNodeFromClient() throws Exception { + startServerNodes(2); + startClientNodes(1); + + Ignite srv0 = G.ignite("server-0"); + Ignite srv1 = G.ignite("server-1"); + Ignite client = G.ignite("client-0"); + + final CountDownLatch latch = new CountDownLatch(1); + + ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure() { + @Override public void apply(Socket sock) { + try { + latch.await(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + + assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id()); + assert !((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id()); + + latch.countDown(); + } + + /** + * @throws Exception If failed. + */ + public void testPingFailedClientNode() throws Exception { + startServerNodes(2); + startClientNodes(1); + + Ignite srv0 = G.ignite("server-0"); + Ignite srv1 = G.ignite("server-1"); + Ignite client = G.ignite("client-0"); + + ((TcpDiscoverySpi)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000); + + ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()).pauseSocketWrite(); + + assert !((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id()); + assert !((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id()); + + ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()).resumeAll(); + + assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id()); + assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id()); + } + + /** + * @throws Exception If failed. + */ + public void testClientReconnectOnRouterFail() throws Exception { + clientsPerSrv = 1; + + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + setClientRouter(2, 0); + + srvFailedLatch = new CountDownLatch(2); + clientFailedLatch = new CountDownLatch(3); + + attachListeners(2, 3); + + failServer(2); + + await(srvFailedLatch); + await(clientFailedLatch); + + checkNodes(2, 3); + } + + /** + * @throws Exception If failed. + */ + public void testClientReconnectOnNetworkProblem() throws Exception { + clientsPerSrv = 1; + + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + setClientRouter(2, 0); + + srvFailedLatch = new CountDownLatch(2); + clientFailedLatch = new CountDownLatch(3); + + attachListeners(2, 3); + + ((TcpDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi()).brakeConnection(); + + G.ignite("client-2").message().remoteListen(null, new MessageListener()); // Send some discovery message. + + checkNodes(3, 3); + } + + /** + * @throws Exception If failed. + */ + public void testClientReconnectOneServerOneClient() throws Exception { + clientsPerSrv = 1; + + startServerNodes(1); + startClientNodes(1); + + checkNodes(1, 1); + + srvLeftLatch = new CountDownLatch(1); + srvFailedLatch = new CountDownLatch(1); + + attachListeners(1, 0); + + ((TcpDiscoverySpi)G.ignite("client-0").configuration().getDiscoverySpi()).brakeConnection(); + + assertFalse(srvFailedLatch.await(2000, TimeUnit.MILLISECONDS)); + + assertEquals(1L, srvLeftLatch.getCount()); + + checkNodes(1, 1); + } + + /** + * @throws Exception If failed. + */ + public void testGetMissedMessagesOnReconnect() throws Exception { + clientsPerSrv = 1; + + startServerNodes(3); + startClientNodes(2); + + checkNodes(3, 2); + + clientLeftLatch = new CountDownLatch(1); + srvLeftLatch = new CountDownLatch(2); + + attachListeners(2, 2); + + ((TestTcpDiscoverySpi)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll(); + + stopGrid("server-2"); + + await(srvLeftLatch); + await(srvLeftLatch); + + Thread.sleep(500); + + assert G.ignite("client-0").cluster().nodes().size() == 4; + assert G.ignite("client-1").cluster().nodes().size() == 5; + + clientLeftLatch = new CountDownLatch(1); + + ((TestTcpDiscoverySpi)G.ignite("client-1").configuration().getDiscoverySpi()).resumeAll(); + + await(clientLeftLatch); + + checkNodes(2, 2); + } + + /** + * @throws Exception If failed. + */ + public void testClientSegmentation() throws Exception { + clientsPerSrv = 1; + + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + srvFailedLatch = new CountDownLatch(2 + 2); + clientFailedLatch = new CountDownLatch(2 + 2); + + attachListeners(2, 2); + + final CountDownLatch client2StoppedLatch = new CountDownLatch(1); + + IgnitionListener lsnr = new IgnitionListener() { + @Override public void onStateChange(@Nullable String name, IgniteState state) { + if (state == IgniteState.STOPPED_ON_SEGMENTATION) + client2StoppedLatch.countDown(); + } + }; + G.addListener(lsnr); + + final TcpDiscoverySpi disco = (TcpDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi(); + + try { + failServer(2); + + await(srvFailedLatch); + await(clientFailedLatch); + + await(client2StoppedLatch); + + checkNodes(2, 2); + } + finally { + G.removeListener(lsnr); + } + + assert disco.getRemoteNodes().isEmpty(); + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeJoinOneServer() throws Exception { + startServerNodes(1); + + srvJoinedLatch = new CountDownLatch(1); + + attachListeners(1, 0); + + startClientNodes(1); + + await(srvJoinedLatch); + + checkNodes(1, 1); + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeLeaveOneServer() throws Exception { + startServerNodes(1); + startClientNodes(1); + + checkNodes(1, 1); + + srvLeftLatch = new CountDownLatch(1); + + attachListeners(1, 0); + + stopGrid("client-0"); + + await(srvLeftLatch); + + checkNodes(1, 0); + } + + /** + * @throws Exception If failed. + */ + public void testClientNodeFailOneServer() throws Exception { + startServerNodes(1); + startClientNodes(1); + + checkNodes(1, 1); + + srvFailedLatch = new CountDownLatch(1); + + attachListeners(1, 0); + + failClient(0); + + await(srvFailedLatch); + + checkNodes(1, 0); + } + + /** + * @throws Exception If failed. + */ + public void testClientAndRouterFail() throws Exception { + startServerNodes(2); + startClientNodes(2); + + checkNodes(2, 2); + + srvFailedLatch = new CountDownLatch(2); + clientFailedLatch = new CountDownLatch(2); + + attachListeners(1, 1); + + ((TcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi()).addSendMessageListener(new IgniteInClosure() { + @Override public void apply(TcpDiscoveryAbstractMessage msg) { + try { + Thread.sleep(1000000); + } + catch (InterruptedException ignored) { + Thread.interrupted(); + } + } + }); + failClient(1); + failServer(1); + + await(srvFailedLatch); + await(clientFailedLatch); + + checkNodes(1, 1); + } + + /** + * @throws Exception If failed. + */ + public void testMetrics() throws Exception { + startServerNodes(3); + startClientNodes(3); + + checkNodes(3, 3); + + attachListeners(3, 3); + + assertTrue(checkMetrics(3, 3, 0)); + + G.ignite("client-0").compute().broadcast(F.noop()); + + assertTrue(GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return checkMetrics(3, 3, 1); + } + }, 10000)); + + checkMetrics(3, 3, 1); + + G.ignite("server-0").compute().broadcast(F.noop()); + + assertTrue(GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return checkMetrics(3, 3, 2); + } + }, 10000)); + } + + /** + * @param srvCnt Number of Number of server nodes. + * @param clientCnt Number of client nodes. + * @param execJobsCnt Expected number of executed jobs. + * @return Whether metrics are correct. + */ + private boolean checkMetrics(int srvCnt, int clientCnt, int execJobsCnt) { + for (int i = 0; i < srvCnt; i++) { + Ignite g = G.ignite("server-" + i); + + for (ClusterNode n : g.cluster().nodes()) { + if (n.metrics().getTotalExecutedJobs() != (n.isClient() ? 0 : execJobsCnt)) + return false; + } + } + + for (int i = 0; i < clientCnt; i++) { + Ignite g = G.ignite("client-" + i); + + for (ClusterNode n : g.cluster().nodes()) { + if (n.metrics().getTotalExecutedJobs() != (n.isClient() ? 0 : execJobsCnt)) + return false; + } + } + + return true; + } + + /** + * @throws Exception If failed. + */ + public void testDataExchangeFromServer() throws Exception { + testDataExchange("server-0"); + } + + /** + * TODO: IGNITE-587. + * + * @throws Exception If failed. + */ + public void testDataExchangeFromClient() throws Exception { + testDataExchange("client-0"); + } + + /** + * @throws Exception If failed. + */ + private void testDataExchange(String masterName) throws Exception { + startServerNodes(2); + startClientNodes(2); + + checkNodes(2, 2); + + IgniteMessaging msg = grid(masterName).message(); + + UUID id = msg.remoteListen(null, new MessageListener()); + + try { + msgLatch = new CountDownLatch(2); + + msg.send(null, "Message 1"); + + await(msgLatch); + + startServerNodes(1); + startClientNodes(1); + + checkNodes(3, 3); + + msgLatch = new CountDownLatch(3); + + msg.send(null, "Message 2"); + + await(msgLatch); + } + finally { + msg.stopRemoteListen(id); + } + } + + /** + * @throws Exception If failed. + */ + public void testDataExchangeFromServer2() throws Exception { + startServerNodes(2); + + IgniteMessaging msg = grid("server-1").message(); + + UUID id = msg.remoteListen(null, new MessageListener()); + + try { + startClientNodes(1); + + assertEquals(G.ignite("server-0").cluster().localNode().id(), ((TcpDiscoveryNode)G.ignite("client-0") + .cluster().localNode()).clientRouterNodeId()); + + checkNodes(2, 1); + + msgLatch = new CountDownLatch(3); + + msg.send(null, "Message"); + + await(msgLatch); + } + finally { + msg.stopRemoteListen(id); + } + } + + + /** + * @throws Exception If any error occurs. + */ + public void testDuplicateId() throws Exception { + startServerNodes(2); + + nodeId = G.ignite("server-1").cluster().localNode().id(); + + try { + startGrid("client-0"); + + assert false; + } + catch (IgniteCheckedException e) { + IgniteSpiException spiEx = e.getCause(IgniteSpiException.class); + + assert spiEx != null : e; + assert spiEx.getMessage().contains("same ID") : spiEx.getMessage(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testTimeoutWaitingNodeAddedMessage() throws Exception { + longSockTimeouts = true; + + startServerNodes(2); + + final CountDownLatch cnt = new CountDownLatch(1); + + ((TcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi()).addSendMessageListener( + new IgniteInClosure() { + @Override public void apply(TcpDiscoveryAbstractMessage msg) { + try { + cnt.await(10, MINUTES); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedException(e); + } + } + }); + + try { + netTimeout = 500; + + startGrid("client-0"); + + assert false; + } + catch (IgniteCheckedException e) { + cnt.countDown(); + + IgniteSpiException spiEx = e.getCause(IgniteSpiException.class); + + assert spiEx != null : e; + assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage(); + } + } + + /** + * @throws Exception If any error occurs. + */ + public void testGridStartTime() throws Exception { + startServerNodes(2); + + startClientNodes(2); + + long startTime = -1; + + for (Ignite g : G.allGrids()) { + IgniteEx kernal = (IgniteEx)g; + + assertTrue(kernal.context().discovery().gridStartTime() > 0); + + if (startTime == -1) + startTime = kernal.context().discovery().gridStartTime(); + else + assertEquals(startTime, kernal.context().discovery().gridStartTime()); + } + } + + /** + * @param clientIdx Index. + * @throws Exception In case of error. + */ + private void setClientRouter(int clientIdx, int srvIdx) throws Exception { + TcpDiscoverySpi disco = + (TcpDiscoverySpi)G.ignite("client-" + clientIdx).configuration().getDiscoverySpi(); + + TcpDiscoveryVmIpFinder ipFinder = (TcpDiscoveryVmIpFinder)disco.getIpFinder(); + + String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).get(srvIdx).toString(); + + if (addr.startsWith("/")) + addr = addr.substring(1); + + ipFinder.setAddresses(Collections.singletonList(addr)); + } + + /** + * @param cnt Number of nodes. + * @throws Exception In case of error. + */ + private void startServerNodes(int cnt) throws Exception { + for (int i = 0; i < cnt; i++) { + Ignite g = startGrid("server-" + srvIdx.getAndIncrement()); + + srvNodeIds.add(g.cluster().localNode().id()); + } + } + + /** + * @param cnt Number of nodes. + * @throws Exception In case of error. + */ + private void startClientNodes(int cnt) throws Exception { + for (int i = 0; i < cnt; i++) { + Ignite g = startGrid("client-" + clientIdx.getAndIncrement()); + + clientNodeIds.add(g.cluster().localNode().id()); + } + } + + /** + * @param idx Index. + */ + private void failServer(int idx) { + ((TcpDiscoverySpi)G.ignite("server-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure(); + } + + /** + * @param idx Index. + */ + private void failClient(int idx) { + ((TcpDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure(); + } + + /** + * @param srvCnt Number of server nodes. + * @param clientCnt Number of client nodes. + */ + private void attachListeners(int srvCnt, int clientCnt) throws Exception { + if (srvJoinedLatch != null) { + for (int i = 0; i < srvCnt; i++) { + G.ignite("server-" + i).events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + info("Joined event fired on server: " + evt); + + srvJoinedLatch.countDown(); + + return true; + } + }, EVT_NODE_JOINED); + } + } + + if (srvLeftLatch != null) { + for (int i = 0; i < srvCnt; i++) { + G.ignite("server-" + i).events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + info("Left event fired on server: " + evt); + + srvLeftLatch.countDown(); + + return true; + } + }, EVT_NODE_LEFT); + } + } + + if (srvFailedLatch != null) { + for (int i = 0; i < srvCnt; i++) { + G.ignite("server-" + i).events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + info("Failed event fired on server: " + evt); + + srvFailedLatch.countDown(); + + return true; + } + }, EVT_NODE_FAILED); + } + } + + if (clientJoinedLatch != null) { + for (int i = 0; i < clientCnt; i++) { + G.ignite("client-" + i).events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + info("Joined event fired on client: " + evt); + + clientJoinedLatch.countDown(); + + return true; + } + }, EVT_NODE_JOINED); + } + } + + if (clientLeftLatch != null) { + for (int i = 0; i < clientCnt; i++) { + G.ignite("client-" + i).events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + info("Left event fired on client: " + evt); + + clientLeftLatch.countDown(); + + return true; + } + }, EVT_NODE_LEFT); + } + } + + if (clientFailedLatch != null) { + for (int i = 0; i < clientCnt; i++) { + G.ignite("client-" + i).events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + info("Failed event fired on client: " + evt); + + clientFailedLatch.countDown(); + + return true; + } + }, EVT_NODE_FAILED); + } + } + } + + /** + * @param srvCnt Number of server nodes. + * @param clientCnt Number of client nodes. + */ + private void checkNodes(int srvCnt, int clientCnt) { + for (int i = 0; i < srvCnt; i++) { + Ignite g = G.ignite("server-" + i); + + assertTrue(srvNodeIds.contains(g.cluster().localNode().id())); + + assertFalse(g.cluster().localNode().isClient()); + + checkRemoteNodes(g, srvCnt + clientCnt - 1); + } + + for (int i = 0; i < clientCnt; i++) { + Ignite g = G.ignite("client-" + i); + + ((TcpDiscoverySpi)g.configuration().getDiscoverySpi()).waitForClientMessagePrecessed(); + + assertTrue(clientNodeIds.contains(g.cluster().localNode().id())); + + assertTrue(g.cluster().localNode().isClient()); + + checkRemoteNodes(g, srvCnt + clientCnt - 1); + } + } + + /** + * @param ignite Grid. + * @param expCnt Expected nodes count. + */ + @SuppressWarnings("TypeMayBeWeakened") + private void checkRemoteNodes(Ignite ignite, int expCnt) { + Collection nodes = ignite.cluster().forRemotes().nodes(); + + assertEquals(expCnt, nodes.size()); + + for (ClusterNode node : nodes) { + UUID id = node.id(); + + if (clientNodeIds.contains(id)) + assertTrue(node.isClient()); + else if (srvNodeIds.contains(id)) + assertFalse(node.isClient()); + else + assert false : "Unexpected node ID: " + id; + } + } + + /** + * @param latch Latch. + * @throws InterruptedException If interrupted. + */ + private void await(CountDownLatch latch) throws InterruptedException { + assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS)); + } + + /** + */ + private static class MessageListener implements IgniteBiPredicate { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, Object msg) { + X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']'); + + msgLatch.countDown(); + + return true; + } + } + + /** + * + */ + private static class TestTcpDiscoverySpi extends TcpDiscoverySpi { + /** */ + private final Object mux = new Object(); + + /** */ + private final AtomicBoolean writeLock = new AtomicBoolean(); + + /** */ + private final AtomicBoolean openSockLock = new AtomicBoolean(); + + /** + * @param lock Lock. + */ + private void waitFor(AtomicBoolean lock) { + try { + synchronized (mux) { + while (lock.get()) + mux.wait(); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new RuntimeException(e); + } + } + + /** + * @param isPause Is lock. + * @param locks Locks. + */ + private void pauseResumeOperation(boolean isPause, AtomicBoolean... locks) { + synchronized (mux) { + for (AtomicBoolean lock : locks) + lock.set(isPause); + + mux.notifyAll(); + } + } + + /** {@inheritDoc} */ + @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, + GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException { + waitFor(writeLock); + + super.writeToSocket(sock, msg, bout); + } + + /** {@inheritDoc} */ + @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException { + waitFor(openSockLock); + + return super.openSocket(sockAddr); + } + + /** + * + */ + public void pauseSocketWrite() { + pauseResumeOperation(true, writeLock); + } + + /** + * + */ + public void pauseAll() { + pauseResumeOperation(true, openSockLock, writeLock); + + impl.workerThread().suspend(); + } + + /** + * + */ + public void resumeAll() { + pauseResumeOperation(false, openSockLock, writeLock); + + impl.workerThread().resume(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryConcurrentStartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryConcurrentStartTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryConcurrentStartTest.java index b5d02f0..6438268 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryConcurrentStartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryConcurrentStartTest.java @@ -20,16 +20,19 @@ package org.apache.ignite.spi.discovery.tcp; import org.apache.ignite.configuration.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; -import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; /** * Test for {@link TcpDiscoverySpi}. */ public class TcpDiscoveryConcurrentStartTest extends GridCommonAbstractTest { /** */ - private static final int TOP_SIZE = 1; + private static final int TOP_SIZE = 3; /** */ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); @@ -41,34 +44,12 @@ public class TcpDiscoveryConcurrentStartTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - if (client) { - TcpDiscoveryVmIpFinder clientIpFinder = new TcpDiscoveryVmIpFinder(); - - String addr = new ArrayList<>(ipFinder.getRegisteredAddresses()).iterator().next().toString(); - - if (addr.startsWith("/")) - addr = addr.substring(1); - - clientIpFinder.setAddresses(Arrays.asList(addr)); - - TcpClientDiscoverySpi discoSpi = new TcpClientDiscoverySpi(); - - discoSpi.setIpFinder(clientIpFinder); - - cfg.setDiscoverySpi(discoSpi); - } - else { - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(discoSpi); - } - - cfg.setLocalHost("127.0.0.1"); + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder)); cfg.setCacheConfiguration(); + cfg.setClientMode(client); + return cfg; } @@ -77,11 +58,16 @@ public class TcpDiscoveryConcurrentStartTest extends GridCommonAbstractTest { return Long.MAX_VALUE; } + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + client = false; + } + /** * @throws Exception If failed. */ public void testConcurrentStart() throws Exception { - for (int i = 0; i < 50; i++) { + for (int i = 0; i < 10; i++) { try { startGridsMultiThreaded(TOP_SIZE); } @@ -95,15 +81,28 @@ public class TcpDiscoveryConcurrentStartTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testConcurrentStartClients() throws Exception { - for (int i = 0; i < 50; i++) { + for (int i = 0; i < 20; i++) { try { client = false; - startGrid(); + startGrid(0); client = true; - startGridsMultiThreaded(TOP_SIZE); + final AtomicInteger gridIdx = new AtomicInteger(1); + + GridTestUtils.runMultiThreaded(new Callable() { + @Nullable @Override public Object call() throws Exception { + startGrid(gridIdx.getAndIncrement()); + + return null; + } + }, + TOP_SIZE, + "grid-starter-" + getName() + ); + + checkTopology(TOP_SIZE + 1); } finally { stopAllGrids(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index a2d8276..cfefff4 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -70,20 +70,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - if (client()) { - TcpClientDiscoverySpi spi = new TcpClientDiscoverySpi(); + if (client()) + cfg.setClientMode(true); - spi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(spi); - } - else { - TcpDiscoverySpi spi = new TcpDiscoverySpi(); - - spi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(spi); - } + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder)); cfg.setCacheConfiguration(); @@ -91,8 +81,6 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { cfg.setIncludeProperties(); - cfg.setLocalHost("127.0.0.1"); - return cfg; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 5648c31..ad12753 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -175,7 +175,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } }, 4, "grid-starter"); - Collection nodes = discoMap.get(g1.name()).ring().allNodes(); + Collection nodes = ((ServerImpl)discoMap.get(g1.name()).impl).ring().allNodes(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java index 752e43c..04f9b41 100644 --- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java @@ -58,14 +58,11 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest { /** Port. */ private static int port; - /** Ignite. */ - private static Ignite ignite; - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration() throws Exception { - IgniteConfiguration cfg = super.getConfiguration(); + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); - CacheConfiguration ccfg = cacheConfiguration(cfg, null); + CacheConfiguration ccfg = defaultCacheConfiguration(); cfg.setCacheConfiguration(ccfg); @@ -81,8 +78,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { - ignite = startGrids(GRID_CNT); - ignite.getOrCreateCache(defaultCacheConfiguration()); + startGrids(GRID_CNT); try (ServerSocket sock = new ServerSocket(0)) { port = sock.getLocalPort(); @@ -94,11 +90,6 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest { stopAllGrids(); } - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - ignite.cache(null).clear(); - } - /** * @throws Exception If failed. */ @@ -235,6 +226,12 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest { { SocketStreamer sockStmr = null; + Ignite ignite = grid(0); + + IgniteCache cache = ignite.cache(null); + + cache.clear(); + try (IgniteDataStreamer stmr = ignite.dataStreamer(null)) { stmr.allowOverwrite(true); @@ -242,8 +239,6 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest { sockStmr = new SocketStreamer<>(); - IgniteCache cache = ignite.cache(null); - sockStmr.setIgnite(ignite); sockStmr.setStreamer(stmr); @@ -279,10 +274,10 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest { latch.await(); - assertEquals(CNT, cache.size(CachePeekMode.PRIMARY)); - for (int i = 0; i < CNT; i++) assertEquals(Integer.toString(i), cache.get(i)); + + assertEquals(CNT, cache.size(CachePeekMode.PRIMARY)); } finally { if (sockStmr != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index bc04f90..21f9424 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -29,7 +29,7 @@ import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.plugin.security.*; import org.apache.ignite.spi.*; -import org.apache.ignite.spi.swapspace.*; + import org.jetbrains.annotations.*; import java.io.*; @@ -447,28 +447,11 @@ public class GridSpiTestContext implements IgniteSpiContext { } /** {@inheritDoc} */ - @Override public void writeToSwap(String spaceName, Object key, @Nullable Object val, - @Nullable ClassLoader ldr) { - /* No-op. */ - } - - /** {@inheritDoc} */ - @Override public T readFromSwap(String spaceName, SwapKey key, @Nullable ClassLoader ldr) { - return null; - } - - /** {@inheritDoc} */ @Override public int partition(String cacheName, Object key) { return -1; } /** {@inheritDoc} */ - @Override public void removeFromSwap(String spaceName, Object key, - @Nullable ClassLoader ldr) { - // No-op. - } - - /** {@inheritDoc} */ @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node) { return null; } @@ -484,12 +467,6 @@ public class GridSpiTestContext implements IgniteSpiContext { } /** {@inheritDoc} */ - @Nullable @Override public T readValueFromOffheapAndSwap(@Nullable String spaceName, Object key, - @Nullable ClassLoader ldr) { - return null; - } - - /** {@inheritDoc} */ @Override public MessageFormatter messageFormatter() { if (formatter == null) { formatter = new MessageFormatter() { @@ -524,6 +501,16 @@ public class GridSpiTestContext implements IgniteSpiContext { return false; } + /** {@inheritDoc} */ + @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) { + // No-op. + } + /** * @param cacheName Cache name. * @return Map representing cache. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java index e2dda54..d03d327 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java @@ -137,7 +137,7 @@ public final class GridTestUtils { } } - if (msg != null && (e.getMessage() == null || !e.getMessage().startsWith(msg))) { + if (msg != null && (e.getMessage() == null || !e.getMessage().contains(msg))) { U.error(log, "Unexpected exception message.", e); fail("Exception message is not as expected [expected=" + msg + ", actual=" + e.getMessage() + ']', e); @@ -1497,6 +1497,21 @@ public final class GridTestUtils { } /** + * {@link Class#getSimpleName()} does not return outer class name prefix for inner classes, for example, + * getSimpleName() returns "RegularDiscovery" instead of "GridDiscoveryManagerSelfTest$RegularDiscovery" + * This method return correct simple name for inner classes. + * + * @param cls Class + * @return Simple name with outer class prefix. + */ + public static String fullSimpleName(@NotNull Class cls) { + if (cls.getEnclosingClass() != null) + return cls.getEnclosingClass().getSimpleName() + "." + cls.getSimpleName(); + else + return cls.getSimpleName(); + } + + /** * Adds test to the suite only if it's not in {@code ignoredTests} set. * * @param suite TestSuite where to place the test. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index f3a9051..9c42920 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -455,7 +455,7 @@ public abstract class GridAbstractTest extends TestCase { } if (isFirstTest()) { - info(">>> Starting test class: " + getClass().getSimpleName() + " <<<"); + info(">>> Starting test class: " + GridTestUtils.fullSimpleName(getClass()) + " <<<"); if (startGrid) { IgniteConfiguration cfg = optimize(getConfiguration()); @@ -676,8 +676,12 @@ public abstract class GridAbstractTest extends TestCase { protected IgniteConfiguration optimize(IgniteConfiguration cfg) throws IgniteCheckedException { // TODO: IGNITE-605: propose another way to avoid network overhead in tests. if (cfg.getLocalHost() == null) { - if (cfg.getDiscoverySpi() instanceof TcpDiscoverySpi) + if (cfg.getDiscoverySpi() instanceof TcpDiscoverySpi) { cfg.setLocalHost("127.0.0.1"); + + if (((TcpDiscoverySpi)cfg.getDiscoverySpi()).getJoinTimeout() == 0) + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setJoinTimeout(8000); + } else cfg.setLocalHost(getTestResources().getLocalHost()); } @@ -732,9 +736,20 @@ public abstract class GridAbstractTest extends TestCase { * @param cancel Cancel flag. */ protected void stopAllGrids(boolean cancel) { - List ignites = G.allGrids(); + Collection clients = new ArrayList<>(); + Collection srvs = new ArrayList<>(); + + for (Ignite g : G.allGrids()) { + if (g.configuration().getDiscoverySpi().isClientMode()) + clients.add(g); + else + srvs.add(g); + } + + for (Ignite g : clients) + stopGrid(g.name(), cancel); - for (Ignite g : ignites) + for (Ignite g : srvs) stopGrid(g.name(), cancel); assert G.allGrids().isEmpty(); @@ -1004,17 +1019,6 @@ public abstract class GridAbstractTest extends TestCase { } /** - * This method should be overridden by subclasses to change configuration parameters. - * - * @return Grid configuration used for starting of grid. - * @param rsrcs Resources. - * @throws Exception If failed. - */ - protected IgniteConfiguration getConfiguration(IgniteTestResources rsrcs) throws Exception { - return getConfiguration(getTestGridName(), rsrcs); - } - - /** * @return Generated unique test grid name. */ public String getTestGridName() { @@ -1201,7 +1205,7 @@ public abstract class GridAbstractTest extends TestCase { serializedObj.clear(); if (isLastTest()) { - info(">>> Stopping test class: " + getClass().getSimpleName() + " <<<"); + info(">>> Stopping test class: " + GridTestUtils.fullSimpleName(getClass()) + " <<<"); TestCounters counters = getTestCounters(); @@ -1389,6 +1393,22 @@ public abstract class GridAbstractTest extends TestCase { /** * @param obj Object that should be wrap proxy + * @return Created proxy. + */ + protected T notSerializableProxy(final T obj) { + Class cls = (Class)obj.getClass(); + + Class[] interfaces = (Class[])cls.getInterfaces(); + + assert interfaces.length > 0; + + Class lastItf = interfaces[interfaces.length - 1]; + + return notSerializableProxy(obj, lastItf, Arrays.copyOf(interfaces, interfaces.length - 1)); + } + + /** + * @param obj Object that should be wrap proxy * @param itfCls Interface that should be implemented by proxy * @param itfClses Interfaces that should be implemented by proxy (vararg parameter) * @return Created proxy. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java index 0709880..31cbefa 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestCacheSession.java @@ -34,6 +34,9 @@ public class TestCacheSession implements CacheStoreSession { /** */ private Map props; + /** */ + private Object attachment; + /** * * @param tx Transaction. @@ -55,6 +58,21 @@ public class TestCacheSession implements CacheStoreSession { } /** {@inheritDoc} */ + @Override public Object attach(@Nullable Object attachment) { + Object prev = this.attachment; + + this.attachment = attachment; + + return prev; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public T attachment() { + return (T)attachment; + } + + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public Map properties() { if (props == null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java index 2bbcf1b..dc876d3 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/TestThreadLocalCacheSession.java @@ -54,6 +54,21 @@ public class TestThreadLocalCacheSession implements CacheStoreSession { } /** {@inheritDoc} */ + @Override public Object attach(@Nullable Object attachment) { + TestCacheSession ses = sesHolder.get(); + + return ses != null ? ses.attach(attachment) : null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public T attachment() { + TestCacheSession ses = sesHolder.get(); + + return ses!= null ? (T)ses.attachment() : null; + } + + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public Map properties() { TestCacheSession ses = sesHolder.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 5533897..d3535b4 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.local.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -383,14 +384,32 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { int actual = owners.size(); if (affNodes.size() != owners.size() || !affNodes.containsAll(owners)) { - LT.warn(log(), null, "Waiting for topology map update [grid=" + g.name() + - ", p=" + p + ", nodes=" + exp + ", owners=" + actual + - ", affNodes=" + affNodes + ", owners=" + owners + - ", locNode=" + g.cluster().localNode().id() + ']'); + LT.warn(log(), null, "Waiting for topology map update [" + + "grid=" + g.name() + + ", cache=" + cfg.getName() + + ", cacheId=" + dht.context().cacheId() + + ", p=" + p + + ", affNodesCnt=" + exp + + ", ownersCnt=" + actual + + ", affNodes=" + affNodes + + ", owners=" + owners + + ", locNode=" + g.cluster().localNode() + ']'); if (i == 0) start = System.currentTimeMillis(); + if (System.currentTimeMillis() - start > 30_000) + throw new IgniteException("Timeout of waiting for topology map update [" + + "grid=" + g.name() + + ", cache=" + cfg.getName() + + ", cacheId=" + dht.context().cacheId() + + ", p=" + p + + ", affNodesCnt=" + exp + + ", ownersCnt=" + actual + + ", affNodes=" + affNodes + + ", owners=" + owners + + ", locNode=" + g.cluster().localNode() + ']'); + Thread.sleep(200); // Busy wait. continue; @@ -409,6 +428,38 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { } /** + * @param ignite Node. + */ + public void dumpCacheDebugInfo(Ignite ignite) { + GridKernalContext ctx = ((IgniteKernal)ignite).context(); + + log.error("Cache information update [node=" + ignite.name() + + ", client=" + ignite.configuration().isClientMode() + ']'); + + GridCacheSharedContext cctx = ctx.cache().context(); + + log.error("Pending transactions:"); + + for (IgniteInternalTx tx : cctx.tm().activeTransactions()) + log.error(">>> " + tx); + + log.error("Pending explicit locks:"); + + for (GridCacheExplicitLockSpan lockSpan : cctx.mvcc().activeExplicitLocks()) + log.error(">>> " + lockSpan); + + log.error("Pending cache futures:"); + + for (GridCacheFuture fut : cctx.mvcc().activeFutures()) + log.error(">>> " + fut); + + log.error("Pending atomic cache futures:"); + + for (GridCacheFuture fut : cctx.mvcc().atomicFutures()) + log.error(">>> " + fut); + } + + /** * @param cache Cache. * @return Affinity. */ @@ -858,4 +909,28 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { ccfg.getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.CLOCK) U.sleep(50); } + + /** + * @param exp Expected. + * @param act Actual. + */ + protected void assertEqualsCollections(Collection exp, Collection act) { + if (exp.size() != act.size()) + fail("Collections are not equal:\nExpected:\t" + exp + "\nActual:\t" + act); + + Iterator it1 = exp.iterator(); + Iterator it2 = act.iterator(); + + int idx = 0; + + while (it1.hasNext()) { + Object item1 = it1.next(); + Object item2 = it2.next(); + + if (!F.eq(item1, item2)) + fail("Collections are not equal (position " + idx + "):\nExpected: " + exp + "\nActual: " + act); + + idx++; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 5a9d63a..8c061be 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -61,12 +61,14 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTest(IgniteStartUpTestSuite.suite()); suite.addTest(IgniteExternalizableSelfTestSuite.suite()); suite.addTest(IgniteP2PSelfTestSuite.suite()); - suite.addTest(IgniteCacheP2pUnmarshallingErrorTestSuit.suite(ignoredTests)); + suite.addTest(IgniteCacheP2pUnmarshallingErrorTestSuite.suite()); + suite.addTest(IgniteStreamSelfTestSuite.suite()); - suite.addTestSuite(GridSelfTest.class); + suite.addTest(new TestSuite(GridSelfTest.class)); GridTestUtils.addTestIfNeeded(suite, GridProjectionSelfTest.class, ignoredTests); - suite.addTestSuite(GridMessagingSelfTest.class); - suite.addTestSuite(GridMessagingNoPeerClassLoadingSelfTest.class); + suite.addTest(new TestSuite(GridMessagingSelfTest.class)); + suite.addTest(new TestSuite(IgniteMessagingWithClientTest.class)); + GridTestUtils.addTestIfNeeded(suite, GridMessagingNoPeerClassLoadingSelfTest.class, ignoredTests); if (U.isLinux() || U.isMacOs()) suite.addTest(IgniteIpcSharedMemorySelfTestSuite.suite());