ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5578
Date Fri, 28 Jul 2017 13:28:20 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 3488f9262 -> 8c80ef7b8


ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8c80ef7b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8c80ef7b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8c80ef7b

Branch: refs/heads/ignite-5578
Commit: 8c80ef7b811b00f95f3198e1cc34563eaf9e2a62
Parents: 3488f92
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Jul 28 14:39:50 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Jul 28 16:20:40 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  26 +-
 .../processors/cache/ExchangeContext.java       |  27 ++-
 .../processors/cache/GridCacheIoManager.java    |  39 +--
 .../dht/GridClientPartitionTopology.java        |   2 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   5 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   2 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   2 +-
 .../GridDhtPartitionsExchangeFuture.java        |   4 -
 .../cache/transactions/IgniteTxHandler.java     |  13 +-
 .../CacheLateAffinityAssignmentTest.java        | 243 +++++++++++--------
 10 files changed, 212 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 471f2ef..66924d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -448,20 +448,20 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                             grp.topology().updateTopologyVersion(topFut, discoCache, -1,
false);
 
+                            grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity());
+
+                            grpHolders.put(grp.groupId(), grpHolder);
+
                             GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(grp.groupId());
 
                             if (clientTop != null) {
-                                grp.topology().update(topVer,
+                                grp.topology().update(grpHolder.affinity().lastVersion(),
                                     clientTop.partitionMap(true),
                                     clientTop.updateCounters(false),
                                     Collections.<Integer>emptySet(),
                                     null);
                             }
 
-                            grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity());
-
-                            grpHolders.put(grp.groupId(), grpHolder);
-
                             assert grpHolder.affinity().lastVersion().equals(grp.affinity().lastVersion());
                         }
                     }
@@ -1662,6 +1662,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                             aff.initialize(topVer, assign);
                         }
+
+                        grpHolder.topology(fut).beforeExchange(fut, true);
                     }
                     else {
                         List<GridDhtPartitionsExchangeFuture> exchFuts = cctx.exchange().exchangeFutures();
@@ -1673,14 +1675,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                         final GridDhtPartitionsExchangeFuture prev = exchFuts.get(idx + 1);
 
+                        assert prev.isDone() && prev.topologyVersion().compareTo(topVer)
< 0 : prev;
+
                         if (log.isDebugEnabled()) {
                             log.debug("Need initialize affinity on coordinator [" +
                                 "cacheGrp=" + desc.cacheOrGroupName() +
                                 "prevAff=" + prev.topologyVersion() + ']');
                         }
 
-                        assert prev.topologyVersion().compareTo(topVer) < 0 : prev;
-
                         GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
                             desc.groupId(),
                             prev.topologyVersion(),
@@ -1701,7 +1703,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                                 aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
 
-                                affFut.onDone(fut.topologyVersion());
+                                affFut.onDone(topVer);
                             }
                         });
 
