ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/2] ignite git commit: Added test.
Date Fri, 21 Apr 2017 14:30:45 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2.0 ad7e4a092 -> 6e2c51d34


Added test.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a900dc68
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a900dc68
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a900dc68

Branch: refs/heads/ignite-2.0
Commit: a900dc685a24b21ac155796ecb027c60608b88f5
Parents: ca8ad03
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Apr 21 17:30:23 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Apr 21 17:30:23 2017 +0300

----------------------------------------------------------------------
 .../internal/TestRecordingCommunicationSpi.java |  29 ++-
 .../cache/IgniteOnePhaseCommitInvokeTest.java   |  10 +-
 .../CacheLateAffinityAssignmentTest.java        |  31 ++-
 ...heClientMultiNodeUpdateTopologyLockTest.java | 193 +++++++++++++++++++
 .../IgniteCacheReadFromBackupTest.java          |  15 +-
 .../IgniteTxCachePrimarySyncTest.java           |  17 +-
 .../dht/IgniteCacheTxRecoveryRollbackTest.java  |  17 +-
 .../atomic/IgniteCacheAtomicProtocolTest.java   | 183 ++----------------
 .../ignite/testframework/GridTestNode.java      |   7 +
 .../junits/common/GridCommonAbstractTest.java   |  76 +++++++-
 10 files changed, 353 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index aa0cc09..98d2553 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -59,7 +58,7 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
     private Map<Class<?>, Set<String>> blockCls = new HashMap<>();
 
     /** */
-    private IgnitePredicate<GridIoMessage> blockP;
+    private IgniteBiPredicate<ClusterNode, Message> blockP;
 
     /**
      * @param node Node.
@@ -75,16 +74,18 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
         if (msg instanceof GridIoMessage) {
             GridIoMessage ioMsg = (GridIoMessage)msg;
 
-            Object msg0 = ioMsg.message();
+            Message msg0 = ioMsg.message();
 
             synchronized (this) {
-                if ((recordClasses != null && recordClasses.contains(msg0.getClass())) ||
-                    (recordP != null && recordP.apply(node, msg)))
+                boolean record = (recordClasses != null && recordClasses.contains(msg0.getClass())) ||
+                    (recordP != null && recordP.apply(node, msg0));
+
+                if (record)
                     recordedMsgs.add(msg0);
 
                 boolean block = false;
 
-                if (blockP != null && blockP.apply(ioMsg))
+                if (blockP != null && blockP.apply(node, msg0))
                     block = true;
                 else {
                     Set<String> blockNodes = blockCls.get(msg0.getClass());
@@ -106,6 +107,8 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
 
                     return;
                 }
+                else if (record)
+                    notifyAll();
             }
         }
 
@@ -166,7 +169,7 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
      * @param nodeName Node name.
      * @throws InterruptedException If interrupted.
      */
