Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DAA32200C6A for ; Wed, 29 Mar 2017 16:02:32 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D8E76160B8A; Wed, 29 Mar 2017 14:02:32 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 07C09160BC0 for ; Wed, 29 Mar 2017 16:02:28 +0200 (CEST) Received: (qmail 37286 invoked by uid 500); 29 Mar 2017 14:02:28 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 35842 invoked by uid 99); 29 Mar 2017 14:02:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Mar 2017 14:02:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7BF4CDFF36; Wed, 29 Mar 2017 14:02:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agura@apache.org To: commits@ignite.apache.org Date: Wed, 29 Mar 2017 14:03:12 -0000 Message-Id: <28496b7279c34548813a8837e0d398a6@git.apache.org> In-Reply-To: <68a0b2ff79d84fd8a41b08aef8918b0f@git.apache.org> References: <68a0b2ff79d84fd8a41b08aef8918b0f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [48/50] [abbrv] ignite git commit: ignite-4003 Async outgoing connections for communication SPI archived-at: Wed, 29 Mar 2017 14:02:33 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/ffa5f826/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java index b3b5d1a..3ba319b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java @@ -50,8 +50,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest { // Try provoke connection close on socket writeTimeout. commSpi.setSharedMemoryPort(-1); commSpi.setMessageQueueLimit(10); - commSpi.setSocketReceiveBuffer(40); - commSpi.setSocketSendBuffer(40); + commSpi.setSocketReceiveBuffer(64); + commSpi.setSocketSendBuffer(64); commSpi.setSocketWriteTimeout(100); commSpi.setUnacknowledgedMessagesBufferSize(1000); commSpi.setConnectTimeout(10_000); http://git-wip-us.apache.org/repos/asf/ignite/blob/ffa5f826/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java index 4dbb7ce..e623467 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java @@ -678,7 +678,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest { try { SocketChannel ch = SocketChannel.open(new InetSocketAddress(U.getLocalHost(), srvr2.port())); - GridNioFuture fut = srvr1.createSession(ch, null); + GridNioFuture fut = srvr1.createSession(ch, null, false, null); ses = fut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ffa5f826/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java index 1e25003..bee63b3 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.AddressResolver; import org.apache.ignite.configuration.BasicAddressResolver; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.nio.GridCommunicationClient; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteCallable; @@ -111,7 +112,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { cfg.setConnectorConfiguration(null); TcpCommunicationSpi commSpi = new TcpCommunicationSpi() { - @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) + @Override protected IgniteInternalFuture createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException { Map attrs = new HashMap<>(node.attributes()); http://git-wip-us.apache.org/repos/asf/ignite/blob/ffa5f826/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java index 88276c2..07edc86 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; @@ -39,6 +40,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.testframework.GridSpiTestContext; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.IgniteMock; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; @@ -70,6 +72,9 @@ public abstract class GridAbstractCommunicationSelfTest ctxs = new HashMap<>(); + timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log)); + + timeoutProcessor.start(); + + timeoutProcessor.onKernalStart(); + for (int i = 0; i < getSpiCount(); i++) { CommunicationSpi spi = getSpi(i); @@ -298,18 +309,20 @@ public abstract class GridAbstractCommunicationSelfTest>> Initialized context: nodeId=" + ctx.localNode().id()); spiRsrcs.add(rsrcs); rsrcs.inject(spi); + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); + if (useSsl) { IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite"); @@ -324,6 +337,8 @@ public abstract class GridAbstractCommunicationSelfTest spi : spis.values()) { spi.onContextDestroyed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ffa5f826/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java index 78bf869..39ecd8e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java @@ -36,7 +36,9 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.nio.GridCommunicationClient; import org.apache.ignite.internal.util.nio.GridNioServer; @@ -51,6 +53,7 @@ import org.apache.ignite.spi.communication.GridTestMessage; import org.apache.ignite.testframework.GridSpiTestContext; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.IgniteMock; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; @@ -79,6 +82,9 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest nodes = new ArrayList<>(); /** */ + private static GridTimeoutProcessor timeoutProcessor; + + /** */ private static int port = 60_000; /** Use ssl. */ @@ -407,27 +413,37 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest ctxs = new HashMap<>(); + timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log)); + + timeoutProcessor.start(); + + timeoutProcessor.onKernalStart(); + for (int i = 0; i < SPI_CNT; i++) { CommunicationSpi spi = createSpi(); - GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); - IgniteTestResources rsrcs = new IgniteTestResources(); GridTestNode node = new GridTestNode(rsrcs.getNodeId()); + node.setAttribute(IgniteNodeAttributes.ATTR_CLIENT_MODE, false); + node.order(i + 1); GridSpiTestContext ctx = initSpiContext(); ctx.setLocalNode(node); + ctx.timeoutProcessor(timeoutProcessor); + info(">>> Initialized context: nodeId=" + ctx.localNode().id()); spiRsrcs.add(rsrcs); rsrcs.inject(spi); + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); + if (useSsl) { IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite"); @@ -494,6 +510,14 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest spi : spis) { spi.onContextDestroyed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ffa5f826/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java index feaae11..f87ff09 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor; import org.apache.ignite.internal.util.nio.GridNioServer; @@ -44,6 +45,7 @@ import org.apache.ignite.spi.communication.GridTestMessage; import org.apache.ignite.testframework.GridSpiTestContext; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; import org.apache.ignite.testframework.junits.spi.GridSpiTest; @@ -64,11 +66,11 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest nodes = new ArrayList<>(); /** */ + private static GridTimeoutProcessor timeoutProcessor; + + /** */ private static final int SPI_CNT = 2; - /** - * - */ static { GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO() { @Override public Message apply() { @@ -159,6 +161,8 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest sessions = GridTestUtils.getFieldValue(srv, "sessions"); + final Collection sessions = GridTestUtils.getFieldValue(srv, "sessions"); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return !sessions.isEmpty(); + } + }, 5_000); - assertFalse(sessions.isEmpty()); boolean found = false; @@ -268,21 +277,21 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest ctxs = new HashMap<>(); + timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log)); + + timeoutProcessor.start(); + + timeoutProcessor.onKernalStart(); + for (int i = 0; i < SPI_CNT; i++) { TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit); - GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); - IgniteTestResources rsrcs = new IgniteTestResources(); GridTestNode node = new GridTestNode(rsrcs.getNodeId()); @@ -392,14 +405,20 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest spi : spis) { spi.onContextDestroyed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ffa5f826/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java index 2a043ee..46d2d1d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.nio.GridNioServer; import org.apache.ignite.internal.util.nio.GridNioSession; @@ -47,6 +48,7 @@ import org.apache.ignite.spi.communication.GridTestMessage; import org.apache.ignite.testframework.GridSpiTestContext; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.IgniteMock; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; @@ -80,6 +82,9 @@ public class GridTcpCommunicationSpiRecoverySelfTest /** Use ssl. */ protected boolean useSsl; + /** */ + private static GridTimeoutProcessor timeoutProcessor; + /** * */ @@ -115,7 +120,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest /** {@inheritDoc} */ @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) { - // info("Test listener received message: " + msg); + //info("Test listener received message: " + msg); assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage); @@ -186,7 +191,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest * @return Timeout. */ protected long awaitForSocketWriteTimeout() { - return 8000; + return 20000; } /** @@ -298,6 +303,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest // Send message to establish connection. spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0)); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return lsnr1.rcvCnt.get() >= 1; + } + }, 1000); + final AtomicInteger sentCnt = new AtomicInteger(1); int errCnt = 0; @@ -413,6 +424,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest // Send message to establish connection. spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0)); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return lsnr1.rcvCnt.get() >= 1; + } + }, 1000); + expCnt1.incrementAndGet(); int errCnt = 0; @@ -451,7 +468,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest ses1.resumeReads().get(); } catch (IgniteCheckedException ignore) { - // Can fail is ses1 was closed. + // Can fail if ses1 was closed. } // Wait when session is closed, then try to open new connection from node1. @@ -534,6 +551,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest // Send message to establish connection. spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0)); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return lsnr1.rcvCnt.get() >= 1; + } + }, 1000); + final AtomicInteger sentCnt = new AtomicInteger(1); int errCnt = 0; @@ -686,11 +709,15 @@ public class GridTcpCommunicationSpiRecoverySelfTest Map ctxs = new HashMap<>(); + timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log)); + + timeoutProcessor.start(); + + timeoutProcessor.onKernalStart(); + for (int i = 0; i < SPI_CNT; i++) { TcpCommunicationSpi spi = getSpi(i); - GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); - IgniteTestResources rsrcs = new IgniteTestResources(); GridTestNode node = new GridTestNode(rsrcs.getNodeId()); @@ -701,10 +728,14 @@ public class GridTcpCommunicationSpiRecoverySelfTest ctx.setLocalNode(node); + ctx.timeoutProcessor(timeoutProcessor); + spiRsrcs.add(rsrcs); rsrcs.inject(spi); + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); + if (useSsl) { IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite"); @@ -770,6 +801,14 @@ public class GridTcpCommunicationSpiRecoverySelfTest * @throws Exception If failed. */ private void stopSpis() throws Exception { + if (timeoutProcessor != null) { + timeoutProcessor.onKernalStop(true); + + timeoutProcessor.stop(true); + + timeoutProcessor = null; + } + for (CommunicationSpi spi : spis) { spi.onContextDestroyed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ffa5f826/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java index 3f58055..7b59da3 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java @@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor; import org.apache.ignite.internal.util.nio.GridNioServer; @@ -47,6 +48,7 @@ import org.apache.ignite.spi.communication.GridTestMessage; import org.apache.ignite.testframework.GridSpiTestContext; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; import org.apache.ignite.testframework.junits.spi.GridSpiTest; @@ -70,6 +72,9 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest= 1 && lsnr1.rcvCnt.get() >= 1; + } + }, 1000); + } } expMsgs += msgPerIter; @@ -415,6 +429,12 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest ctxs = new HashMap<>(); + timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log)); + + timeoutProcessor.start(); + + timeoutProcessor.onKernalStart(); + for (int i = 0; i < SPI_CNT; i++) { TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit); @@ -428,6 +448,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest spi : spis) { spi.onContextDestroyed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ffa5f826/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java index 2b49d53..9c59cb2 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java @@ -188,8 +188,7 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest { final CountDownLatch latch = new CountDownLatch(1); grid(0).events().localListen(new IgnitePredicate() { - @Override - public boolean apply(Event event) { + @Override public boolean apply(Event evt) { latch.countDown(); return true; @@ -239,14 +238,14 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest { }, 5000); try { - fut1.get(); + fut1.get(1000); } catch (IgniteCheckedException e) { // No-op. } try { - fut2.get(); + fut2.get(1000); } catch (IgniteCheckedException e) { // No-op. @@ -297,8 +296,9 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest { */ private static class TestCommunicationSpi extends TcpCommunicationSpi { /** {@inheritDoc} */ - @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) - throws IgniteCheckedException { + @Override protected IgniteInternalFuture createTcpClient(ClusterNode node, int connIdx) + throws IgniteCheckedException + { if (pred.apply(getLocalNode(), node)) { Map attrs = new HashMap<>(node.attributes()); http://git-wip-us.apache.org/repos/asf/ignite/blob/ffa5f826/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java index 4fe67c1..baa1270 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java @@ -240,8 +240,9 @@ public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest */ private static class TestCommunicationSpi extends TcpCommunicationSpi { /** {@inheritDoc} */ - @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) - throws IgniteCheckedException { + @Override protected IgniteInternalFuture createTcpClient(ClusterNode node, int connIdx) + throws IgniteCheckedException + { if (PRED.apply(node)) { Map attrs = new HashMap<>(node.attributes()); http://git-wip-us.apache.org/repos/asf/ignite/blob/ffa5f826/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java index 8a20eec..a241a04 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java @@ -1004,7 +1004,10 @@ public class HadoopExternalCommunication { HandshakeFinish fin = new HandshakeFinish(); - GridNioSession ses = nioSrvr.createSession(ch, F.asMap(HANDSHAKE_FINISH_META, fin)).get(); + GridNioFuture sesFut = + nioSrvr.createSession(ch, F.asMap(HANDSHAKE_FINISH_META, fin), false, null); + + GridNioSession ses = sesFut.get(); client = new HadoopTcpNioCommunicationClient(ses);