@@ -1714,13 +1716,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                     if (newAff) {
                         GridAffinityAssignmentCache aff = grpHolder.affinity();
 
-                        if (!aff.lastVersionEquals(fut.topologyVersion())) {
-                            List<List<ClusterNode>> assign = aff.calculate(fut.topologyVersion(),
+                        if (!aff.lastVersionEquals(topVer)) {
+                            List<List<ClusterNode>> assign = aff.calculate(topVer,
                                 fut.discoveryEvent(),
                                 fut.discoCache());
 
-                            aff.initialize(fut.topologyVersion(), assign);
+                            aff.initialize(topVer, assign);
                         }
+
+                        grpHolder.topology(fut).beforeExchange(fut, true);
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index 70c896e..1a5fbb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
@@ -32,6 +33,9 @@ import static org.apache.ignite.internal.processors.cache.GridCachePartitionExch
  */
 public class ExchangeContext {
     /** */
+    public static final String IGNITE_EXCHANGE_COMPATIBILITY_MODE = "IGNITE_EXCHANGE_COMPATIBILITY_MODE";
+
+    /** */
     private Set<Integer> requestGrpsAffOnJoin;
 
     /** */
@@ -43,22 +47,35 @@ public class ExchangeContext {
     /** */
     private final ExchangeDiscoveryEvents evts;
 
+    /** */
+    private final boolean compatibilityNode = IgniteSystemProperties.getBoolean(IGNITE_EXCHANGE_COMPATIBILITY_MODE,
false);
+
     /**
      * @param fut Exchange future.
      */
     public ExchangeContext(GridDhtPartitionsExchangeFuture fut) {
-        int protocolVer = exchangeProtocolVersion(
-            fut.discoCache().minimumNodeVersion());
+        int protocolVer = exchangeProtocolVersion(fut.discoCache().minimumNodeVersion());
 
-        fetchAffOnJoin = protocolVer == 1;
+        if (compatibilityNode) {
+            fetchAffOnJoin = true;
 
-        merge = protocolVer > 1 && fut.discoveryEvent().type() != EVT_DISCOVERY_CUSTOM_EVT;
+            merge = false;
+        }
+        else {
+            fetchAffOnJoin = protocolVer == 1;
+
+            merge = protocolVer > 1 && fut.discoveryEvent().type() != EVT_DISCOVERY_CUSTOM_EVT;
+        }
 
         evts = new ExchangeDiscoveryEvents(fut);
     }
 
+    /**
+     * @param node Node.
+     * @return {@code True} if node supports exchange merge protocol.
+     */
     boolean supportsMergeExchanges(ClusterNode node) {
-        return exchangeProtocolVersion(node.version()) > 1;
+        return !compatibilityNode && exchangeProtocolVersion(node.version()) >
1;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 981c6e2..91872e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -1051,27 +1051,34 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter
{
                 throw e;
         }
         finally {
-            // Reset thread local context.
-            cctx.tm().resetContext();
+            onMessageProcessed(msg);
+        }
+    }
 
-            GridCacheMvccManager mvcc = cctx.mvcc();
+    /**
+     * @param msg Message.
+     */
+    public void onMessageProcessed(GridCacheMessage msg) {
+        // Reset thread local context.
+        cctx.tm().resetContext();
 
-            if (mvcc != null)
-                mvcc.contextReset();
+        GridCacheMvccManager mvcc = cctx.mvcc();
 
-            // Unwind eviction notifications.
-            if (msg instanceof IgniteTxStateAware) {
-                IgniteTxState txState = ((IgniteTxStateAware)msg).txState();
+        if (mvcc != null)
+            mvcc.contextReset();
 
-                if (txState != null)
-                    txState.unwindEvicts(cctx);
-            }
-            else if (msg instanceof GridCacheIdMessage) {
-                GridCacheContext ctx = cctx.cacheContext(((GridCacheIdMessage)msg).cacheId());
+        // Unwind eviction notifications.
+        if (msg instanceof IgniteTxStateAware) {
+            IgniteTxState txState = ((IgniteTxStateAware)msg).txState();
 
-                if (ctx != null)
-                    CU.unwindEvicts(ctx);
-            }
+            if (txState != null)
+                txState.unwindEvicts(cctx);
+        }
+        else if (msg instanceof GridCacheIdMessage) {
+            GridCacheContext ctx = cctx.cacheContext(((GridCacheIdMessage)msg).cacheId());
+
+            if (ctx != null)
+                CU.unwindEvicts(ctx);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 975385f..37f4fd8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -793,7 +793,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
             GridDhtPartitionMap cur = node2part.get(parts.nodeId());
 
             if (force) {
-                if (cur != null)
+                if (cur != null && cur.topologyVersion().initialized())
                     parts.updateSequence(cur.updateSequence(), cur.topologyVersion());
             }
             else if (isStaleUpdate(cur, parts)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 531aa51..aa2f0f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1498,7 +1498,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
             GridDhtPartitionMap cur = node2part.get(parts.nodeId());
 
             if (force) {
-                if (cur != null)
+                if (cur != null && cur.topologyVersion().initialized())
                     parts.updateSequence(cur.updateSequence(), cur.topologyVersion());
             }
             else  if (isStaleUpdate(cur, parts)) {
@@ -1600,7 +1600,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
         lock.writeLock().lock();
 
         try {
-            assert assignment.topologyVersion().equals(((GridDhtPartitionsExchangeFuture)topReadyFut).context().events().topologyVersion());
+            assert !(topReadyFut instanceof GridDhtPartitionsExchangeFuture) ||
+                assignment.topologyVersion().equals(((GridDhtPartitionsExchangeFuture)topReadyFut).context().events().topologyVersion());
 
             readyTopVer = lastTopChangeVer = assignment.topologyVersion();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index daedda8..d39afb1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -846,7 +846,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends
GridDhtCach
 
                     top = topology();
 
-                    topology().readLock();
+                    top.readLock();
                 }
 
                 try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 3e43e2b..3f97cae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1902,7 +1902,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion>
fut) {
                             ctx.closures().runLocalWithThreadPolicy(thread, new Runnable()
{
                                 @Override public void run() {
-                                    updateAllAsyncInternal0(node, req, completionCb);
+                                    updateAllAsyncInternal(node, req, completionCb);
                                 }
                             });
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 1cd72e9..d5b7846 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2178,9 +2178,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
                         continue;
 
-                    if (resTopVer.equals(new AffinityTopologyVersion(3, 0)) && "c1".equals(grp.cacheOrGroupName()))
-                        System.out.println();
-
                     grp.topology().beforeExchange(this, true);
                 }
 
@@ -2974,7 +2971,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                                 assert newCrdFut != null;
 
-
                                 newCrdFut.init(GridDhtPartitionsExchangeFuture.this);
 
                                 newCrdFut.listen(new CI1<IgniteInternalFuture>() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 753f8d8..6fc5e4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -25,7 +25,6 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -376,8 +375,11 @@ public class IgniteTxHandler {
 
                 GridDhtTopologyFuture topFut = top.topologyVersionFuture();
 
-                if (!topFut.isDone())
+                if (!topFut.isDone()) {
+                    top.readUnlock();
+
                     return null;
+                }
             }
 
             try {
@@ -549,7 +551,12 @@ public class IgniteTxHandler {
                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion>
fut) {
                             ctx.kernalContext().closure().runLocalWithThreadPolicy(thread,
new Runnable() {
                                 @Override public void run() {
-                                    processNearTxPrepareRequest0(node, req);
+                                    try {
+                                        processNearTxPrepareRequest0(node, req);
+                                    }
+                                    finally {
+                                        ctx.io().onMessageProcessed(req);
+                                    }
                                 }
                             });
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8c80ef7b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 2350ed0..4bb7554 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -100,6 +100,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.ExchangeContext.IGNITE_EXCHANGE_COMPATIBILITY_MODE;
 
 /**
  *
@@ -381,52 +382,59 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest
{
      * @throws Exception If failed.
      */
     public void testAffinitySimpleNoCacheOnCoordinator2() throws Exception {
-        cacheC = new IgniteClosure<String, CacheConfiguration[]>() {
-            @Override public CacheConfiguration[] apply(String igniteInstanceName) {
-                if (igniteInstanceName.equals(getTestIgniteInstanceName(1)) ||
-                    igniteInstanceName.equals(getTestIgniteInstanceName(2)))
-                    return null;
+        System.setProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE, "true");
 
-                return new CacheConfiguration[]{cacheConfiguration()};
-            }
-        };
+        try {
+            cacheC = new IgniteClosure<String, CacheConfiguration[]>() {
+                @Override public CacheConfiguration[] apply(String igniteInstanceName) {
+                    if (igniteInstanceName.equals(getTestIgniteInstanceName(1)) ||
+                        igniteInstanceName.equals(getTestIgniteInstanceName(2)))
+                        return null;
+
+                    return new CacheConfiguration[]{cacheConfiguration()};
+                }
+            };
 
-        cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1),
getTestIgniteInstanceName(2)));
+            cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1),
getTestIgniteInstanceName(2)));
 
-        startServer(0, 1);
-        startServer(1, 2);
-        startServer(2, 3);
-        startServer(3, 4);
+            startServer(0, 1);
+            startServer(1, 2);
+            startServer(2, 3);
+            startServer(3, 4);
 
-        for (int i = 0; i < 4; i++) {
-            TestRecordingCommunicationSpi spi =
-                (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
+            for (int i = 0; i < 4; i++) {
+                TestRecordingCommunicationSpi spi =
+                    (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
 
-            // Prevent exchange finish while node0 or node1 is coordinator.
-            spi.blockMessages(GridDhtPartitionsSingleMessage.class, ignite(0).name());
-            spi.blockMessages(GridDhtPartitionsSingleMessage.class, ignite(1).name());
-        }
+                // Prevent exchange finish while node0 or node1 is coordinator.
+                spi.blockMessages(GridDhtPartitionsSingleMessage.class, ignite(0).name());
+                spi.blockMessages(GridDhtPartitionsSingleMessage.class, ignite(1).name());
+            }
 
-        stopGrid(0);
-        stopGrid(1);
+            stopGrid(0);
+            stopGrid(1);
 
-        calculateAffinity(5);
-        calculateAffinity(6);
+            calculateAffinity(5);
+            calculateAffinity(6);
 
-        checkAffinity(2, topVer(6, 0), true);
+            checkAffinity(2, topVer(6, 0), true);
 
-        assertNull(((IgniteKernal)ignite(2)).context().cache().internalCache(CACHE_NAME1));
-        assertNotNull(((IgniteKernal)ignite(3)).context().cache().internalCache(CACHE_NAME1));
+            assertNull(((IgniteKernal)ignite(2)).context().cache().internalCache(CACHE_NAME1));
+            assertNotNull(((IgniteKernal)ignite(3)).context().cache().internalCache(CACHE_NAME1));
 
-        assertNotNull(ignite(2).cache(CACHE_NAME1));
+            assertNotNull(ignite(2).cache(CACHE_NAME1));
 
-        checkAffinity(2, topVer(6, 0), true);
+            checkAffinity(2, topVer(6, 0), true);
 
-        startServer(4, 7);
+            startServer(4, 7);
 
-        checkAffinity(3, topVer(7, 0), false);
+            checkAffinity(3, topVer(7, 0), false);
 
-        checkAffinity(3, topVer(7, 1), true);
+            checkAffinity(3, topVer(7, 1), true);
+        }
+        finally {
+            System.clearProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE);
+        }
     }
 
     /**
@@ -624,38 +632,45 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest
{
      * @throws Exception If failed.
      */
     public void testNodeLeaveExchangeWaitAffinityMessage() throws Exception {
-        Ignite ignite0 = startServer(0, 1);
+        System.setProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE, "true");
 
-        startServer(1, 2);
+        try {
+            Ignite ignite0 = startServer(0, 1);
 
-        startServer(2, 3);
+            startServer(1, 2);
 
-        checkAffinity(3, topVer(3, 1), true);
+            startServer(2, 3);
 
-        checkOrderCounters(3, topVer(3, 1));
+            checkAffinity(3, topVer(3, 1), true);
 
-        startClient(3, 4);
+            checkOrderCounters(3, topVer(3, 1));
 
-        checkAffinity(4, topVer(4, 0), true);
+            startClient(3, 4);
 
-        TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite0.configuration().getDiscoverySpi();
+            checkAffinity(4, topVer(4, 0), true);
 
-        discoSpi.blockCustomEvent();
+            TestTcpDiscoverySpi discoSpi = (TestTcpDiscoverySpi)ignite0.configuration().getDiscoverySpi();
 
-        stopGrid(1);
+            discoSpi.blockCustomEvent();
 
-        List<IgniteInternalFuture<?>> futs = affFutures(3, topVer(5, 0));
+            stopGrid(1);
 
-        U.sleep(1000);
+            List<IgniteInternalFuture<?>> futs = affFutures(3, topVer(5, 0));
 
-        for (IgniteInternalFuture<?> fut : futs)
-            assertFalse(fut.isDone());
+            U.sleep(1000);
 
-        discoSpi.stopBlock();
+            for (IgniteInternalFuture<?> fut : futs)
+                assertFalse(fut.isDone());
 
-        checkAffinity(3, topVer(5, 0), false);
+            discoSpi.stopBlock();
 
-        checkOrderCounters(3, topVer(5, 0));
+            checkAffinity(3, topVer(5, 0), false);
+
+            checkOrderCounters(3, topVer(5, 0));
+        }
+        finally {
+            System.clearProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE);
+        }
     }
 
     /**
@@ -1044,39 +1059,46 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest
{
      * @throws Exception If failed.
      */
     private void nodeLeftExchangeCoordinatorLeave(int nodes) throws Exception {
-        assert nodes > 2 : nodes;
+        System.setProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE, "true");
 
-        long topVer = 0;
+        try {
+            assert nodes > 2 : nodes;
 
-        for (int i = 0; i < nodes; i++)
-            startServer(i, ++topVer);
+            long topVer = 0;
 
-        Ignite ignite1 = grid(1);
+            for (int i = 0; i < nodes; i++)
+                startServer(i, ++topVer);
 
-        checkAffinity(nodes, topVer(nodes, 1), true);
+            Ignite ignite1 = grid(1);
 
-        TestRecordingCommunicationSpi spi1 =
-            (TestRecordingCommunicationSpi)ignite1.configuration().getCommunicationSpi();
+            checkAffinity(nodes, topVer(nodes, 1), true);
 
-        // Prevent exchange finish while node0 is coordinator.
-        spi1.blockMessages(GridDhtPartitionsSingleMessage.class, ignite(0).name());
+            TestRecordingCommunicationSpi spi1 =
+                (TestRecordingCommunicationSpi)ignite1.configuration().getCommunicationSpi();
 
-        stopNode(2, ++topVer); // New exchange started.
+            // Prevent exchange finish while node0 is coordinator.
+            spi1.blockMessages(GridDhtPartitionsSingleMessage.class, ignite(0).name());
 
-        stopGrid(0); // Stop coordinator while exchange in progress.
+            stopNode(2, ++topVer); // New exchange started.
 
-        Map<String, List<List<ClusterNode>>> aff = checkAffinity(nodes
- 2, topVer(topVer, 0), false);
+            stopGrid(0); // Stop coordinator while exchange in progress.
 
-        topVer++;
+            Map<String, List<List<ClusterNode>>> aff = checkAffinity(nodes
- 2, topVer(topVer, 0), false);
 
-        boolean primaryChanged = calculateAffinity(nodes + 2, false, aff);
+            topVer++;
 
-        checkAffinity(nodes - 2, topVer(topVer, 0), !primaryChanged);
+            boolean primaryChanged = calculateAffinity(nodes + 2, false, aff);
 
-        if (primaryChanged)
-            checkAffinity(nodes - 2, topVer(topVer, 1), true);
+            checkAffinity(nodes - 2, topVer(topVer, 0), !primaryChanged);
 
-        awaitPartitionMapExchange();
+            if (primaryChanged)
+                checkAffinity(nodes - 2, topVer(topVer, 1), true);
+
+            awaitPartitionMapExchange();
+        }
+        finally {
+            System.clearProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE);
+        }
     }
 
     /**
@@ -1184,69 +1206,76 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest
{
      * @throws Exception If failed.
      */
     public void testDelayAssignmentAffinityChanged2() throws Exception {
-        Ignite ignite0 = startServer(0, 1);
+        System.setProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE, "true");
 
-        TestTcpDiscoverySpi discoSpi0 =
-            (TestTcpDiscoverySpi)ignite0.configuration().getDiscoverySpi();
-        TestRecordingCommunicationSpi commSpi0 =
-            (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi();
+        try {
+            Ignite ignite0 = startServer(0, 1);
 
-        startClient(1, 2);
+            TestTcpDiscoverySpi discoSpi0 =
+                (TestTcpDiscoverySpi)ignite0.configuration().getDiscoverySpi();
+            TestRecordingCommunicationSpi commSpi0 =
+                (TestRecordingCommunicationSpi)ignite0.configuration().getCommunicationSpi();
 
-        checkAffinity(2, topVer(2, 0), true);
+            startClient(1, 2);
 
-        startServer(2, 3);
+            checkAffinity(2, topVer(2, 0), true);
 
-        checkAffinity(3, topVer(3, 1), false);
+            startServer(2, 3);
 
-        discoSpi0.blockCustomEvent();
+            checkAffinity(3, topVer(3, 1), false);
 
-        stopNode(2, 4);
+            discoSpi0.blockCustomEvent();
 
-        discoSpi0.waitCustomEvent();
+            stopNode(2, 4);
 
-        blockSupplySend(commSpi0, CACHE_NAME1);
+            discoSpi0.waitCustomEvent();
 
-        final IgniteInternalFuture<?> startedFuture = multithreadedAsync(new Callable<Void>()
{
-            @Override public Void call() throws Exception {
-                startServer(3, 5);
+            blockSupplySend(commSpi0, CACHE_NAME1);
 
-                return null;
-            }
-        }, 1, "server-starter");
+            final IgniteInternalFuture<?> startedFuture = multithreadedAsync(new Callable<Void>()
{
+                @Override public Void call() throws Exception {
+                    startServer(3, 5);
 
-        Thread.sleep(2_000);
+                    return null;
+                }
+            }, 1, "server-starter");
 
-        discoSpi0.stopBlock();
+            Thread.sleep(2_000);
 
-        boolean started = GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                return startedFuture.isDone();
-            }
-        }, 10_000);
+            discoSpi0.stopBlock();
 