-    public void waitForMessage(Class<?> cls, String nodeName) throws InterruptedException {
+    public void waitForBlocked(Class<?> cls, String nodeName) throws InterruptedException {
         synchronized (this) {
             while (!hasMessage(cls, nodeName))
                 wait();
@@ -174,6 +177,16 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
     }
 
     /**
+     * @throws InterruptedException If interrupted.
+     */
+    public void waitForRecorded() throws InterruptedException {
+        synchronized (this) {
+            while (recordedMsgs.isEmpty())
+                wait();
+        }
+    }
+
+    /**
      * @param cls Message class.
      * @param nodeName Node name.
      * @return {@code True} if has blocked message.
@@ -191,7 +204,7 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
     /**
      * @param blockP Message block predicate.
      */
-    public void blockMessages(IgnitePredicate<GridIoMessage> blockP) {
+    public void blockMessages(IgniteBiPredicate<ClusterNode, Message> blockP) {
         synchronized (this) {
             this.blockP = blockP;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java
index 601c067..a5cb3f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java
@@ -21,17 +21,17 @@ import java.util.concurrent.Callable;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -127,10 +127,8 @@ public class IgniteOnePhaseCommitInvokeTest extends GridCommonAbstractTest {
 
         final Ignite clientNode = startGrid(1);
 
-        TestRecordingCommunicationSpi.spi(srv0).blockMessages(new IgnitePredicate<GridIoMessage>() {
-            @Override public boolean apply(GridIoMessage msg0) {
-                Message msg = msg0.message();
-
+        TestRecordingCommunicationSpi.spi(srv0).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
                 return msg instanceof GridDhtPartitionSupplyMessage &&
                     ((GridDhtPartitionSupplyMessage)msg).cacheId() == CU.cacheId(CACHE_NAME);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index a74117c..c68c8d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -1082,12 +1083,10 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
             TestRecordingCommunicationSpi spi =
                     (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
 
-            spi.blockMessages(new IgnitePredicate<GridIoMessage>() {
-                @Override public boolean apply(GridIoMessage msg) {
-                    Message msg0 = msg.message();
-
-                    return msg0.getClass().equals(GridDhtPartitionsSingleMessage.class) ||
-                        msg0.getClass().equals(GridDhtPartitionsFullMessage.class);
+            spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+                @Override public boolean apply(ClusterNode node, Message msg) {
+                    return msg.getClass().equals(GridDhtPartitionsSingleMessage.class) ||
+                        msg.getClass().equals(GridDhtPartitionsFullMessage.class);
                 }
             });
         }
@@ -1710,14 +1709,12 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
             @Override public TestRecordingCommunicationSpi apply(String s) {
                 TestRecordingCommunicationSpi spi = new TestRecordingCommunicationSpi();
 
-                spi.blockMessages(new IgnitePredicate<GridIoMessage>() {
-                    @Override public boolean apply(GridIoMessage msg) {
-                        Message msg0 = msg.message();
-
-                        if (msg0 instanceof GridDhtForceKeysRequest || msg0 instanceof GridDhtForceKeysResponse) {
+                spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+                    @Override public boolean apply(ClusterNode node, Message msg) {
+                        if (msg instanceof GridDhtForceKeysRequest || msg instanceof GridDhtForceKeysResponse) {
                             fail.set(true);
 
-                            U.dumpStack(log, "Unexpected message: " + msg0);
+                            U.dumpStack(log, "Unexpected message: " + msg);
                         }
 
                         return false;
@@ -2011,14 +2008,12 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
      * @param cacheName Cache name.
      */
     private void blockSupplySend(TestRecordingCommunicationSpi spi, final String cacheName) {
-        spi.blockMessages(new IgnitePredicate<GridIoMessage>() {
-            @Override public boolean apply(GridIoMessage ioMsg) {
-                if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessage.class))
+        spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                if (!msg.getClass().equals(GridDhtPartitionSupplyMessage.class))
                     return false;
 
-                GridDhtPartitionSupplyMessage msg = (GridDhtPartitionSupplyMessage)ioMsg.message();
-
-                return msg.cacheId() == CU.cacheId(cacheName);
+                return ((GridDhtPartitionSupplyMessage)msg).cacheId() == CU.cacheId(cacheName);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientMultiNodeUpdateTopologyLockTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientMultiNodeUpdateTopologyLockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientMultiNodeUpdateTopologyLockTest.java
new file mode 100644
index 0000000..4adf5f4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientMultiNodeUpdateTopologyLockTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class IgniteCacheClientMultiNodeUpdateTopologyLockTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String TEST_CACHE = "testCache";
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setConsistentId(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+        cfg.setCommunicationSpi(commSpi);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticTx() throws Exception {
+        startGrids(3);
+
+        client = true;
+
+        Ignite clientNode = startGrid(3);
+
+        client = false;
+
+        IgniteCache<Integer, Integer> cache = clientNode.createCache(cacheConfiguration(0, FULL_SYNC));
+
+        awaitPartitionMapExchange();
+
+        Integer key1 = movingKeysAfterJoin(ignite(1), TEST_CACHE, 1).get(0);
+        Integer key2 = movingKeysAfterJoin(ignite(2), TEST_CACHE, 1).get(0);
+
+        log.info("Start tx [key1=" + key1 + ", key2=" + key2 + ']');
+
+        IgniteInternalFuture<?> startFut;
+
+        TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(ignite(2));
+
+        TestRecordingCommunicationSpi clientSpi = TestRecordingCommunicationSpi.spi(clientNode);
+
+        final UUID node0Id = ignite(0).cluster().localNode().id();
+        final UUID node2Id = ignite(2).cluster().localNode().id();
+
+        spi2.record(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                if (!node0Id.equals(node.id()))
+                    return false;
+
+                return (msg instanceof GridDhtPartitionsSingleMessage) &&
+                    ((GridDhtPartitionsSingleMessage)msg).exchangeId() != null;
+            }
+        });
+
+        clientSpi.record(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                if (!node2Id.equals(node.id()))
+                    return false;
+
+                if (msg instanceof GridNearTxFinishRequest) {
+                    log.info("Delay message [msg=" + msg + ']');
+
+                    try {
+                        Thread.sleep(5000);
+                    }
+                    catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+
+                    log.info("Send delayed message [msg=" + msg + ']');
+                }
+
+                return false;
+            }
+        });
+
+        try (Transaction tx = clientNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            cache.put(key1, 1);
+
+            startFut = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    startGrid(4);
+
+                    return null;
+                }
+            }, "start-thread");
+
+            spi2.waitForRecorded();
+
+            U.sleep(5);
+
+            cache.put(key2, 2);
+
+            log.info("Commit tx");
+
+            tx.commit();
+        }
+
+        assertEquals((Integer)1, cache.get(key1));
+        assertEquals((Integer)2, cache.get(key2));
+
+        startFut.get();
+
+        assertEquals((Integer)1, cache.get(key1));
+        assertEquals((Integer)2, cache.get(key2));
+
+        awaitPartitionMapExchange();
+
+        assertEquals((Integer)1, cache.get(key1));
+        assertEquals((Integer)2, cache.get(key2));
+    }
+
+    /**
+     * @param backups Number of backups.
+     * @param writeSync Cache write synchronization mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(int backups,
+        CacheWriteSynchronizationMode writeSync) {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName(TEST_CACHE);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(writeSync);
+        ccfg.setBackups(backups);
+        ccfg.setRebalanceMode(ASYNC);
+
+        return ccfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
index 29c2af6..42de613 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
@@ -33,16 +33,17 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -195,14 +196,12 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest {
                     TestRecordingCommunicationSpi spi =
                         (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
 
-                    spi.blockMessages(new IgnitePredicate<GridIoMessage>() {
-                        @Override public boolean apply(GridIoMessage ioMsg) {
-                            if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessage.class))
+                    spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+                        @Override public boolean apply(ClusterNode node, Message msg) {
+                            if (!msg.getClass().equals(GridDhtPartitionSupplyMessage.class))
                                 return false;
 
-                            GridDhtPartitionSupplyMessage msg = (GridDhtPartitionSupplyMessage)ioMsg.message();
-
-                            return msg.cacheId() == CU.cacheId(ccfg.getName());
+                            return ((GridDhtPartitionSupplyMessage)msg).cacheId() == CU.cacheId(ccfg.getName());
                         }
                     });
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
index 8a1d4a7..91e6cf6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
@@ -29,7 +29,6 @@ import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.Affinity;
@@ -41,7 +40,6 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
@@ -50,10 +48,11 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPr
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
-import org.apache.ignite.internal.util.lang.IgnitePredicateX;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -213,9 +212,9 @@ public class IgniteTxCachePrimarySyncTest extends GridCommonAbstractTest {
 
         commSpi0.record(GridDhtTxFinishRequest.class);
 
-        commSpi0.blockMessages(new IgnitePredicateX<GridIoMessage>() {
-            @Override public boolean applyx(GridIoMessage e) throws IgniteCheckedException {
-                return e.message() instanceof GridDhtTxFinishRequest;
+        commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                return msg instanceof GridDhtTxFinishRequest;
             }
         });
 
@@ -466,9 +465,9 @@ public class IgniteTxCachePrimarySyncTest extends GridCommonAbstractTest {
 
         commSpi0.record(GridDhtTxFinishRequest.class);
 
-        commSpi0.blockMessages(new IgnitePredicateX<GridIoMessage>() {
-            @Override public boolean applyx(GridIoMessage e) throws IgniteCheckedException {
-                return e.message() instanceof GridDhtTxFinishRequest;
+        commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                return msg instanceof GridDhtTxFinishRequest;
             }
         });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
index cfe9029..7e7d341 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
@@ -32,13 +32,13 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -46,7 +46,8 @@ import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -189,7 +190,7 @@ public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest {
 
         assertFalse(fut.isDone());
 
-        testSpi(client2).waitForMessage(GridNearTxFinishRequest.class, srv0.name());
+        testSpi(client2).waitForBlocked(GridNearTxFinishRequest.class, srv0.name());
 
         stopGrid(client2.name());
 
@@ -264,9 +265,9 @@ public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest {
 
         testSpi(client2).blockMessages(GridNearTxFinishRequest.class, srv0.name());
 
-        testSpi(srv0).blockMessages(new IgnitePredicate<GridIoMessage>() {
-            @Override public boolean apply(GridIoMessage msg) {
-                return msg.message() instanceof GridDhtTxFinishRequest;
+        testSpi(srv0).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                return msg instanceof GridDhtTxFinishRequest;
             }
         });
 
@@ -292,7 +293,7 @@ public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest {
 
         assertFalse(fut.isDone());
 
-        testSpi(client2).waitForMessage(GridNearTxFinishRequest.class, srv0.name());
+        testSpi(client2).waitForBlocked(GridNearTxFinishRequest.class, srv0.name());
 
         stopGrid(client2.name());
         stopGrid(srv0.name());
@@ -397,7 +398,7 @@ public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest {
 
         assertFalse(fut.isDone());
 
-        testSpi(srv0).waitForMessage(GridNearTxPrepareResponse.class, client.name());
+        testSpi(srv0).waitForBlocked(GridNearTxPrepareResponse.class, client.name());
 
         stopGrid(client.name());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index 5a6b1c8..591858a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -17,14 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
@@ -32,18 +29,11 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cache.affinity.AffinityFunction;
-import org.apache.ignite.cache.affinity.AffinityFunctionContext;
-import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -51,15 +41,14 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
@@ -112,12 +101,10 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
      */
     private void blockRebalance() {
         for (Ignite node : G.allGrids()) {
-            testSpi(node).blockMessages(new IgnitePredicate<GridIoMessage>() {
-                @Override public boolean apply(GridIoMessage msg) {
-                    Object msg0 = msg.message();
-
-                    return (msg0 instanceof GridDhtPartitionSupplyMessage)
-                        && ((GridCacheMessage)msg0).cacheId() == CU.cacheId(TEST_CACHE);
+            testSpi(node).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+                @Override public boolean apply(ClusterNode node, Message msg) {
+                    return (msg instanceof GridDhtPartitionSupplyMessage)
+                        && ((GridCacheMessage)msg).cacheId() == CU.cacheId(TEST_CACHE);
                 }
             });
         }
@@ -368,7 +355,7 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
 
         final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1, FULL_ASYNC));
 
-        List<Integer> keys = getKeysMoved(srv0, TEST_CACHE, putAll ? 3 : 1);
+        List<Integer> keys = movingKeysAfterJoin(srv0, TEST_CACHE, putAll ? 10 : 1);
 
         testSpi(clientNode).blockMessages(GridNearAtomicSingleUpdateRequest.class, srv0.name());
         testSpi(clientNode).blockMessages(GridNearAtomicFullUpdateRequest.class, srv0.name());
@@ -663,9 +650,9 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
 
         IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE);
 
-        testSpi(ignite(0)).blockMessages(new IgnitePredicate<GridIoMessage>() {
-            @Override public boolean apply(GridIoMessage msg) {
-                return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest;
+        testSpi(ignite(0)).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message msg) {
+                return msg instanceof GridDhtAtomicAbstractUpdateRequest;
             }
         });
 
@@ -719,16 +706,16 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
         IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE);
 
         if (fail0) {
-            testSpi(ignite(0)).blockMessages(new IgnitePredicate<GridIoMessage>() {
-                @Override public boolean apply(GridIoMessage msg) {
-                    return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest;
+            testSpi(ignite(0)).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+                @Override public boolean apply(ClusterNode node, Message msg) {
+                    return msg instanceof GridDhtAtomicAbstractUpdateRequest;
                 }
             });
         }
         if (fail1) {
-            testSpi(ignite(2)).blockMessages(new IgnitePredicate<GridIoMessage>() {
-                @Override public boolean apply(GridIoMessage msg) {
-                    return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest;
+            testSpi(ignite(2)).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+                @Override public boolean apply(ClusterNode node, Message msg) {
+                    return msg instanceof GridDhtAtomicAbstractUpdateRequest;
                 }
             });
         }
@@ -825,68 +812,6 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Return list of keys that are primary for given node on given topology,
-     * but will not be primary after add one new node.
-     *
-     * @param ign Ignite.
-     * @param cacheName Cache name.
-     * @param size Number of keys.
-     * @return List of keys.
-     */
-    private List<Integer> getKeysMoved(Ignite ign, String cacheName, int size) {
-        GridCacheContext<Object, Object> cctx = ((IgniteKernal)ign).context().cache().internalCache(cacheName).context();
-
-        ArrayList<ClusterNode> nodes = new ArrayList<>(ign.cluster().nodes());
-
-        AffinityFunction func = cctx.config().getAffinity();
-
-        AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(
-            nodes,
-            null,
-            null,
-            new AffinityTopologyVersion(1, 0),
-            cctx.config().getBackups());
-
-        List<List<ClusterNode>> calcAff = func.assignPartitions(ctx);
-
-        String name = getTestIgniteInstanceName(nodes.size());
-
-        nodes.add(new FakeNode(name));
-
-        ctx = new GridAffinityFunctionContextImpl(
-            nodes,
-            null,
-            null,
-            new AffinityTopologyVersion(1, 0),
-            cctx.config().getBackups());
-
-        List<List<ClusterNode>> calcAff2 = func.assignPartitions(ctx);
-
-        Set<Integer> movedParts = new HashSet<>();
-
-        UUID localId = ign.cluster().localNode().id();
-
-        for (int i = 0; i < calcAff.size(); i++) {
-            if (calcAff.get(i).get(0).id().equals(localId) && !calcAff2.get(i).get(0).id().equals(localId))
-                movedParts.add(i);
-        }
-
-        List<Integer> keys = new ArrayList<>();
-
-        for (int i = 0; i < 10000; i++) {
-            int keyPart = func.partition(ign.affinity(cacheName).affinityKey(i));
-
-            if (movedParts.contains(keyPart))
-                keys.add(i);
-
-            if (keys.size() == size)
-                break;
-        }
-
-        return keys;
-    }
-
-    /**
      *
      */
     public static class SetValueEntryProcessor implements CacheEntryProcessor<Integer, Integer, Object> {
@@ -908,80 +833,4 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
             return null;
         }
     }
-
-    /**
-     *
-     */
-    public static class FakeNode implements ClusterNode {
-        /** */
-        private final String consistendId;
-        /** */
-        private final UUID uuid;
-
-        /** */
-        public FakeNode(String consistendId) {
-            this.consistendId = consistendId;
-            uuid = UUID.randomUUID();
-        }
-
-        /** {@inheritDoc} */
-        @Override public UUID id() {
-            return uuid;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object consistentId() {
-            return consistendId;
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public <T> T attribute(String name) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public ClusterMetrics metrics() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Map<String, Object> attributes() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<String> addresses() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<String> hostNames() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public long order() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteProductVersion version() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isLocal() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isDaemon() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isClient() {
-            return false;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
index d331387..cefb774 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
@@ -103,6 +103,13 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod
         return id;
     }
 
+    /**
+     * @param consistentId Consistent ID.
+     */
+    public void consistentId(Object consistentId) {
+        this.consistentId = consistentId;
+    }
+
     /** {@inheritDoc} */
     @Override public Object consistentId() {
         return consistentId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index c76c83e..e6b30e0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -46,10 +46,10 @@ import org.apache.ignite.IgniteEvents;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteMessaging;
 import org.apache.ignite.Ignition;
-import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
@@ -64,7 +64,9 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheExplicitLockSpan;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -91,6 +93,7 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.GridAbstractTest;
 import org.apache.ignite.transactions.Transaction;
@@ -1117,6 +1120,77 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
     }
 
     /**
+     * Return list of keys that are primary for given node on current topology,
+     * but primary node will change after new node will be added.
+     *
+     * @param ign Ignite.
+     * @param cacheName Cache name.
+     * @param size Number of keys.
+     * @return List of keys.
+     */
+    protected final List<Integer> movingKeysAfterJoin(Ignite ign, String cacheName, int size) {
+        assertEquals("Expected consistentId is set to node name", ign.name(), ign.cluster().localNode().consistentId());
+
+        GridCacheContext<Object, Object> cctx = ((IgniteKernal)ign).context().cache().internalCache(cacheName).context();
+
+        ArrayList<ClusterNode> nodes = new ArrayList<>(ign.cluster().nodes());
+
+        AffinityFunction func = cctx.config().getAffinity();
+
+        AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(
+            nodes,
+            null,
+            null,
+            AffinityTopologyVersion.NONE,
+            cctx.config().getBackups());
+
+        List<List<ClusterNode>> calcAff = func.assignPartitions(ctx);
+
+        GridTestNode fakeNode = new GridTestNode(UUID.randomUUID(), null);
+
+        fakeNode.consistentId(getTestIgniteInstanceName(nodes.size()));
+
+        nodes.add(fakeNode);
+
+        ctx = new GridAffinityFunctionContextImpl(
+            nodes,
+            null,
+            null,
+            AffinityTopologyVersion.NONE,
+            cctx.config().getBackups());
+
+        List<List<ClusterNode>> calcAff2 = func.assignPartitions(ctx);
+
+        Set<Integer> movedParts = new HashSet<>();
+
+        UUID locId = ign.cluster().localNode().id();
+
+        for (int i = 0; i < calcAff.size(); i++) {
+            if (calcAff.get(i).get(0).id().equals(locId) && !calcAff2.get(i).get(0).id().equals(locId))
+                movedParts.add(i);
+        }
+
+        List<Integer> keys = new ArrayList<>();
+
+        Affinity<Integer> aff = ign.affinity(cacheName);
+
+        for (int i = 0; i < 10_000; i++) {
+            int keyPart = aff.partition(i);
+
+            if (movedParts.contains(keyPart)) {
+                keys.add(i);
+
+                if (keys.size() == size)
+                    break;
+            }
+        }
+
+        assertEquals("Failed to find moving keys [movedPats=" + movedParts + ", keys=" + keys + ']', size, keys.size());
+
+        return keys;
+    }
+
+    /**
      * @param cache Cache.
      * @return Collection of keys for which given cache is primary.
      * @throws IgniteCheckedException If failed.


Mime
View raw message