Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0B769200CC4 for ; Thu, 13 Jul 2017 12:21:44 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 09F8C16BD47; Thu, 13 Jul 2017 10:21:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8104816BD45 for ; Thu, 13 Jul 2017 12:21:42 +0200 (CEST) Received: (qmail 72562 invoked by uid 500); 13 Jul 2017 10:21:41 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 72539 invoked by uid 99); 13 Jul 2017 10:21:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Jul 2017 10:21:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 49A68E968F; Thu, 13 Jul 2017 10:21:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Thu, 13 Jul 2017 10:21:40 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] ignite git commit: ignite-5578 Affinity for local join archived-at: Thu, 13 Jul 2017 10:21:44 -0000 Repository: ignite Updated Branches: refs/heads/ignite-5578-locJoin 84d10aeb7 -> b5319a02a ignite-5578 Affinity for local join Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4bd1ee8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4bd1ee8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4bd1ee8 Branch: refs/heads/ignite-5578-locJoin Commit: d4bd1ee8e9ec74ab8c4ddbe48509e526f4223632 Parents: 83c779b Author: sboikov Authored: Thu Jul 13 11:51:43 2017 +0300 Committer: sboikov Committed: Thu Jul 13 13:20:24 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 8 + .../GridCachePartitionExchangeManager.java | 32 +- .../dht/GridClientPartitionTopology.java | 2 +- .../dht/GridDhtPartitionTopology.java | 3 +- .../dht/GridDhtPartitionTopologyImpl.java | 5 +- .../GridDhtPartitionsExchangeFuture.java | 2 +- .../ignite/internal/util/GridListSet.java | 8 + .../CacheLateAffinityAssignmentTest.java | 39 +- .../distributed/CachePartitionStateTest.java | 354 ++++++++++++++++++- .../TestCacheNodeExcludingFilter.java | 53 +++ 10 files changed, 450 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 879e6a9a..8a293ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -517,6 +517,14 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap } } + for (DynamicCacheDescriptor desc : startDescs) { + CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId()); + + assert grp != null; + + grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topVer), true); + } + cctx.cache().initCacheProxies(topVer, null); cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null); http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 51214e3..06f336e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; @@ -1438,19 +1439,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana return; try { - List futs = exchangeFutures(); - - GridDhtPartitionsExchangeFuture fut = null; - - for (int i = futs.size() - 1; i >= 0; i++) { - GridDhtPartitionsExchangeFuture fut0 = futs.get(i); - - if (fut0.exchangeId().equals(msg.exchangeId())) { - fut = fut0; - - break; - } - } + GridDhtPartitionsExchangeFuture fut = exchFuts.find(msg.exchangeId()); if (fut != null) fut.processSinglePartitionRequest(node, msg); @@ -2266,6 +2255,23 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana return super.values(); } + /** + * @param exchangeId Exchange ID. + * @return Future. + */ + public synchronized GridDhtPartitionsExchangeFuture find(GridDhtPartitionExchangeId exchangeId) { + ListIterator it = listIterator(size() - 1); + + while (it.hasPrevious()) { + GridDhtPartitionsExchangeFuture fut0 = it.previous(); + + if (fut0.exchangeId().equals(exchangeId)) + return fut0; + } + + return null; + } + /** {@inheritDoc} */ @Override public synchronized String toString() { return S.toString(ExchangeFutureSet.class, this, super.toString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index c1a465d..232ce38 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -835,7 +835,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public void onExchangeDone(AffinityAssignment assignment) { + @Override public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index caf3936..d9e04a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -340,6 +340,7 @@ public interface GridDhtPartitionTopology { * Callback on exchange done. * * @param assignment New affinity assignment. + * @param updateRebalanceVer {@code True} if need check rebalance state. */ - public void onExchangeDone(AffinityAssignment assignment); + public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 9d16d90..5ef499c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1509,12 +1509,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public void onExchangeDone(AffinityAssignment assignment) { + @Override public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer) { lock.writeLock().lock(); try { if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0) rebuildDiff(assignment); + + if (updateRebalanceVer) + updateRebalanceVersion(assignment.assignment()); } finally { lock.writeLock().unlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index a8d1589..513f950 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1347,7 +1347,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (err == null) { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (!grp.isLocal()) - grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion())); + grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()), false); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java index 6226bd2..1a632b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridListSet.java @@ -373,6 +373,14 @@ public class GridListSet extends GridSerializableSet implements Cloneable return vals.iterator(); } + /** + * @param idx Start index. + * @return List iterator. + */ + public ListIterator listIterator(int idx) { + return vals.listIterator(idx); + } + /** {@inheritDoc} */ @Override public int size() { return vals.size(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java index 840dda1..a1a01e1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java @@ -100,7 +100,6 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; /** * @@ -332,7 +331,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { } }; - cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0))); + cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0))); testAffinitySimpleSequentialStart(); @@ -352,7 +351,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { } }; - cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(1))); + cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1))); startServer(0, 1); @@ -392,7 +391,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { } }; - cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(1), getTestIgniteInstanceName(2))); + cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1), getTestIgniteInstanceName(2))); startServer(0, 1); startServer(1, 2); @@ -440,7 +439,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { } }; - cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0))); + cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0))); Ignite ignite0 = startServer(0, 1); @@ -468,7 +467,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { } }; - cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0))); + cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0))); Ignite ignite0 = startServer(0, 1); @@ -521,7 +520,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { */ private void cacheDestroyAndCreate(boolean cacheOnCrd) throws Exception { if (!cacheOnCrd) - cacheNodeFilter = new CacheNodeFilter(Collections.singletonList(getTestIgniteInstanceName(0))); + cacheNodeFilter = new TestCacheNodeExcludingFilter(Collections.singletonList(getTestIgniteInstanceName(0))); startServer(0, 1); @@ -1904,7 +1903,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { startServer(0, 1); cacheC = null; - cacheNodeFilter = new CacheNodeFilter(Collections.singletonList(getTestIgniteInstanceName(0))); + cacheNodeFilter = new TestCacheNodeExcludingFilter(Collections.singletonList(getTestIgniteInstanceName(0))); startServer(1, 2); @@ -2093,7 +2092,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { exclude.add("server-" + (srvIdx + rnd.nextInt(10))); } - ccfg.setNodeFilter(new CacheNodeFilter(exclude)); + ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(exclude)); } ccfg.setName(name); @@ -2669,28 +2668,6 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { /** * */ - static class CacheNodeFilter implements IgnitePredicate { - /** */ - private Collection excludeNodes; - - /** - * @param excludeNodes Nodes names. - */ - public CacheNodeFilter(Collection excludeNodes) { - this.excludeNodes = excludeNodes; - } - - /** {@inheritDoc} */ - @Override public boolean apply(ClusterNode clusterNode) { - String name = clusterNode.attribute(ATTR_IGNITE_INSTANCE_NAME).toString(); - - return !excludeNodes.contains(name); - } - } - - /** - * - */ static class TestTcpDiscoverySpi extends TcpDiscoverySpi { /** */ private boolean blockCustomEvt; http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java index c4f3f4a..c64ed0b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java @@ -17,10 +17,24 @@ package org.apache.ignite.internal.processors.cache.distributed; +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -28,6 +42,11 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; + /** * */ @@ -35,22 +54,321 @@ public class CachePartitionStateTest extends GridCommonAbstractTest { /** */ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + /** */ + private boolean client; + + /** */ + private CacheConfiguration ccfg; + /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration() throws Exception { - IgniteConfiguration cfg = super.getConfiguration(); + protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + cfg.setClientMode(client); + + if (ccfg != null) { + cfg.setCacheConfiguration(ccfg); + + ccfg = null; + } + return cfg; } + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + /** * @throws Exception If failed. */ - public void testPartitionState1() throws Exception { + public void testPartitionState1_1() throws Exception { + partitionState1(0, true); + } + /** + * @throws Exception If failed. + */ + public void testPartitionState1_2() throws Exception { + partitionState1(1, true); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionState1_2_NoCacheOnCoordinator() throws Exception { + partitionState1(1, false); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionState1_3() throws Exception { + partitionState1(100, true); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionState2_1() throws Exception { + partitionState2(0, true); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionState2_2() throws Exception { + partitionState2(1, true); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionState2_2_NoCacheOnCoordinator() throws Exception { + partitionState2(1, false); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionState2_3() throws Exception { + partitionState2(100, true); + } + + /** + * @param backups Number of backups. + * @param crdAffNode If {@code false} cache is not created on coordinator. + * @throws Exception If failed. + */ + private void partitionState1(int backups, boolean crdAffNode) throws Exception { + startGrids(3); + + blockSupplySend(DEFAULT_CACHE_NAME); + + CacheConfiguration ccfg = cacheConfiguration(DEFAULT_CACHE_NAME, backups); + + if (!crdAffNode) + ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0))); + + ignite(1).createCache(ccfg); + + AffinityAssignment assign0 = + grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment( + new AffinityTopologyVersion(3, 1)); + + awaitPartitionMapExchange(); + + checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING); + + checkRebalance(DEFAULT_CACHE_NAME, true); + + client = true; + + Ignite clientNode = startGrid(4); + + checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING); + + clientNode.cache(DEFAULT_CACHE_NAME); + + checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING); + + checkRebalance(DEFAULT_CACHE_NAME, true); + + client = false; + + startGrid(5); + + checkRebalance(DEFAULT_CACHE_NAME, false); + + for (int i = 0; i < 3; i++) + checkNodePartitions(assign0, ignite(i).cluster().localNode(), DEFAULT_CACHE_NAME, OWNING); + + AffinityAssignment assign1 = + grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment( + new AffinityTopologyVersion(5, 0)); + + checkNodePartitions(assign1, ignite(5).cluster().localNode(), DEFAULT_CACHE_NAME, MOVING); + + stopBlock(); + + awaitPartitionMapExchange(); + + AffinityAssignment assign2 = + grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment( + new AffinityTopologyVersion(5, 1)); + + checkPartitionsState(assign2, DEFAULT_CACHE_NAME, OWNING); + + checkRebalance(DEFAULT_CACHE_NAME, true); + + if (!crdAffNode) + ignite(0).cache(DEFAULT_CACHE_NAME); + + checkPartitionsState(assign2, DEFAULT_CACHE_NAME, OWNING); + + checkRebalance(DEFAULT_CACHE_NAME, true); + + startGrid(6); + + awaitPartitionMapExchange(); + + AffinityAssignment assign3 = + grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment( + new AffinityTopologyVersion(6, 1)); + + checkPartitionsState(assign3, DEFAULT_CACHE_NAME, OWNING); + + checkRebalance(DEFAULT_CACHE_NAME, true); + } + + /** + * @param backups Number of backups. + * @param crdAffNode If {@code false} cache is not created on coordinator. + * @throws Exception If failed. + */ + private void partitionState2(int backups, boolean crdAffNode) throws Exception { + startGrids(3); + + blockSupplySend(DEFAULT_CACHE_NAME); + + ccfg = cacheConfiguration(DEFAULT_CACHE_NAME, backups); + + if (!crdAffNode) + ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0))); + + startGrid(4); + + AffinityAssignment assign0 = + grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment( + new AffinityTopologyVersion(4, 0)); + + checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING); + + checkRebalance(DEFAULT_CACHE_NAME, true); + + if (!crdAffNode) + ignite(0).cache(DEFAULT_CACHE_NAME); + + checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING); + + checkRebalance(DEFAULT_CACHE_NAME, true); + + stopBlock(); + + startGrid(5); + + AffinityAssignment assign1 = + grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment( + new AffinityTopologyVersion(5, 1)); + + awaitPartitionMapExchange(); + + checkPartitionsState(assign1, DEFAULT_CACHE_NAME, OWNING); + + checkRebalance(DEFAULT_CACHE_NAME, true); + } + + /** + * @param assign Assignments. + * @param cacheName Cache name. + * @param expState Expected state. + */ + private void checkPartitionsState(AffinityAssignment assign, String cacheName, GridDhtPartitionState expState) { + for (Ignite node : G.allGrids()) + checkNodePartitions(assign, node.cluster().localNode(), cacheName, expState); + } + + /** + * @param assign Assignments. + * @param clusterNode Node. + * @param cacheName Cache name. + * @param expState Expected partitions state. + */ + private void checkNodePartitions(AffinityAssignment assign, + ClusterNode clusterNode, + String cacheName, + GridDhtPartitionState expState) + { + Affinity aff = ignite(0).affinity(cacheName); + + Set nodeParts = new HashSet<>(); + + nodeParts.addAll(assign.primaryPartitions(clusterNode.id())); + nodeParts.addAll(assign.backupPartitions(clusterNode.id())); + + log.info("Test state [node=" + clusterNode.id() + ", parts=" + nodeParts.size() + ", state=" + expState + ']'); + + if (grid(0).context().discovery().cacheAffinityNode(clusterNode, cacheName)) + assertFalse(nodeParts.isEmpty()); + + boolean check = false; + + for (Ignite node : G.allGrids()) { + GridCacheAdapter cache = + ((IgniteKernal)node).context().cache().internalCache(cacheName); + + if (cache != null) { + check = true; + + GridDhtPartitionTopology top = cache.context().topology(); + + GridDhtPartitionMap partsMap = top.partitions(clusterNode.id()); + + for (int p = 0; p < aff.partitions(); p++) { + if (nodeParts.contains(p)) { + assertNotNull(partsMap); + assertEquals(expState, partsMap.get(p)); + } + else { + if (partsMap != null) { + GridDhtPartitionState state = partsMap.get(p); + + assertTrue("Unexpected state: " + state, state == null || state == EVICTED); + } + } + } + } + else { + assertEquals(0, aff.primaryPartitions(((IgniteKernal)node).localNode()).length); + assertEquals(0, aff.backupPartitions(((IgniteKernal)node).localNode()).length); + } + } + + assertTrue(check); + } + + /** + * @param cacheName Cache name. + * @param expDone Expected rebalance finish flag. + */ + private void checkRebalance(String cacheName, boolean expDone) { + for (Ignite node : G.allGrids()) { + IgniteKernal node0 = (IgniteKernal)node; + + GridCacheAdapter cache = node0.context().cache().internalCache(cacheName); + + AffinityTopologyVersion topVer = node0.context().cache().context().exchange().readyAffinityVersion(); + + if (cache != null) + assertEquals(expDone, cache.context().topology().rebalanceFinished(topVer)); + else + node0.context().discovery().cacheAffinityNode(node0.localNode(), cacheName); + } + } + + /** + * @param cacheName Cache name. + */ + private void blockSupplySend(String cacheName) { + for (Ignite node : G.allGrids()) + blockSupplySend(TestRecordingCommunicationSpi.spi(node), cacheName); } /** @@ -58,15 +376,35 @@ public class CachePartitionStateTest extends GridCommonAbstractTest { * @param cacheName Cache name. */ private void blockSupplySend(TestRecordingCommunicationSpi spi, final String cacheName) { - final int grpId = groupIdForCache(spi.ignite(), cacheName); + final int grpId = CU.cacheId(cacheName); spi.blockMessages(new IgniteBiPredicate() { @Override public boolean apply(ClusterNode node, Message msg) { - if (!msg.getClass().equals(GridDhtPartitionSupplyMessage.class)) - return false; - - return ((GridDhtPartitionSupplyMessage)msg).groupId() == grpId; + return msg.getClass().equals(GridDhtPartitionSupplyMessage.class) && + ((GridDhtPartitionSupplyMessage)msg).groupId() == grpId; } }); } + + /** + * + */ + private void stopBlock() { + for (Ignite node : G.allGrids()) + TestRecordingCommunicationSpi.spi(node).stopBlock(); + } + + /** + * @param name Cache name. + * @param backups Backups number. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(String name, int backups) { + CacheConfiguration ccfg = new CacheConfiguration(name); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setBackups(backups); + + return ccfg; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d4bd1ee8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java new file mode 100644 index 0000000..a3f7d27 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.Arrays; +import java.util.Collection; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.lang.IgnitePredicate; + +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; + +/** + * + */ +public class TestCacheNodeExcludingFilter implements IgnitePredicate { + /** */ + private Collection excludeNodes; + + /** + * @param excludeNodes Nodes names. + */ + public TestCacheNodeExcludingFilter(Collection excludeNodes) { + this.excludeNodes = excludeNodes; + } + /** + * @param excludeNodes Nodes names. + */ + public TestCacheNodeExcludingFilter(String... excludeNodes) { + this.excludeNodes = Arrays.asList(excludeNodes); + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode clusterNode) { + String name = clusterNode.attribute(ATTR_IGNITE_INSTANCE_NAME).toString(); + + return !excludeNodes.contains(name); + } +}