-        if (!started)
-            startedFuture.cancel();
+            boolean started = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return startedFuture.isDone();
+                }
+            }, 10_000);
 
-        assertTrue(started);
+            if (!started)
+                startedFuture.cancel();
 
-        checkAffinity(3, topVer(5, 0), false);
+            assertTrue(started);
 
-        checkNoExchange(3, topVer(5, 1));
+            checkAffinity(3, topVer(5, 0), false);
 
-        commSpi0.stopBlock();
+            checkNoExchange(3, topVer(5, 1));
 
-        checkAffinity(3, topVer(5, 1), true);
+            commSpi0.stopBlock();
 
-        long nodeJoinTopVer = grid(3).context().discovery().localJoinEvent().topologyVersion();
+            checkAffinity(3, topVer(5, 1), true);
 
-        assertEquals(5, nodeJoinTopVer);
+            long nodeJoinTopVer = grid(3).context().discovery().localJoinEvent().topologyVersion();
 
-        List<GridDhtPartitionsExchangeFuture> exFutures = grid(3).context().cache().context().exchange().exchangeFutures();
+            assertEquals(5, nodeJoinTopVer);
 
-        for (GridDhtPartitionsExchangeFuture f : exFutures) {
-            //Shouldn't contains staled futures.
-            assertTrue(f.initialVersion().topologyVersion() >= nodeJoinTopVer);
+            List<GridDhtPartitionsExchangeFuture> exFutures = grid(3).context().cache().context().exchange().exchangeFutures();
+
+            for (GridDhtPartitionsExchangeFuture f : exFutures) {
+                //Shouldn't contains staled futures.
+                assertTrue(f.initialVersion().topologyVersion() >= nodeJoinTopVer);
+            }
+        }
+        finally {
+            System.clearProperty(IGNITE_EXCHANGE_COMPATIBILITY_MODE);
         }
     }
 


Mime
View raw message