ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5578
Date Fri, 04 Aug 2017 12:27:18 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 25520bf97 -> 745395785


ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/74539578
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/74539578
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/74539578

Branch: refs/heads/ignite-5578
Commit: 7453957857b0089b39798e38df841fb4a25ca4b1
Parents: 25520bf
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Aug 4 12:22:30 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Aug 4 13:37:28 2017 +0300

----------------------------------------------------------------------
 .../affinity/GridAffinityAssignmentCache.java   |  7 ++
 .../distributed/dht/GridDhtTopologyFuture.java  |  2 +
 .../GridDhtPartitionsExchangeFuture.java        |  6 +-
 .../distributed/CacheExchangeMergeTest.java     | 86 +++++++++++++++++---
 4 files changed, 88 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/74539578/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 69d6a40..f921251 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.affinity;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -635,6 +636,12 @@ public class GridAffinityAssignmentCache {
         }
     }
 
+    /**
+     * @return All initialized versions.
+     */
+    public Collection<AffinityTopologyVersion> cachedVersions() {
+        return affCache.keySet();
+    }
 
     /**
      * Affinity ready future. Will remove itself from ready futures map.

http://git-wip-us.apache.org/repos/asf/ignite/blob/74539578/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
index cc12960..0bcc4a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java
@@ -62,6 +62,8 @@ public interface GridDhtTopologyFuture extends IgniteInternalFuture<AffinityTopo
      * is completed before {@link GridFutureAdapter#onDone(Object, Throwable)} is called
on
      * {@link GridDhtPartitionsExchangeFuture}, it is guaranteed that this method will return
{@code true}
      * if affinity ready future is finished.
+     * <p>
+     * Also this method returns {@code false} for merged exchange futures.
      *
      * @return {@code True} if exchange is finished and result topology version can be used.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/74539578/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 67e41b3..30571ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -359,7 +359,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
     /** {@inheritDoc} */
     @Override public AffinityTopologyVersion topologyVersion() {
-        assert exchangeDone();
+        /*
+        Should not be called before exchange is finished since result version can change
in
+        case of merged exchanges.
+         */
+        assert exchangeDone() : "Should not be called before exchange is finished";
 
         return exchCtx.events().topologyVersion();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/74539578/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 3149385..fcb5276 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.TestDelayingCommunicationSpi;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
@@ -80,6 +81,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 
 /**
  *
@@ -199,7 +201,6 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
     }
 
     // TODO IGNITE-5578 joined merged node failed (client/server).
-    // TODO IGNITE-5578 check exchanges/affinity consistency.
 
     /**
      * @throws Exception If failed.
@@ -486,6 +487,10 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
         fut2.get();
 
         checkCaches();
+
+        checkExchanges(srv0, 1, 3);
+        checkExchanges(ignite(1), 3);
+        checkExchanges(ignite(2), 3);
     }
 
     /**
@@ -531,7 +536,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
         cfgCache = true;
 
-        IgniteInternalFuture fut = startGrids(srv0, 1, nodes);
+        IgniteInternalFuture fut = startGridsAsync(srv0, 1, nodes);
 
         fut.get();
 
@@ -556,7 +561,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
             for (int i = 0; i < 3; i++) {
                 mergeExchangeWaitVersion(srv0, topVer + 3);
 
-                startGrids(srv0, topVer, 3).get();
+                startGridsAsync(srv0, topVer, 3).get();
 
                 topVer += 3;
             }
@@ -605,7 +610,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testStartCacheOnJoinAndFailMerge() throws Exception {
+    public void testStartCacheOnJoinAndMergeWithFail() throws Exception {
         cfgCache = false;
 
         final Ignite srv0 = startGrids(2);
@@ -614,13 +619,17 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
         cfgCache = true;
 
-        IgniteInternalFuture fut = startGrids(srv0, 2, 2);
+        IgniteInternalFuture fut = startGridsAsync(srv0, 2, 2);
 
         stopGrid(1);
 
         fut.get();
 
         checkCaches();
+
+        checkExchanges(srv0, 1, 2, 3, 5);
+        checkExchanges(ignite(2), 3, 5);
+        checkExchanges(ignite(3), 5);
     }
 
     /**
@@ -635,7 +644,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
         cfgCache = true;
 
-        IgniteInternalFuture fut = startGrids(srv0, 2, 2);
+        IgniteInternalFuture fut = startGridsAsync(srv0, 2, 2);
 
         stopGrid(0);
 
@@ -656,7 +665,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
         cfgCache = true;
 
-        IgniteInternalFuture fut = startGrids(srv0, 1, 2);
+        IgniteInternalFuture fut = startGridsAsync(srv0, 1, 2);
 
         stopGrid(0);
 
@@ -686,6 +695,10 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
         fut.get();
 
         checkCaches();
+
+        checkExchanges(srv0, 1, 3);
+        checkExchanges(ignite(1), 3);
+        checkExchanges(ignite(2), 3);
     }
 
     /**
@@ -717,6 +730,12 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
         fut.get();
 
         checkCaches();
+
+        checkExchanges(srv0, 1, 2, 3, 5);
+        checkExchanges(ignite(1), 2, 3, 5);
+        checkExchanges(ignite(2), 3, 5);
+        checkExchanges(ignite(3), 5);
+        checkExchanges(ignite(4), 5);
     }
 
     /**
@@ -727,7 +746,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
         mergeExchangeWaitVersion(srv0, 6);
 
-        IgniteInternalFuture fut = startGrids(srv0, 3, 3);
+        IgniteInternalFuture fut = startGridsAsync(srv0, 3, 3);
 
         fut.get();
 
@@ -812,7 +831,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
         mergeExchangeWaitVersion(srv0, 12);
 
-        IgniteInternalFuture fut = startGrids(srv0, 6, 2);
+        IgniteInternalFuture fut = startGridsAsync(srv0, 6, 2);
 
         fut.get();
 
@@ -903,7 +922,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
         CountDownLatch latch = blockExchangeFinish(srvs, mode);
 
-        IgniteInternalFuture<?> fut = startGrids(srv0, srvs, 2);
+        IgniteInternalFuture<?> fut = startGridsAsync(srv0, srvs, 2);
 
         if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS))
             fail("Failed to wait for expected messages.");
@@ -946,7 +965,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
         CountDownLatch latch = blockExchangeFinish(srv0, srvs + 1, blockNodes, waitMsgNodes);
 
-        IgniteInternalFuture<?> fut = startGrids(srv0, srvs, startNodes);
+        IgniteInternalFuture<?> fut = startGridsAsync(srv0, srvs, startNodes);
 
         if (latch != null && !latch.await(WAIT_SECONDS, TimeUnit.SECONDS))
             fail("Failed to wait for expected messages.");
@@ -1370,6 +1389,47 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest
{
 
     /**
      * @param node Node.
+     * @param vers Expected exchange versions.
+     */
+    private void checkExchanges(Ignite node, long... vers) {
+        IgniteKernal node0 = (IgniteKernal)node;
+
+        List<AffinityTopologyVersion> expVers = new ArrayList<>();
+
+        for (long ver : vers)
+            expVers.add(new AffinityTopologyVersion(ver));
+
+        List<AffinityTopologyVersion> doneVers = new ArrayList<>();
+
+        List<GridDhtPartitionsExchangeFuture> futs =
+            node0.context().cache().context().exchange().exchangeFutures();
+
+        for (int i = futs.size() - 1; i >= 0; i--) {
+            GridDhtPartitionsExchangeFuture fut = futs.get(i);
+
+            if (fut.exchangeDone() && fut.firstEvent().type() != EVT_DISCOVERY_CUSTOM_EVT)
{
+                AffinityTopologyVersion resVer = fut.topologyVersion();
+
+                if (resVer != null)
+                    doneVers.add(resVer);
+            }
+        }
+
+        assertEquals(expVers, doneVers);
+
+        for (CacheGroupContext grpCtx : node0.context().cache().cacheGroups()) {
+            for (AffinityTopologyVersion ver : grpCtx.affinity().cachedVersions()) {
+                if (ver.minorTopologyVersion() > 0)
+                    continue;
+
+                assertTrue("Unexpected version [ver=" + ver + ", exp=" + expVers + ']',
+                    expVers.contains(ver));
+            }
+        }
+    }
+
+    /**
+     * @param node Node.
      * @param topVer Exchange version.
      * @throws Exception If failed.
      */
@@ -1386,13 +1446,15 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest
{
     }
 
     /**
+     * Sequentially starts nodes so that node name is consistent with node order.
+     *
      * @param node Some existing node.
      * @param startIdx Start node index.
      * @param cnt Number of nodes.
      * @return Start future.
      * @throws Exception If failed.
      */
-    private IgniteInternalFuture startGrids(Ignite node, int startIdx, int cnt) throws Exception
{
+    private IgniteInternalFuture startGridsAsync(Ignite node, int startIdx, int cnt) throws
Exception {
         GridCompoundFuture fut = new GridCompoundFuture();
 
         for (int i = 0; i < cnt; i++) {


Mime
View raw message