ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [05/50] [abbrv] ignite git commit: For communication spi disabled pairedConnections by default, implemented NIO sessions balancing for this mode.
Date Tue, 17 Jan 2017 14:04:41 GMT
For communication spi disabled pairedConnections by default, implemented NIO sessions balancing
for this mode.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/da5b68cc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/da5b68cc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/da5b68cc

Branch: refs/heads/ignite-gg-11810-1
Commit: da5b68cc89ba6eeff25beb66e3a4d8c2b9871ab3
Parents: 864a95e
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Dec 29 15:46:59 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Dec 29 15:46:59 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/nio/GridNioServer.java | 159 ++++++++++++++++---
 .../communication/tcp/TcpCommunicationSpi.java  |  20 +--
 .../tcp/TcpCommunicationSpiMBean.java           |   5 +-
 ...mmunicationBalancePairedConnectionsTest.java |  28 ++++
 .../IgniteCommunicationBalanceTest.java         |  25 ++-
 ...cMessageRecoveryNoPairedConnectionsTest.java |  47 ------
 ...micMessageRecoveryPairedConnectionsTest.java |  47 ++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   6 +-
 8 files changed, 250 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/da5b68cc/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index bc1f173..a59adba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -227,6 +227,9 @@ public class GridNioServer<T> {
     /** */
     private final IgniteRunnable balancer;
 
+    /** */
+    private final boolean readWriteSelectorsAssign;
+
     /**
      * @param addr Address.
      * @param port Port.
@@ -250,7 +253,7 @@ public class GridNioServer<T> {
      * @param writerFactory Writer factory.
      * @param skipRecoveryPred Skip recovery predicate.
      * @param msgQueueLsnr Message queue size listener.
-     * @param balancing NIO sessions balancing flag.
+     * @param readWriteSelectorsAssign If {@code true} then in/out connections are assigned
to even/odd workers.
      * @param filters Filters for this server.
      * @throws IgniteCheckedException If failed.
      */
@@ -275,7 +278,7 @@ public class GridNioServer<T> {
         GridNioMessageWriterFactory writerFactory,
         IgnitePredicate<Message> skipRecoveryPred,
         IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr,
-        boolean balancing,
+        boolean readWriteSelectorsAssign,
         GridNioFilter... filters
     ) throws IgniteCheckedException {
         if (port != -1)
@@ -300,6 +303,7 @@ public class GridNioServer<T> {
         this.sndQueueLimit = sndQueueLimit;
         this.msgQueueLsnr = msgQueueLsnr;
         this.selectorSpins = selectorSpins;
+        this.readWriteSelectorsAssign = readWriteSelectorsAssign;
 
         filterChain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
 
@@ -359,10 +363,16 @@ public class GridNioServer<T> {
 
         IgniteRunnable balancer0 = null;
 
-        if (balancing && balancePeriod > 0) {
+        if (balancePeriod > 0) {
             boolean rndBalance = IgniteSystemProperties.getBoolean(IGNITE_IO_BALANCE_RANDOM_BALANCE,
false);
 
-            balancer0 = rndBalance ? new RandomBalancer() : new SizeBasedBalancer(balancePeriod);
+            if (rndBalance)
+                balancer0 = new RandomBalancer();
+            else {
+                balancer0 = readWriteSelectorsAssign ?
+                    new ReadWriteSizeBasedBalancer(balancePeriod) :
+                    new SizeBasedBalancer(balancePeriod);
+            }
         }
 
         this.balancer = balancer0;
@@ -823,21 +833,31 @@ public class GridNioServer<T> {
         int balanceIdx;
 
         if (workers > 1) {
-            if (req.accepted()) {
-                balanceIdx = readBalanceIdx;
+            if (readWriteSelectorsAssign) {
+                if (req.accepted()) {
+                    balanceIdx = readBalanceIdx;
 
-                readBalanceIdx += 2;
+                    readBalanceIdx += 2;
 
-                if (readBalanceIdx >= workers)
-                    readBalanceIdx = 0;
+                    if (readBalanceIdx >= workers)
+                        readBalanceIdx = 0;
+                }
+                else {
+                    balanceIdx = writeBalanceIdx;
+
+                    writeBalanceIdx += 2;
+
+                    if (writeBalanceIdx >= workers)
+                        writeBalanceIdx = 1;
+                }
             }
             else {
-                balanceIdx = writeBalanceIdx;
+                balanceIdx = readBalanceIdx;
 
-                writeBalanceIdx += 2;
+                readBalanceIdx++;
 
-                if (writeBalanceIdx >= workers)
-                    writeBalanceIdx = 1;
+                if (readBalanceIdx >= workers)
+                    readBalanceIdx = 0;
             }
         }
         else
@@ -3124,8 +3144,8 @@ public class GridNioServer<T> {
         /** */
         private long selectorSpins;
 
-        /** NIO sessions balancing flag. */
-        private boolean balancing;
+        /** */
+        private boolean readWriteSelectorsAssign;
 
         /**
          * Finishes building the instance.
@@ -3155,7 +3175,7 @@ public class GridNioServer<T> {
                 writerFactory,
                 skipRecoveryPred,
                 msgQueueLsnr,
-                balancing,
+                readWriteSelectorsAssign,
                 filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS
             );
 
@@ -3169,11 +3189,11 @@ public class GridNioServer<T> {
         }
 
         /**
-         * @param balancing NIO sessions balancing flag.
+         * @param readWriteSelectorsAssign {@code True} to assign in/out connections even/odd
workers.
          * @return This for chaining.
          */
-        public Builder<T> balancing(boolean balancing) {
-            this.balancing = balancing;
+        public Builder<T> readWriteSelectorsAssign(boolean readWriteSelectorsAssign)
{
+            this.readWriteSelectorsAssign = readWriteSelectorsAssign;
 
             return this;
         }
@@ -3415,7 +3435,7 @@ public class GridNioServer<T> {
     /**
      *
      */
-    private class SizeBasedBalancer implements IgniteRunnable {
+    private class ReadWriteSizeBasedBalancer implements IgniteRunnable {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -3428,7 +3448,7 @@ public class GridNioServer<T> {
         /**
          * @param balancePeriod Period.
          */
-        SizeBasedBalancer(long balancePeriod) {
+        ReadWriteSizeBasedBalancer(long balancePeriod) {
             this.balancePeriod = balancePeriod;
         }
 
@@ -3559,6 +3579,100 @@ public class GridNioServer<T> {
     }
 
     /**
+     *
+     */
+    private class SizeBasedBalancer implements IgniteRunnable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private long lastBalance;
+
+        /** */
+        private final long balancePeriod;
+
+        /**
+         * @param balancePeriod Period.
+         */
+        SizeBasedBalancer(long balancePeriod) {
+            this.balancePeriod = balancePeriod;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            long now = U.currentTimeMillis();
+
+            if (lastBalance + balancePeriod < now) {
+                lastBalance = now;
+
+                long maxBytes0 = -1, minBytes0 = -1;
+                int maxBytesIdx = -1, minBytesIdx = -1;
+
+                for (int i = 0; i < clientWorkers.size(); i++) {
+                    GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i);
+
+                    int sesCnt = worker.workerSessions.size();
+
+                    long bytes0 = worker.bytesRcvd0 + worker.bytesSent0;
+
+                    if ((maxBytes0 == -1 || bytes0 > maxBytes0) && bytes0 >
0 && sesCnt > 1) {
+                        maxBytes0 = bytes0;
+                        maxBytesIdx = i;
+                    }
+
+                    if (minBytes0 == -1 || bytes0 < minBytes0) {
+                        minBytes0 = bytes0;
+                        minBytesIdx = i;
+                    }
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Balancing data [min0=" + minBytes0 + ", minIdx=" + minBytesIdx
+
+                        ", max0=" + maxBytes0 + ", maxIdx=" + maxBytesIdx + ']');
+
+                if (maxBytes0 != -1 && minBytes0 != -1) {
+                    GridSelectorNioSessionImpl ses = null;
+
+                    long bytesDiff = maxBytes0 - minBytes0;
+                    long delta = bytesDiff;
+                    double threshold = bytesDiff * 0.9;
+
+                    GridConcurrentHashSet<GridSelectorNioSessionImpl> sessions =
+                        clientWorkers.get(maxBytesIdx).workerSessions;
+
+                    for (GridSelectorNioSessionImpl ses0 : sessions) {
+                        long bytesSent0 = ses0.bytesSent0();
+
+                        if (bytesSent0 < threshold &&
+                            (ses == null || delta > U.safeAbs(bytesSent0 - bytesDiff /
2))) {
+                            ses = ses0;
+                            delta = U.safeAbs(bytesSent0 - bytesDiff / 2);
+                        }
+                    }
+
+                    if (ses != null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Will move session to less loaded worker [ses=" + ses
+
+                                ", from=" + maxBytesIdx + ", to=" + minBytesIdx + ']');
+
+                        moveSession(ses, maxBytesIdx, minBytesIdx);
+                    }
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Unable to find session to move.");
+                    }
+                }
+
+                for (int i = 0; i < clientWorkers.size(); i++) {
+                    GridNioServer.AbstractNioClientWorker worker = clientWorkers.get(i);
+
+                    worker.reset0();
+                }
+            }
+        }
+    }
+
+    /**
      * For tests only.
      */
     @SuppressWarnings("unchecked")
@@ -3625,6 +3739,9 @@ public class GridNioServer<T> {
      *
      */
     interface SessionChangeRequest {
+        /**
+         * @return Session.
+         */
         GridNioSession session();
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/da5b68cc/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index c35b5ef..ae0e6f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -293,7 +293,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /**
      * Default count of selectors for TCP server equals to
-     * {@code "Math.min(8, Runtime.getRuntime().availableProcessors())"}.
+     * {@code "Math.max(4, Runtime.getRuntime().availableProcessors() / 2)"}.
      */
     public static final int DFLT_SELECTORS_CNT = Math.max(4, Runtime.getRuntime().availableProcessors()
/ 2);
 
@@ -979,7 +979,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     private IpcSharedMemoryServerEndpoint shmemSrv;
 
     /** */
-    private boolean usePairedConnections = true;
+    private boolean usePairedConnections;
 
     /** */
     private int connectionsPerNode = DFLT_CONN_PER_NODE;
@@ -1193,10 +1193,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * Set this to {@code false} if each connection of {@link #getConnectionsPerNode()}
      * should be used for outgoing and incoming messages. In this case total number
      * of connections between local and each remote node is {@link #getConnectionsPerNode()}.
-     * In this case load NIO selectors load
-     * balancing of {@link GridNioServer} will be disabled.
      * <p>
-     * Default is {@code true}.
+     * Default is {@code false}.
      *
      * @param usePairedConnections {@code true} to use paired connections and {@code false}
otherwise.
      * @see #getConnectionsPerNode()
@@ -2057,16 +2055,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .writerFactory(writerFactory)
                         .skipRecoveryPredicate(skipRecoveryPred)
                         .messageQueueSizeListener(queueSizeMonitor)
-                        .balancing(usePairedConnections) // Current balancing logic assumes
separate in/out connections.
+                        .readWriteSelectorsAssign(usePairedConnections)
                         .build();
 
                 boundTcpPort = port;
 
                 // Ack Port the TCP server was bound to.
-                if (log.isInfoEnabled())
+                if (log.isInfoEnabled()) {
                     log.info("Successfully bound communication NIO server to TCP port " +
-                        "[port=" + boundTcpPort + ", locHost=" + locHost + ", selectorsCnt="
+ selectorsCnt +
-                        ", selectorSpins=" + srvr.selectorSpins() + ']');
+                        "[port=" + boundTcpPort +
+                        ", locHost=" + locHost +
+                        ", selectorsCnt=" + selectorsCnt +
+                        ", selectorSpins=" + srvr.selectorSpins() +
+                        ", pairedConn=" + usePairedConnections + ']');
+                }
 
                 srvr.idleTimeout(idleConnTimeout);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/da5b68cc/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index c7a1a53..c56e18c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -51,10 +51,9 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean
{
      * is {@link #getConnectionsPerNode()} * 2.
      * <p>
      * Returns {@code false} if each connection of {@link #getConnectionsPerNode()}
-     * should be used for outgoing and incoming messages. In this case load NIO selectors
load
-     * balancing of {@link GridNioServer} will be disabled.
+     * should be used for outgoing and incoming messages.
      * <p>
-     * Default is {@code true}.
+     * Default is {@code false}.
      *
      * @return {@code true} to use paired connections and {@code false} otherwise.
      * @see #getConnectionsPerNode()

http://git-wip-us.apache.org/repos/asf/ignite/blob/da5b68cc/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalancePairedConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalancePairedConnectionsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalancePairedConnectionsTest.java
new file mode 100644
index 0000000..4544030
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalancePairedConnectionsTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+/**
+ *
+ */
+public class IgniteCommunicationBalancePairedConnectionsTest extends IgniteCommunicationBalanceTest
{
+    /** {@inheritDoc} */
+    @Override protected boolean usePairedConnections() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/da5b68cc/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
index e142aef..4271417 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
@@ -63,6 +63,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest
{
 
         commSpi.setSharedMemoryPort(-1);
         commSpi.setConnectionsPerNode(connectionsPerNode());
+        commSpi.setUsePairedConnections(usePairedConnections());
 
         if (selectors > 0)
             commSpi.setSelectorsCount(selectors);
@@ -75,6 +76,13 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest
{
     }
 
     /**
+     * @return Value for {@link TcpCommunicationSpi#setUsePairedConnections(boolean)}.
+     */
+    protected boolean usePairedConnections() {
+        return false;
+    }
+
+    /**
      * @return Connections per node.
      */
     protected int connectionsPerNode() {
@@ -97,7 +105,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest
{
         try {
             selectors = 4;
 
-            final int SRVS = 4;
+            final int SRVS = 6;
 
             startGridsMultiThreaded(SRVS);
 
@@ -105,7 +113,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest
{
 
             final Ignite client = startGrid(SRVS);
 
-            for (int i = 0; i < 4; i++) {
+            for (int i = 0; i < SRVS; i++) {
                 ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id());
 
                 client.compute(client.cluster().forNode(node)).call(new DummyCallable(null));
@@ -151,7 +159,10 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest
{
                             }
                         }
 
-                        return srv.readerMoveCount() > readMoveCnt && srv.writerMoveCount()
> writeMoveCnt;
+                        if (usePairedConnections())
+                            return srv.readerMoveCount() > readMoveCnt && srv.writerMoveCount()
> writeMoveCnt;
+                        else
+                            return srv.readerMoveCount() > readMoveCnt || srv.writerMoveCount()
> writeMoveCnt;
                     }
                 }, 30_000);
 
@@ -165,8 +176,12 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest
{
                     ", rc2=" + readMoveCnt2 +
                     ", wc2=" + writeMoveCnt2 + ']');
 
-                assertTrue(readMoveCnt2 > readMoveCnt1);
-                assertTrue(writeMoveCnt2 > writeMoveCnt1);
+                if (usePairedConnections()) {
+                    assertTrue(readMoveCnt2 > readMoveCnt1);
+                    assertTrue(writeMoveCnt2 > writeMoveCnt1);
+                }
+                else
+                    assertTrue(readMoveCnt2 > readMoveCnt1 || writeMoveCnt2 > writeMoveCnt1);
 
                 readMoveCnt1 = readMoveCnt2;
                 writeMoveCnt1 = writeMoveCnt2;

http://git-wip-us.apache.org/repos/asf/ignite/blob/da5b68cc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
deleted file mode 100644
index 71772ef..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-
-/**
- *
- */
-public class IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest extends IgniteCacheAtomicMessageRecoveryTest
{
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpCommunicationSpi commSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
-
-        assertTrue(commSpi.isUsePairedConnections());
-
-        commSpi.setUsePairedConnections(false);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected CacheAtomicityMode atomicityMode() {
-        return ATOMIC;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/da5b68cc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryPairedConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryPairedConnectionsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryPairedConnectionsTest.java
new file mode 100644
index 0000000..dffb966
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecoveryPairedConnectionsTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicMessageRecoveryPairedConnectionsTest extends IgniteCacheAtomicMessageRecoveryTest
{
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpCommunicationSpi commSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
+
+        assertFalse(commSpi.isUsePairedConnections());
+
+        commSpi.setUsePairedConnections(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/da5b68cc/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 1e73e79..092d95e 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreTest;
 import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSelfTest;
 import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest;
 import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceMultipleConnectionsTest;
+import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalancePairedConnectionsTest;
 import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceTest;
 import org.apache.ignite.internal.managers.communication.IgniteIoTestMessagesTest;
 import org.apache.ignite.internal.managers.communication.IgniteVariousConnectionNumberTest;
@@ -134,7 +135,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicNearUp
 import org.apache.ignite.internal.processors.cache.distributed.CacheTxNearUpdateTopologyChangeTest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheEntrySetIterationPreloadingSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecovery10ConnectionsTest;
-import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryPairedConnectionsTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheConnectionRecovery10ConnectionsTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheConnectionRecoveryTest;
@@ -301,7 +302,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheEntrySetIterationPreloadingSelfTest.class);
         suite.addTestSuite(GridCacheMixedPartitionExchangeSelfTest.class);
         suite.addTestSuite(IgniteCacheAtomicMessageRecoveryTest.class);
-        suite.addTestSuite(IgniteCacheAtomicMessageRecoveryNoPairedConnectionsTest.class);
+        suite.addTestSuite(IgniteCacheAtomicMessageRecoveryPairedConnectionsTest.class);
         suite.addTestSuite(IgniteCacheAtomicMessageRecovery10ConnectionsTest.class);
         suite.addTestSuite(IgniteCacheTxMessageRecoveryTest.class);
         suite.addTestSuite(IgniteCacheMessageWriteTimeoutTest.class);
@@ -339,6 +340,7 @@ public class IgniteCacheTestSuite extends TestSuite {
 
         suite.addTestSuite(IgniteVariousConnectionNumberTest.class);
         suite.addTestSuite(IgniteCommunicationBalanceTest.class);
+        suite.addTestSuite(IgniteCommunicationBalancePairedConnectionsTest.class);
         suite.addTestSuite(IgniteCommunicationBalanceMultipleConnectionsTest.class);
         suite.addTestSuite(IgniteIoTestMessagesTest.class);
 


Mime
View raw message