ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [16/49] ignite git commit: IGNITE-3220 I/O bottleneck on server/client cluster configuration Communications optimizations: - possibility to open separate in/out connections - possibility to have multiple connections between nodes - implemented NIO sessio
Date Fri, 16 Dec 2016 11:41:45 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index 482e2ef..c7a1a53 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.communication.tcp;
 
+import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.mxbean.MXBeanDescription;
 import org.apache.ignite.spi.IgniteSpiManagementMBean;
 
@@ -44,6 +45,35 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
     public int getLocalPort();
 
     /**
+     * Returns {@code true} if {@code TcpCommunicationSpi} should
+     * maintain connection for outgoing and incoming messages separately.
+     * In this case total number of connections between local and some remote node
+     * is {@link #getConnectionsPerNode()} * 2.
+     * <p>
+     * Returns {@code false} if each connection of {@link #getConnectionsPerNode()}
+     * should be used for outgoing and incoming messages. In this case load NIO selectors load
+     * balancing of {@link GridNioServer} will be disabled.
+     * <p>
+     * Default is {@code true}.
+     *
+     * @return {@code true} to use paired connections and {@code false} otherwise.
+     * @see #getConnectionsPerNode()
+     */
+    @MXBeanDescription("Paired connections used.")
+    public boolean isUsePairedConnections();
+
+    /**
+     * Gets number of connections to each remote node. if {@link #isUsePairedConnections()}
+     * is {@code true} then number of connections is doubled and half is used for incoming and
+     * half for outgoing messages.
+     *
+     * @return Number of connections per node.
+     * @see #isUsePairedConnections()
+     */
+    @MXBeanDescription("Connections per node.")
+    public int getConnectionsPerNode();
+
+    /**
      * Gets local port for shared memory communication.
      *
      * @return Port number.
@@ -153,6 +183,16 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
     public int getReconnectCount();
 
     /**
+     * Defines how many non-blocking {@code selector.selectNow()} should be made before
+     * falling into {@code selector.select(long)} in NIO server. Long value. Default is {@code 0}.
+     * Can be set to {@code Long.MAX_VALUE} so selector threads will never block.
+     *
+     * @return Selector thread busy-loop iterations.
+     */
+    @MXBeanDescription("Selector thread busy-loop iterations.")
+    public long getSelectorSpins();
+
+    /**
      * Gets value for {@code TCP_NODELAY} socket option.
      *
      * @return {@code True} if TCP delay is disabled.

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 204b685..50fa3bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -3424,7 +3424,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     if (log.isDebugEnabled())
                         log.debug("Node validation failed [res=" + err + ", node=" + node + ']');
 
-                    utilityPool.submit(
+                    utilityPool.execute(
                         new Runnable() {
                             @Override public void run() {
                                 boolean ping = node.id().equals(err.nodeId()) ? pingNode(node) : pingNode(err.nodeId());
@@ -3469,7 +3469,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 final String rmtMarsh = node.attribute(ATTR_MARSHALLER);
 
                 if (!F.eq(locMarsh, rmtMarsh)) {
-                    utilityPool.submit(
+                    utilityPool.execute(
                         new Runnable() {
                             @Override public void run() {
                                 String errMsg = "Local node's marshaller differs from remote node's marshaller " +
@@ -3526,7 +3526,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 boolean locLateAssignBool = locLateAssign != null ? locLateAssign : false;
 
                 if (locMarshUseDfltSuidBool != rmtMarshUseDfltSuidBool) {
-                    utilityPool.submit(
+                    utilityPool.execute(
                         new Runnable() {
                             @Override public void run() {
                                 String errMsg = "Local node's " + IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID +
@@ -3568,7 +3568,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 final boolean rmtMarshCompactFooterBool = rmtMarshCompactFooter != null ? rmtMarshCompactFooter : false;
 
                 if (locMarshCompactFooterBool != rmtMarshCompactFooterBool) {
-                    utilityPool.submit(
+                    utilityPool.execute(
                         new Runnable() {
                             @Override public void run() {
                                 String errMsg = "Local node's binary marshaller \"compactFooter\" property differs from " +
@@ -3606,7 +3606,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 final boolean rmtMarshStrSerialVer2Bool = rmtMarshStrSerialVer2 != null ? rmtMarshStrSerialVer2 : false;
 
                 if (locMarshStrSerialVer2Bool != rmtMarshStrSerialVer2Bool) {
-                    utilityPool.submit(
+                    utilityPool.execute(
                         new Runnable() {
                             @Override public void run() {
                                 String errMsg = "Local node's " + IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2 +
@@ -3679,7 +3679,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     final Boolean rmtSrvcCompatibilityEnabled = node.attribute(ATTR_SERVICES_COMPATIBILITY_MODE);
 
                     if (!F.eq(locSrvcCompatibilityEnabled, rmtSrvcCompatibilityEnabled)) {
-                        utilityPool.submit(
+                        utilityPool.execute(
                             new Runnable() {
                                 @Override public void run() {
                                     String errMsg = "Local node's " + IGNITE_SERVICES_COMPATIBILITY_MODE +
@@ -3714,7 +3714,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     }
                 }
                 else if (Boolean.FALSE.equals(locSrvcCompatibilityEnabled)) {
-                    utilityPool.submit(
+                    utilityPool.execute(
                         new Runnable() {
                             @Override public void run() {
                                 String errMsg = "Remote node doesn't support lazy services configuration and " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index d1c8d19..127778b 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -184,6 +184,7 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
         try {
             srv = new GridNioServer.Builder<byte[]>()
                 .address(addr == null ? InetAddress.getLocalHost() : addr)
+                .serverName("sock-streamer")
                 .port(port)
                 .listener(lsnr)
                 .logger(log)

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
index 55557dd..d173594 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
@@ -20,6 +20,7 @@ package org.apache.ignite.thread;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -62,4 +63,9 @@ public class IgniteThreadFactory implements ThreadFactory {
     @Override public Thread newThread(@NotNull Runnable r) {
         return new IgniteThread(gridName, threadName, r, idxGen.incrementAndGet());
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteThreadFactory.class, this, super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
index 760313b..5721887 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteSlowClientDetectionSelfTest.java
@@ -75,6 +75,7 @@ public class IgniteSlowClientDetectionSelfTest extends GridCommonAbstractTest {
         commSpi.setSlowClientQueueLimit(50);
         commSpi.setSharedMemoryPort(-1);
         commSpi.setIdleConnectionTimeout(300_000);
+        commSpi.setConnectionsPerNode(1);
 
         cfg.setCommunicationSpi(commSpi);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceMultipleConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceMultipleConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceMultipleConnectionsTest.java
new file mode 100644
index 0000000..e95b1ec
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceMultipleConnectionsTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+/**
+ *
+ */
+public class IgniteCommunicationBalanceMultipleConnectionsTest extends IgniteCommunicationBalanceTest {
+    /** {@inheritDoc} */
+    @Override protected int connectionsPerNode() {
+        return 5;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
new file mode 100644
index 0000000..e142aef
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private int selectors;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpCommunicationSpi commSpi = ((TcpCommunicationSpi)cfg.getCommunicationSpi());
+
+        commSpi.setSharedMemoryPort(-1);
+        commSpi.setConnectionsPerNode(connectionsPerNode());
+
+        if (selectors > 0)
+            commSpi.setSelectorsCount(selectors);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @return Connections per node.
+     */
+    protected int connectionsPerNode() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBalance1() throws Exception {
+        System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "5000");
+
+        try {
+            selectors = 4;
+
+            final int SRVS = 4;
+
+            startGridsMultiThreaded(SRVS);
+
+            client = true;
+
+            final Ignite client = startGrid(SRVS);
+
+            for (int i = 0; i < 4; i++) {
+                ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id());
+
+                client.compute(client.cluster().forNode(node)).call(new DummyCallable(null));
+            }
+
+            waitNioBalanceStop(Collections.singletonList(client), 10_000);
+
+            final GridNioServer srv = GridTestUtils.getFieldValue(client.configuration().getCommunicationSpi(), "nioSrvr");
+
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            long readMoveCnt1 = srv.readerMoveCount();
+            long writeMoveCnt1 = srv.writerMoveCount();
+
+            int prevNodeIdx = -1;
+
+            for (int iter = 0; iter < 10; iter++) {
+                int nodeIdx = rnd.nextInt(SRVS);
+
+                while (prevNodeIdx == nodeIdx)
+                    nodeIdx = rnd.nextInt(SRVS);
+
+                prevNodeIdx = nodeIdx;
+
+                log.info("Iteration [iter=" + iter + ", node=" + nodeIdx + ']');
+
+                final long readMoveCnt = readMoveCnt1;
+                final long writeMoveCnt = writeMoveCnt1;
+
+                final int nodeIdx0 = nodeIdx;
+
+                GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        byte[] data = new byte[100_000];
+
+                        for (int j = 0; j < 10; j++) {
+                            for (int i = 0; i < SRVS; i++) {
+                                ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id());
+
+                                IgniteCompute compute = client.compute(client.cluster().forNode(node));
+
+                                compute.call(new DummyCallable(i == nodeIdx0 ? data : null));
+                            }
+                        }
+
+                        return srv.readerMoveCount() > readMoveCnt && srv.writerMoveCount() > writeMoveCnt;
+                    }
+                }, 30_000);
+
+                waitNioBalanceStop(Collections.singletonList(client), 30_000);
+
+                long readMoveCnt2 = srv.readerMoveCount();
+                long writeMoveCnt2 = srv.writerMoveCount();
+
+                log.info("Move counts [rc1=" + readMoveCnt1 +
+                    ", wc1=" + writeMoveCnt1 +
+                    ", rc2=" + readMoveCnt2 +
+                    ", wc2=" + writeMoveCnt2 + ']');
+
+                assertTrue(readMoveCnt2 > readMoveCnt1);
+                assertTrue(writeMoveCnt2 > writeMoveCnt1);
+
+                readMoveCnt1 = readMoveCnt2;
+                writeMoveCnt1 = writeMoveCnt2;
+            }
+
+            waitNioBalanceStop(G.allGrids(), 10_000);
+        }
+        finally {
+            System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "");
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBalance2() throws Exception {
+        System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "1000");
+
+        try {
+            startGridsMultiThreaded(5);
+
+            client = true;
+
+            startGridsMultiThreaded(5, 5);
+
+            for (int i = 0; i < 5; i++) {
+                log.info("Iteration: " + i);
+
+                final AtomicInteger idx = new AtomicInteger();
+
+                GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        Ignite node = ignite(idx.incrementAndGet() % 10);
+
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                        int msgs = rnd.nextInt(500, 600);
+
+                        for (int i = 0; i < msgs; i++) {
+                            int sndTo = rnd.nextInt(10);
+
+                            ClusterNode sntToNode = node.cluster().node(ignite(sndTo).cluster().localNode().id());
+
+                            IgniteCompute compute = node.compute(node.cluster().forNode(sntToNode));
+
+                            compute.call(new DummyCallable(new byte[rnd.nextInt(rnd.nextInt(256, 1024))]));
+                        }
+
+                        return null;
+                    }
+                }, 30, "test-thread");
+
+                waitNioBalanceStop(G.allGrids(), 10_000);
+            }
+        }
+        finally {
+            System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "");
+        }
+    }
+
+    /**
+     * @param nodes Node.
+     * @param timeout Timeout.
+     * @throws Exception If failed.
+     */
+    private void waitNioBalanceStop(List<Ignite> nodes, long timeout) throws Exception {
+        final List<GridNioServer> srvs = new ArrayList<>();
+
+        for (Ignite node : nodes) {
+            TcpCommunicationSpi spi = (TcpCommunicationSpi) node.configuration().getCommunicationSpi();
+
+            GridNioServer srv = GridTestUtils.getFieldValue(spi, "nioSrvr");
+
+            srvs.add(srv);
+        }
+
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() {
+            @Override public boolean applyx() throws IgniteCheckedException {
+                List<Long> rCnts = new ArrayList<>();
+                List<Long> wCnts = new ArrayList<>();
+
+                for (GridNioServer srv : srvs) {
+                    long readerMovCnt1 = srv.readerMoveCount();
+                    long writerMovCnt1 = srv.writerMoveCount();
+
+                    rCnts.add(readerMovCnt1);
+                    wCnts.add(writerMovCnt1);
+                }
+
+                U.sleep(2000);
+
+                for (int i = 0; i < srvs.size(); i++) {
+                    GridNioServer srv = srvs.get(i);
+
+                    long readerMovCnt1 = rCnts.get(i);
+                    long writerMovCnt1 = wCnts.get(i);
+
+                    long readerMovCnt2 = srv.readerMoveCount();
+                    long writerMovCnt2 = srv.writerMoveCount();
+
+                    if (readerMovCnt1 != readerMovCnt2) {
+                        log.info("Readers balance is in progress [node=" + i + ", cnt1=" + readerMovCnt1 +
+                            ", cnt2=" + readerMovCnt2 + ']');
+
+                        return false;
+                    }
+                    if (writerMovCnt1 != writerMovCnt2) {
+                        log.info("Writers balance is in progress [node=" + i + ", cnt1=" + writerMovCnt1 +
+                            ", cnt2=" + writerMovCnt2 + ']');
+
+                        return false;
+                    }
+                }
+
+                return true;
+            }
+        }, timeout));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRandomBalance() throws Exception {
+        System.setProperty(GridNioServer.IGNITE_IO_BALANCE_RANDOM_BALANCE, "true");
+        System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "500");
+
+        try {
+            final int NODES = 10;
+
+            startGridsMultiThreaded(NODES);
+
+            final long stopTime = System.currentTimeMillis() + 60_000;
+
+            GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    while (System.currentTimeMillis() < stopTime)
+                        ignite(rnd.nextInt(NODES)).compute().broadcast(new DummyCallable(null));
+
+                    return null;
+                }
+            }, 20, "test-thread");
+        }
+        finally {
+            System.setProperty(GridNioServer.IGNITE_IO_BALANCE_RANDOM_BALANCE, "");
+            System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "");
+        }
+    }
+
+    /**
+     *
+     */
+    private static class DummyCallable implements IgniteCallable<Object> {
+        /** */
+        private byte[] data;
+
+        /**
+         * @param data Data.
+         */
+        DummyCallable(byte[] data) {
+            this.data = data;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            return data;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessagesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessagesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessagesTest.java
new file mode 100644
index 0000000..b644878
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessagesTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class IgniteIoTestMessagesTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(3);
+
+        client = true;
+
+        startGrid(3);
+
+        startGrid(4);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIoTestMessages() throws Exception {
+        for (Ignite node : G.allGrids()) {
+            IgniteKernal ignite = (IgniteKernal)node;
+
+            List<ClusterNode> rmts = new ArrayList<>(ignite.cluster().forRemotes().nodes());
+
+            assertEquals(4, rmts.size());
+
+            for (ClusterNode rmt : rmts) {
+                ignite.sendIoTest(rmt, new byte[1024], false);
+
+                ignite.sendIoTest(rmt, new byte[1024], true);
+
+                ignite.sendIoTest(rmts, new byte[1024], false);
+
+                ignite.sendIoTest(rmts, new byte[1024], true);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
new file mode 100644
index 0000000..510751e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteVariousConnectionNumberTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteVariousConnectionNumberTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 6;
+
+    /** */
+    private static Random rnd = new Random();
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        int connections = rnd.nextInt(10) + 1;
+
+        log.info("Node connections [name=" + gridName + ", connections=" + connections + ']');
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setConnectionsPerNode(connections);
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setUsePairedConnections(rnd.nextBoolean());
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        long seed = U.currentTimeMillis();
+
+        rnd.setSeed(seed);
+
+        log.info("Random seed: " + seed);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testVariousConnectionNumber() throws Exception {
+        startGridsMultiThreaded(3);
+
+        client = true;
+
+        startGridsMultiThreaded(3, 3);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(REPLICATED);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        ignite(0).createCache(ccfg);
+
+        for (int i = 0; i < 10; i++) {
+            log.info("Iteration: " + i);
+
+            runOperations(5000);
+
+            awaitPartitionMapExchange();
+
+            int idx = ThreadLocalRandom.current().nextInt(NODES);
+
+            Ignite node = ignite(idx);
+
+            client = node.configuration().isClientMode();
+
+            stopGrid(idx);
+
+            startGrid(idx);
+        }
+    }
+
+    /**
+     * @param time Execution time.
+     * @throws Exception If failed.
+     */
+    private void runOperations(final long time) throws Exception {
+        final AtomicInteger idx = new AtomicInteger();
+
+        GridTestUtils.runMultiThreaded(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                Ignite node = ignite(idx.getAndIncrement() % NODES);
+
+                IgniteCache cache = node.cache(null);
+
+                long stopTime = U.currentTimeMillis() + time;
+
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                while (U.currentTimeMillis() < stopTime) {
+                    cache.put(rnd.nextInt(10_000), 0);
+
+                    node.compute().broadcast(new DummyJob());
+                }
+
+                return null;
+            }
+        }, NODES * 10, "test-thread");
+    }
+
+    /**
+     *
+     */
+    private static class DummyJob implements IgniteRunnable {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
index 67ec371..eaa9923 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
@@ -86,6 +86,11 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest {
     }
 
     /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 6 * 60 * 1000;
+    }
+
+    /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
@@ -170,9 +175,17 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param cacheMode Cache mode.
+     * @param writeSync Write synchronization mode.
+     * @param fairAff Fair affinity flag.
+     * @param ignite Node to use.
+     * @param name Cache name.
      */
-    protected void createCache(CacheMode cacheMode, CacheWriteSynchronizationMode writeSync, boolean fairAff,
-        Ignite ignite, String name) {
+    protected void createCache(CacheMode cacheMode,
+        CacheWriteSynchronizationMode writeSync,
+        boolean fairAff,
+        Ignite ignite,
+        String name) {
         ignite.createCache(cacheConfiguration(name, cacheMode, writeSync, fairAff));
     }
 
@@ -269,9 +282,18 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest {
 
             boolean checkData = fullSync && !optimistic;
 
+            long stopTime = System.currentTimeMillis() + 10_000;
+
             for (int i = 0; i < 10_000; i++) {
-                if (i % 100 == 0)
+                if (i % 100 == 0) {
+                    if (System.currentTimeMillis() > stopTime) {
+                        log.info("Stop on timeout, iteration: " + i);
+
+                        break;
+                    }
+
                     log.info("Iteration: " + i);
+                }
 
                 boolean rollback = i % 10 == 0;
 
@@ -557,4 +579,4 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest {
             return old;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java
index 9405a19..3a2bc81 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridAbstractCacheInterceptorRebalanceTest.java
@@ -200,7 +200,9 @@ public abstract class GridAbstractCacheInterceptorRebalanceTest extends GridComm
     private void testRebalance(final Operation operation) throws Exception {
         interceptor = new RebalanceUpdateInterceptor();
 
-        for (int iter = 0; iter < TEST_ITERATIONS; iter++) {
+        long stopTime = System.currentTimeMillis() + 2 * 60_000;
+
+        for (int iter = 0; iter < TEST_ITERATIONS && System.currentTimeMillis() < stopTime; iter++) {
             log.info("Iteration: " + iter);
 
             failed = false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java
index 9458a63..6e2e91f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java
@@ -115,10 +115,10 @@ public class GridCacheOffHeapMultiThreadedUpdateSelfTest extends GridCacheOffHea
         if (gridCount() > 1)
             testPutTx(keyForNode(1), PESSIMISTIC);
     }
-    
+
     /**
      * TODO: IGNITE-592.
-     *  
+     *
      * @throws Exception If failed.
      */
     public void testPutTxOptimistic() throws Exception {
@@ -227,4 +227,4 @@ public class GridCacheOffHeapMultiThreadedUpdateSelfTest extends GridCacheOffHea
 
         assertFalse(failed);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java
new file mode 100644
index 0000000..30fc9ef
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicMessageRecovery10ConnectionsTest extends IgniteCacheAtomicMessageRecoveryTest {
+    /** {@inheritDoc} */
+    @Override protected int connectionsPerNode() {
+        return 10;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
new file mode 100644
index 0000000..71772ef
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest extends IgniteCacheAtomicMessageRecoveryTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpCommunicationSpi commSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
+
+        assertTrue(commSpi.isUsePairedConnections());
+
+        commSpi.setUsePairedConnections(false);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecovery10ConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecovery10ConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecovery10ConnectionsTest.java
new file mode 100644
index 0000000..919aea6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheConnectionRecovery10ConnectionsTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+/**
+ *
+ */
+public class IgniteCacheConnectionRecovery10ConnectionsTest extends IgniteCacheConnectionRecoveryTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setConnectionsPerNode(10);
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
index 2f700f3..a91de67 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
@@ -107,7 +107,7 @@ public class IgniteCacheCreatePutTest extends GridCommonAbstractTest {
         try {
             int iter = 0;
 
-            while (System.currentTimeMillis() < stopTime) {
+            while (System.currentTimeMillis() < stopTime && iter < 5) {
                 log.info("Iteration: " + iter++);
 
                 try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
index 0460a8f..1bfd727 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
@@ -58,6 +58,7 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
 
         commSpi.setSocketWriteTimeout(1000);
         commSpi.setSharedMemoryPort(-1);
+        commSpi.setConnectionsPerNode(connectionsPerNode());
 
         cfg.setCommunicationSpi(commSpi);
 
@@ -76,6 +77,13 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
     }
 
     /**
+     * @return Value for {@link TcpCommunicationSpi#setConnectionsPerNode(int)}.
+     */
+    protected int connectionsPerNode() {
+        return TcpCommunicationSpi.DFLT_CONN_PER_NODE;
+    }
+
+    /**
      * @return Cache atomicity mode.
      */
     protected abstract CacheAtomicityMode atomicityMode();
@@ -174,18 +182,22 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
     static boolean closeSessions(Ignite ignite) throws Exception {
         TcpCommunicationSpi commSpi = (TcpCommunicationSpi)ignite.configuration().getCommunicationSpi();
 
-        Map<UUID, GridCommunicationClient> clients = U.field(commSpi, "clients");
+        Map<UUID, GridCommunicationClient[]> clients = U.field(commSpi, "clients");
 
         boolean closed = false;
 
-        for (GridCommunicationClient client : clients.values()) {
-            GridTcpNioCommunicationClient client0 = (GridTcpNioCommunicationClient)client;
+        for (GridCommunicationClient[] clients0 : clients.values()) {
+            for (GridCommunicationClient client : clients0) {
+                if (client != null) {
+                    GridTcpNioCommunicationClient client0 = (GridTcpNioCommunicationClient)client;
 
-            GridNioSession ses = client0.session();
+                    GridNioSession ses = client0.session();
 
-            ses.close();
+                    ses.close();
 
-            closed = true;
+                    closed = true;
+                }
+            }
         }
 
         return closed;

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
index 6256225..0dd4079 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
@@ -50,8 +50,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
         // Try provoke connection close on socket writeTimeout.
         commSpi.setSharedMemoryPort(-1);
         commSpi.setMessageQueueLimit(10);
-        commSpi.setSocketReceiveBuffer(32);
-        commSpi.setSocketSendBuffer(32);
+        commSpi.setSocketReceiveBuffer(40);
+        commSpi.setSocketSendBuffer(40);
         commSpi.setSocketWriteTimeout(100);
         commSpi.setUnacknowledgedMessagesBufferSize(1000);
         commSpi.setConnectTimeout(10_000);
@@ -66,15 +66,20 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
         super.afterTest();
     }
 
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60_000;
+    }
+
     /**
      * @throws Exception If failed.
      */
     public void testMessageQueueLimit() throws Exception {
-        startGridsMultiThreaded(3);
-
-        for (int i = 0; i < 15; i++) {
+        for (int i = 0; i < 3; i++) {
             log.info("Iteration: " + i);
 
+            startGridsMultiThreaded(3);
+
             IgniteInternalFuture<?> fut1 = startJobThreads(50);
 
             U.sleep(100);
@@ -83,6 +88,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
 
             fut1.get();
             fut2.get();
+
+            stopAllGrids();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
index 3fca826..322690c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheMultiTxLockSelfTest.java
@@ -86,7 +86,6 @@ public class IgniteCacheMultiTxLockSelfTest extends GridCommonAbstractTest {
         plc.setMaxSize(100000);
 
         ccfg.setEvictionPolicy(plc);
-        ccfg.setEvictSynchronized(true);
 
         c.setCacheConfiguration(ccfg);
 
@@ -95,6 +94,11 @@ public class IgniteCacheMultiTxLockSelfTest extends GridCommonAbstractTest {
         return c;
     }
 
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 60_000;
+    }
+
     /**
      * @throws Exception If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java
new file mode 100644
index 0000000..e8175e5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ *
+ */
+public class GridCacheAtomicPrimaryWriteOrderNoStripedPoolMultiNodeFullApiSelfTest extends
+    GridCacheAtomicPrimaryWriteOrderMultiNodeFullApiSelfTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setStripedPoolSize(-1);
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java
new file mode 100644
index 0000000..05fe85f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ *
+ */
+public class GridCachePartitionedNoStripedPoolMultiNodeFullApiSelfTest extends
+    GridCachePartitionedMultiNodeFullApiSelfTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setStripedPoolSize(-1);
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
index c9d18eb..e9d74ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
@@ -211,7 +211,7 @@ public class TxDeadlockDetectionNoHangsTest extends GridCommonAbstractTest {
                             tx.commit();
                         }
                         catch (Exception e) {
-                            e.printStackTrace();
+                            log.info("Ignore error: " + e);
                         }
                     }
                 }, NODES_CNT * 3, "tx-thread");

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
index aa240aa..f6a06c2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOptimisticDeadlockDetectionTest.java
@@ -111,6 +111,9 @@ public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest {
 
         cfg.setClientMode(client);
 
+        // Test spi blocks message send, this can cause hang with striped pool.
+        cfg.setStripedPoolSize(-1);
+
         return cfg;
     }
 
@@ -274,8 +277,8 @@ public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest {
 
                     Object k;
 
-                    log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
-                        ", tx=" + tx + ", key=" + transformer.apply(key) + ']');
+                    log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode().id() +
+                        ", tx=" + tx.xid() + ", key=" + transformer.apply(key) + ']');
 
                     cache.put(transformer.apply(key), 0);
 
@@ -309,23 +312,27 @@ public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest {
                         entries.put(k, 2);
                     }
 
-                    log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode() +
-                        ", tx=" + tx + ", entries=" + entries + ']');
+                    log.info(">>> Performs put [node=" + ((IgniteKernal)ignite).localNode().id() +
+                        ", tx=" + tx.xid() + ", entries=" + entries + ']');
 
                     cache.putAll(entries);
 
                     tx.commit();
                 }
                 catch (Throwable e) {
-                    U.error(log, "Expected exception: ", e);
+                    log.info("Expected exception: " + e);
+
+                    e.printStackTrace(System.out);
 
                     // At least one stack trace should contain TransactionDeadlockException.
                     if (hasCause(e, TransactionTimeoutException.class) &&
-                        hasCause(e, TransactionDeadlockException.class)
-                        ) {
-                        if (deadlockErr.compareAndSet(null, cause(e, TransactionDeadlockException.class)))
-                            U.error(log, "At least one stack trace should contain " +
-                                TransactionDeadlockException.class.getSimpleName(), e);
+                        hasCause(e, TransactionDeadlockException.class)) {
+                        if (deadlockErr.compareAndSet(null, cause(e, TransactionDeadlockException.class))) {
+                            log.info("At least one stack trace should contain " +
+                                TransactionDeadlockException.class.getSimpleName());
+
+                            e.printStackTrace(System.out);
+                        }
                     }
                 }
             }
@@ -344,7 +351,7 @@ public class TxOptimisticDeadlockDetectionTest extends GridCommonAbstractTest {
 
         TransactionDeadlockException deadlockE = deadlockErr.get();
 
-        assertNotNull(deadlockE);
+        assertNotNull("Failed to detect deadlock", deadlockE);
 
         boolean fail = false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java
index 6fc7e02..7b5abf5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java
@@ -372,4 +372,4 @@ public class GridServiceProcessorProxySelfTest extends GridServiceProcessorAbstr
             X.println("Executing cache service: " + ctx.name());
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
index adcd144..4bc9f01 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.future;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -227,87 +228,98 @@ public class GridFutureAdapterSelfTest extends GridCommonAbstractTest {
      *
      * @throws Exception In case of any exception.
      */
-    @SuppressWarnings("ErrorNotRethrown")
     public void testChaining() throws Exception {
+        checkChaining(null);
+
+        ExecutorService exec = Executors.newFixedThreadPool(1);
+
+        try {
+            checkChaining(exec);
+
+            GridFinishedFuture<Integer> fut = new GridFinishedFuture<>(1);
+
+            IgniteInternalFuture<Object> chain = fut.chain(new CX1<IgniteInternalFuture<Integer>, Object>() {
+                @Override public Object applyx(IgniteInternalFuture<Integer> fut) throws IgniteCheckedException {
+                    return fut.get() + 1;
+                }
+            }, exec);
+
+            assertEquals(2, chain.get());
+        }
+        finally {
+            exec.shutdown();
+        }
+    }
+
+    /**
+     * @param exec Executor for chain callback.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ErrorNotRethrown")
+    private void checkChaining(ExecutorService exec) throws Exception {
         final CX1<IgniteInternalFuture<Object>, Object> passThrough = new CX1<IgniteInternalFuture<Object>, Object>() {
             @Override public Object applyx(IgniteInternalFuture<Object> f) throws IgniteCheckedException {
                 return f.get();
             }
         };
 
-        final GridTestKernalContext ctx = new GridTestKernalContext(log);
-
-        ctx.setExecutorService(Executors.newFixedThreadPool(1));
-        ctx.setSystemExecutorService(Executors.newFixedThreadPool(1));
-
-        ctx.add(new PoolProcessor(ctx));
-        ctx.add(new GridClosureProcessor(ctx));
+        GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
+        IgniteInternalFuture<Object> chain = exec != null ? fut.chain(passThrough, exec) : fut.chain(passThrough);
 
-        ctx.start();
+        assertFalse(fut.isDone());
+        assertFalse(chain.isDone());
 
         try {
-            // Test result returned.
-
-            GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
-            IgniteInternalFuture<Object> chain = fut.chain(passThrough);
+            chain.get(20);
 
-            assertFalse(fut.isDone());
-            assertFalse(chain.isDone());
-
-            try {
-                chain.get(20);
-
-                fail("Expects timeout exception.");
-            }
-            catch (IgniteFutureTimeoutCheckedException e) {
-                info("Expected timeout exception: " + e.getMessage());
-            }
+            fail("Expects timeout exception.");
+        }
+        catch (IgniteFutureTimeoutCheckedException e) {
+            info("Expected timeout exception: " + e.getMessage());
+        }
 
-            fut.onDone("result");
+        fut.onDone("result");
 
-            assertEquals("result", chain.get(1));
+        assertEquals("result", chain.get(1));
 
-            // Test exception re-thrown.
+        // Test exception re-thrown.
 
-            fut = new GridFutureAdapter<>();
-            chain = fut.chain(passThrough);
+        fut = new GridFutureAdapter<>();
+        chain = exec != null ? fut.chain(passThrough, exec) : fut.chain(passThrough);
 
-            fut.onDone(new ClusterGroupEmptyCheckedException("test exception"));
+        fut.onDone(new ClusterGroupEmptyCheckedException("test exception"));
 
-            try {
-                chain.get();
+        try {
+            chain.get();
 
-                fail("Expects failed with exception.");
-            }
-            catch (ClusterGroupEmptyCheckedException e) {
-                info("Expected exception: " + e.getMessage());
-            }
+            fail("Expects failed with exception.");
+        }
+        catch (ClusterGroupEmptyCheckedException e) {
+            info("Expected exception: " + e.getMessage());
+        }
 
-            // Test error re-thrown.
+        // Test error re-thrown.
 
-            fut = new GridFutureAdapter<>();
-            chain = fut.chain(passThrough);
+        fut = new GridFutureAdapter<>();
+        chain = exec != null ? fut.chain(passThrough, exec) : fut.chain(passThrough);
 
-            try {
-                fut.onDone(new StackOverflowError("test error"));
+        try {
+            fut.onDone(new StackOverflowError("test error"));
 
+            if (exec == null)
                 fail("Expects failed with error.");
-            }
-            catch (StackOverflowError e) {
-                info("Expected error: " + e.getMessage());
-            }
+        }
+        catch (StackOverflowError e) {
+            info("Expected error: " + e.getMessage());
+        }
 
-            try {
-                chain.get();
+        try {
+            chain.get();
 
-                fail("Expects failed with error.");
-            }
-            catch (StackOverflowError e) {
-                info("Expected error: " + e.getMessage());
-            }
+            fail("Expects failed with error.");
         }
-        finally {
-            ctx.stop(false);
+        catch (StackOverflowError e) {
+            info("Expected error: " + e.getMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
index 201fd27..d403784 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
@@ -114,7 +114,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
                 proceedExceptionCaught(ses, ex);
             }
 
-            @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) {
+            @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) {
                 sndEvt.compareAndSet(null, ses.<String>meta(MESSAGE_WRITE_META_NAME));
 
                 sndMsgObj.compareAndSet(null, msg);
@@ -155,7 +155,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
         chain.onSessionIdleTimeout(ses);
         chain.onSessionWriteTimeout(ses);
         assertNull(chain.onSessionClose(ses));
-        assertNull(chain.onSessionWrite(ses, snd));
+        assertNull(chain.onSessionWrite(ses, snd, true));
 
         assertEquals("DCBA", connectedEvt.get());
         assertEquals("DCBA", disconnectedEvt.get());
@@ -210,10 +210,10 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException {
             chainMeta(ses, MESSAGE_WRITE_META_NAME);
 
-            return proceedSessionWrite(ses, msg);
+            return proceedSessionWrite(ses, msg, fut);
         }
 
         /** {@inheritDoc} */
@@ -349,6 +349,11 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
+        @Override public void sendNoFuture(Object msg) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
         @Override public GridNioFuture<Object> resumeReads() {
             return null;
         }
@@ -369,13 +374,28 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+        @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
             // No-op.
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
+        @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
             return null;
         }
+
+        /** {@inheritDoc} */
+        @Override public void systemMessage(Object msg) {
+            // No-op.
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java b/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java
index 61a13b1..25dd780 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/nio/GridNioBenchmarkClient.java
@@ -83,7 +83,7 @@ public class GridNioBenchmarkClient {
      */
     public void run() throws IOException, InterruptedException {
         for (int i = 0; i < connCnt; i++)
-            exec.submit(new ClientThread());
+            exec.execute(new ClientThread());
 
         Thread.sleep(5*60*1000);
 
@@ -167,4 +167,4 @@ public class GridNioBenchmarkClient {
             return read;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PRecursionTaskSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PRecursionTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PRecursionTaskSelfTest.java
index f21f31b..a18ef32 100644
--- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PRecursionTaskSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PRecursionTaskSelfTest.java
@@ -196,4 +196,4 @@ public class GridP2PRecursionTaskSelfTest extends GridCommonAbstractTest {
             return ignite.compute().execute(FactorialTask.class, arg);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
index 652e47f..5ca8f26 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
@@ -69,7 +69,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
     private static final int commExtPort2 = 20100;
 
     /** */
-    private AddressResolver resolver;
+    private AddressResolver rslvr;
 
     /** */
     private boolean ipFinderUseLocPorts;
@@ -111,14 +111,15 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
         cfg.setConnectorConfiguration(null);
 
         TcpCommunicationSpi commSpi = new TcpCommunicationSpi() {
-            @Override protected GridCommunicationClient createTcpClient(ClusterNode node) throws IgniteCheckedException {
+            @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx)
+                throws IgniteCheckedException {
                 Map<String, Object> attrs = new HashMap<>(node.attributes());
 
                 attrs.remove(createSpiAttributeName(ATTR_PORT));
 
                 ((TcpDiscoveryNode)node).setAttributes(attrs);
 
-                return super.createTcpClient(node);
+                return super.createTcpClient(node, connIdx);
             }
         };
 
@@ -126,12 +127,13 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
         commSpi.setLocalPort(commLocPort);
         commSpi.setLocalPortRange(1);
         commSpi.setSharedMemoryPort(-1);
+        commSpi.setConnectionsPerNode(1);
 
         cfg.setCommunicationSpi(commSpi);
 
-        assert resolver != null;
+        assert rslvr != null;
 
-        cfg.setAddressResolver(resolver);
+        cfg.setAddressResolver(rslvr);
 
         return cfg;
     }
@@ -147,7 +149,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
         map.put(new InetSocketAddress("127.0.0.1", locPort2), F.asList(new InetSocketAddress("127.0.0.1", extPort2)));
         map.put(new InetSocketAddress("127.0.0.1", commLocPort2), F.asList(new InetSocketAddress("127.0.0.1", commExtPort2)));
 
-        resolver = new AddressResolver() {
+        rslvr = new AddressResolver() {
             @Override public Collection<InetSocketAddress> getExternalAddresses(InetSocketAddress addr) {
                 return map.get(addr);
             }
@@ -167,7 +169,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
         map.put("127.0.0.1:" + locPort2, "127.0.0.1:" + extPort2);
         map.put("127.0.0.1:" + commLocPort2, "127.0.0.1:" + commExtPort2);
 
-        resolver = new BasicAddressResolver(map);
+        rslvr = new BasicAddressResolver(map);
 
         doTestForward();
     }
@@ -180,7 +182,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
 
         map.put("127.0.0.1", "127.0.0.1");
 
-        resolver = new BasicAddressResolver(map);
+        rslvr = new BasicAddressResolver(map);
 
         ipFinderUseLocPorts = true;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index 076724d..3c4fea0 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -90,16 +90,36 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
         super.afterTest();
 
         for (CommunicationSpi spi : spis.values()) {
-            ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
+            ConcurrentMap<UUID, GridCommunicationClient[]> clients = U.field(spi, "clients");
+
+            for (int i = 0; i < 20; i++) {
+                GridCommunicationClient client0 = null;
+
+                for (GridCommunicationClient[] clients0 : clients.values()) {
+                    for (GridCommunicationClient client : clients0) {
+                        if (client != null) {
+                            client0 = client;
+
+                            break;
+                        }
+                    }
+
+                    if (client0 != null)
+                        break;
+                }
+
+                if (client0 == null)
+                    return;
 
-            for (int i = 0; i < 20 && !clients.isEmpty(); i++) {
                 info("Check failed for SPI [grid=" +
-                    GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "gridName") + ", spi=" + spi + ']');
+                    GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "gridName") +
+                    ", client=" + client0 +
+                    ", spi=" + spi + ']');
 
                 U.sleep(1000);
             }
 
-            assert clients.isEmpty() : "Clients: " + clients;
+            fail("Failed to wait when clients are closed.");
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index 8635d94..a649130 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -83,6 +84,12 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
     /** Use ssl. */
     protected boolean useSsl;
 
+    /** */
+    private int connectionsPerNode = 1;
+
+    /** */
+    private boolean pairedConnections = true;
+
     /**
      *
      */
@@ -163,6 +170,34 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
     /**
      * @throws Exception If failed.
      */
+    public void testMultithreaded_10Connections() throws Exception {
+        connectionsPerNode = 10;
+
+        testMultithreaded();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultithreaded_NoPairedConnections() throws Exception {
+        pairedConnections = false;
+
+        testMultithreaded();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultithreaded_10ConnectionsNoPaired() throws Exception {
+        pairedConnections = false;
+        connectionsPerNode = 10;
+
+        testMultithreaded();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testWithLoad() throws Exception {
         int threads = Runtime.getRuntime().availableProcessors() * 5;
 
@@ -244,7 +279,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
                 final AtomicInteger idx = new AtomicInteger();
 
                 try {
-                    GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                    final Callable<Void> c = new Callable<Void>() {
                         @Override public Void call() throws Exception {
                             int idx0 = idx.getAndIncrement();
 
@@ -270,7 +305,40 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
 
                             return null;
                         }
-                    }, threads, "test");
+                    };
+
+                    List<Thread> threadsList = new ArrayList<>();
+
+                    final AtomicBoolean fail = new AtomicBoolean();
+
+                    final AtomicLong tId = new AtomicLong();
+
+                    for (int t = 0; t < threads; t++) {
+                        Thread t0 = new Thread(new Runnable() {
+                            @Override public void run() {
+                                try {
+                                    c.call();
+                                }
+                                catch (Throwable e) {
+                                    log.error("Unexpected error: " + e, e);
+
+                                    fail.set(true);
+                                }
+                            }
+                        }) {
+                            @Override public long getId() {
+                                // Override getId to use all connections.
+                                return tId.getAndIncrement();
+                            }
+                        };
+
+                        threadsList.add(t0);
+
+                        t0.start();
+                    }
+
+                    for (Thread t0 : threadsList)
+                        t0.join();
 
                     assertTrue(latch.await(10, TimeUnit.SECONDS));
 
@@ -281,17 +349,19 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
 
                         final GridNioServer srv = U.field(spi, "nioSrvr");
 
+                        final int conns = pairedConnections ? 2 : 1;
+
                         GridTestUtils.waitForCondition(new GridAbsPredicate() {
                             @Override public boolean apply() {
                                 Collection sessions = U.field(srv, "sessions");
 
-                                return sessions.size() == 1;
+                                return sessions.size() == conns * connectionsPerNode;
                             }
                         }, 5000);
 
                         Collection sessions = U.field(srv, "sessions");
 
-                        assertEquals(1, sessions.size());
+                        assertEquals(conns * connectionsPerNode, sessions.size());
                     }
 
                     assertEquals(expMsgs, lsnr.cntr.get());
@@ -320,6 +390,8 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
         spi.setIdleConnectionTimeout(60_000);
         spi.setConnectTimeout(10_000);
         spi.setSharedMemoryPort(-1);
+        spi.setConnectionsPerNode(connectionsPerNode);
+        spi.setUsePairedConnections(pairedConnections);
 
         return spi;
     }
@@ -434,4 +506,4 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
             rsrcs.stopThreads();
     }
 
-}
\ No newline at end of file
+}


Mime
View raw message