Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D9AC1200CFE for ; Fri, 28 Jul 2017 14:07:19 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D843316CAC4; Fri, 28 Jul 2017 12:07:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2FA3E16CA1A for ; Fri, 28 Jul 2017 14:07:16 +0200 (CEST) Received: (qmail 33649 invoked by uid 500); 28 Jul 2017 12:07:15 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 33536 invoked by uid 99); 28 Jul 2017 12:07:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Jul 2017 12:07:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1CB35F3308; Fri, 28 Jul 2017 12:07:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yzhdanov@apache.org To: commits@ignite.apache.org Date: Fri, 28 Jul 2017 12:07:33 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [21/50] [abbrv] ignite git commit: Test for cache partitions state, fix for client cache start. archived-at: Fri, 28 Jul 2017 12:07:20 -0000 Test for cache partitions state, fix for client cache start. 
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aeb9336b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aeb9336b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aeb9336b Branch: refs/heads/ignite-5658 Commit: aeb9336b3b161ddfff73f17e41cd453409b84a16 Parents: ca496f6 Author: sboikov Authored: Mon Jul 24 11:47:16 2017 +0300 Committer: sboikov Committed: Mon Jul 24 11:47:16 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 51 ++- .../dht/GridClientPartitionTopology.java | 7 +- .../dht/GridDhtPartitionTopology.java | 12 +- .../dht/GridDhtPartitionTopologyImpl.java | 45 +- .../GridDhtPartitionsExchangeFuture.java | 120 +++--- .../GridCacheDatabaseSharedManager.java | 6 +- .../CacheLateAffinityAssignmentTest.java | 36 +- .../distributed/CachePartitionStateTest.java | 410 +++++++++++++++++++ .../TestCacheNodeExcludingFilter.java | 53 +++ .../db/IgnitePdsCacheRestoreTest.java | 208 ++++++++++ .../testsuites/IgniteCacheTestSuite6.java | 38 ++ .../ignite/testsuites/IgnitePdsTestSuite.java | 3 + 12 files changed, 863 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 79ab183..f519b4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ 
-517,6 +517,16 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap } } + for (DynamicCacheDescriptor desc : startDescs) { + if (desc.cacheConfiguration().getCacheMode() != LOCAL) { + CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId()); + + assert grp != null; + + grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topVer), true); + } + } + cctx.cache().initCacheProxies(topVer, null); cctx.cache().completeClientCacheChangeFuture(msg.requestId(), null); @@ -1299,6 +1309,19 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap } /** + * @param grpId Group ID. + * @return Group name for debug purpose. + */ + private String debugGroupName(int grpId) { + CacheGroupDescriptor desc = caches.group(grpId); + + if (desc != null) + return desc.cacheOrGroupName(); + else + return "Unknown group: " + grpId; + } + + /** * @param fut Exchange future. * @throws IgniteCheckedException If failed. */ @@ -1396,19 +1419,31 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap * Called on exchange initiated by server node leave. * * @param fut Exchange future. + * @param crd Coordinator flag. * @throws IgniteCheckedException If failed. * @return {@code True} if affinity should be assigned by coordinator. */ - public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { + public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException { ClusterNode leftNode = fut.discoveryEvent().eventNode(); assert !leftNode.isClient() : leftNode; - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (grp.isLocal()) - continue; + if (crd) { + // Need initialize CacheGroupHolders if this node become coordinator on this exchange. 
+ forAllRegisteredCacheGroups(new IgniteInClosureX() { + @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { + CacheGroupHolder cache = groupHolder(fut.topologyVersion(), desc); - grp.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + cache.aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + } + }); + } + else { + forAllCacheGroups(false, new IgniteInClosureX() { + @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { + aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); + } + }); } synchronized (mux) { @@ -1433,12 +1468,8 @@ public class CacheAffinitySharedManager extends GridCacheSharedManagerAdap @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { CacheGroupHolder grpHolder = grpHolders.get(desc.groupId()); - if (grpHolder != null) { - if (grpHolder.client()) // Affinity for non-client holders calculated in {@link #onServerLeft}. - grpHolder.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); - + if (grpHolder != null) return; - } // Need initialize holders and affinity if this node became coordinator during this exchange. 
final Integer grpId = desc.groupId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index f4ed517..232ce38 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -367,6 +367,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public GridDhtLocalPartition localPartition(int p) { return localPartition(p, AffinityTopologyVersion.NONE, false); } @@ -830,7 +835,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public void onExchangeDone(AffinityAssignment assignment) { + @Override public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer) { // No-op. 
} http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 5f76d12..d9e04a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -130,6 +130,15 @@ public interface GridDhtPartitionTopology { throws GridDhtInvalidPartitionException; /** + * Unconditionally creates partition during restore of persisted partition state. + * + * @param p Partition ID. + * @return Partition. + * @throws IgniteCheckedException If failed. + */ + public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException; + + /** * @param topVer Topology version at the time of creation. * @param p Partition ID. * @param create If {@code true}, then partition will be created if it's not there. @@ -331,6 +340,7 @@ public interface GridDhtPartitionTopology { * Callback on exchange done. * * @param assignment New affinity assignment. + * @param updateRebalanceVer {@code True} if need check rebalance state. 
*/ - public void onExchangeDone(AffinityAssignment assignment); + public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer); } http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 601da1b..5ef499c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -135,9 +135,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** */ private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE; - /** */ - private volatile boolean treatAllPartAsLoc; - /** * @param ctx Cache shared context. * @param grp Cache group. 
@@ -421,14 +418,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean affReady) throws IgniteCheckedException { - DiscoveryEvent discoEvt = exchFut.discoveryEvent(); - - treatAllPartAsLoc = exchFut.activateCluster() - || (discoEvt.type() == EventType.EVT_NODE_JOINED - && discoEvt.eventNode().isLocal() - && !ctx.kernalContext().clientNode() - ); - ClusterNode loc = ctx.localNode(); ctx.database().checkpointReadLock(); @@ -540,8 +529,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException { - treatAllPartAsLoc = false; - boolean changed = false; int num = grp.affinity().partitions(); @@ -692,6 +679,29 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { return loc; } + /** {@inheritDoc} */ + @Override public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException { + lock.writeLock().lock(); + + try { + GridDhtLocalPartition part = locParts.get(p); + + if (part != null) + return part; + + part = new GridDhtLocalPartition(ctx, grp, p); + + locParts.set(p, part); + + ctx.pageStore().onPartitionCreated(grp.groupId(), p); + + return part; + } + finally { + lock.writeLock().unlock(); + } + } + /** * @param p Partition number. * @param topVer Topology version. 
@@ -731,7 +741,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (loc != null && state == EVICTED) { locParts.set(p, loc = null); - if (!treatAllPartAsLoc && !belongs) + if (!belongs) throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " + "(often may be caused by inconsistent 'key.hashCode()' implementation) " + "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); @@ -741,7 +751,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { "[part=" + p + ", shouldBeMoving=" + loc.reload() + "]"); if (loc == null) { - if (!treatAllPartAsLoc && !belongs) + if (!belongs) throw new GridDhtInvalidPartitionException(p, "Creating partition which does not belong to " + "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " + "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']'); @@ -1499,12 +1509,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ - @Override public void onExchangeDone(AffinityAssignment assignment) { + @Override public void onExchangeDone(AffinityAssignment assignment, boolean updateRebalanceVer) { lock.writeLock().lock(); try { if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0) rebuildDiff(assignment); + + if (updateRebalanceVer) + updateRebalanceVersion(assignment.assignment()); } finally { lock.writeLock().unlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index c4a4f83..cdb4bb7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -193,9 +193,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** */ private CacheAffinityChangeMessage affChangeMsg; - /** */ - private boolean clientOnlyExchange; - /** Init timestamp. Used to track the amount of time spent to complete the future. */ private long initTs; @@ -485,26 +482,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.affinity().initStartedCaches(crdNode, this, receivedCaches); } - else { - cctx.activate(); - - List> caches = - cctx.cache().cachesToStartOnLocalJoin(); - - if (cctx.database().persistenceEnabled() && - !cctx.kernalContext().clientNode()) { - List startDescs = new ArrayList<>(); - - if (caches != null) { - for (T2 c : caches) - startDescs.add(c.get1()); - } - - cctx.database().readCheckpointAndRestoreMemory(startDescs); - } - - cctx.cache().startCachesOnLocalJoin(caches, topVer); - } + else + initCachesOnLocalJoin(); } exchange = CU.clientNode(discoEvt.eventNode()) ? @@ -571,6 +550,29 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @throws IgniteCheckedException If failed. 
*/ + private void initCachesOnLocalJoin() throws IgniteCheckedException { + cctx.activate(); + + List> caches = + cctx.cache().cachesToStartOnLocalJoin(); + + if (cctx.database().persistenceEnabled() && !cctx.kernalContext().clientNode()) { + List startDescs = new ArrayList<>(); + + if (caches != null) { + for (T2 c : caches) + startDescs.add(c.get1()); + } + + cctx.database().readCheckpointAndRestoreMemory(startDescs); + } + + cctx.cache().startCachesOnLocalJoin(caches, topologyVersion()); + } + + /** + * @throws IgniteCheckedException If failed. + */ private void initTopologies() throws IgniteCheckedException { cctx.database().checkpointReadLock(); @@ -776,7 +778,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte warnNoAffinityNodes(); - centralizedAff = cctx.affinity().onServerLeft(this); + centralizedAff = cctx.affinity().onServerLeft(this, crd); } else cctx.affinity().onServerJoin(this, crd); @@ -788,40 +790,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @throws IgniteCheckedException If failed. 
*/ private void clientOnlyExchange() throws IgniteCheckedException { - clientOnlyExchange = true; - if (crd != null) { - if (crd.isLocal()) { - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - boolean updateTop = !grp.isLocal() && - exchId.topologyVersion().equals(grp.localStartVersion()); - - if (updateTop) { - for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { - if (top.groupId() == grp.groupId()) { - GridDhtPartitionFullMap fullMap = top.partitionMap(true); - - assert fullMap != null; - - grp.topology().update(topologyVersion(), - fullMap, - top.updateCounters(false), - Collections.emptySet()); + assert !crd.isLocal() : crd; - break; - } - } - } - } - } - else { - if (!centralizedAff) - sendLocalPartitions(crd); + if (!centralizedAff) + sendLocalPartitions(crd); - initDone(); + initDone(); - return; - } + return; } else { if (centralizedAff) { // Last server node failed. @@ -896,8 +873,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte try { long start = U.currentTimeMillis(); - IgniteInternalFuture fut = cctx.snapshot() - .tryStartLocalSnapshotOperation(discoEvt); + IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(discoEvt); if (fut != null) { fut.get(); @@ -1122,6 +1098,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException { assert node != null; + GridDhtPartitionsSingleMessage msg; + // Reset lost partition before send local partition to coordinator. 
if (exchActions != null) { Set caches = exchActions.cachesToResetLostPartitions(); @@ -1130,22 +1108,32 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte resetLostPartitions(caches); } - GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage( - node, exchangeId(), clientOnlyExchange, true); + if (cctx.kernalContext().clientNode()) { + msg = new GridDhtPartitionsSingleMessage(exchangeId(), + true, + null, + true); + } + else { + msg = cctx.exchange().createPartitionsSingleMessage(node, + exchangeId(), + false, + true); + } Map> partHistReserved0 = partHistReserved; if (partHistReserved0 != null) - m.partitionHistoryCounters(partHistReserved0); + msg.partitionHistoryCounters(partHistReserved0); if (stateChangeExchange() && changeGlobalStateE != null) - m.setError(changeGlobalStateE); + msg.setError(changeGlobalStateE); if (log.isDebugEnabled()) - log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']'); + log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + msg + ']'); try { - cctx.io().send(node, m, SYSTEM_POOL); + cctx.io().send(node, msg, SYSTEM_POOL); } catch (ClusterTopologyCheckedException ignored) { if (log.isDebugEnabled()) @@ -1318,7 +1306,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (err == null) { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (!grp.isLocal()) - grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion())); + grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion()), false); } } @@ -1386,10 +1374,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte public void onReceive(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) { assert msg != null; assert msg.exchangeId().equals(exchId) : msg; - assert msg.lastVersion() != null : msg; - if (!msg.client()) + 
if (!msg.client()) { + assert msg.lastVersion() != null : msg; + updateLastVersion(msg.lastVersion()); + } if (isDone()) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 39038ba..1797d64 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -1560,8 +1560,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan T2 fromWal = partStates.get(new T2<>(grpId, i)); - GridDhtLocalPartition part = grp.topology() - .localPartition(i, AffinityTopologyVersion.NONE, true); + GridDhtLocalPartition part = grp.topology().forceCreatePartition(i); assert part != null; @@ -1621,8 +1620,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @param dataEntry Data entry to apply. 
*/ private void applyUpdate(GridCacheContext cacheCtx, DataEntry dataEntry) throws IgniteCheckedException { - GridDhtLocalPartition locPart = cacheCtx.topology() - .localPartition(dataEntry.partitionId(), AffinityTopologyVersion.NONE, true); + GridDhtLocalPartition locPart = cacheCtx.topology().forceCreatePartition(dataEntry.partitionId()); switch (dataEntry.op()) { case CREATE: http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java index 23043d1..7d8620a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java @@ -331,7 +331,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { } }; - cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0))); + cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0))); testAffinitySimpleSequentialStart(); @@ -351,7 +351,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { } }; - cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(1))); + cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1))); startServer(0, 1); @@ -391,7 +391,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { } }; - cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(1), getTestIgniteInstanceName(2))); + 
cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(1), getTestIgniteInstanceName(2))); startServer(0, 1); startServer(1, 2); @@ -439,7 +439,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { } }; - cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0))); + cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0))); Ignite ignite0 = startServer(0, 1); @@ -467,7 +467,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { } }; - cacheNodeFilter = new CacheNodeFilter(F.asList(getTestIgniteInstanceName(0))); + cacheNodeFilter = new TestCacheNodeExcludingFilter(F.asList(getTestIgniteInstanceName(0))); Ignite ignite0 = startServer(0, 1); @@ -520,7 +520,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { */ private void cacheDestroyAndCreate(boolean cacheOnCrd) throws Exception { if (!cacheOnCrd) - cacheNodeFilter = new CacheNodeFilter(Collections.singletonList(getTestIgniteInstanceName(0))); + cacheNodeFilter = new TestCacheNodeExcludingFilter(Collections.singletonList(getTestIgniteInstanceName(0))); startServer(0, 1); @@ -2069,7 +2069,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { exclude.add("server-" + (srvIdx + rnd.nextInt(10))); } - ccfg.setNodeFilter(new CacheNodeFilter(exclude)); + ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(exclude)); } ccfg.setName(name); @@ -2645,28 +2645,6 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { /** * */ - static class CacheNodeFilter implements IgnitePredicate { - /** */ - private Collection excludeNodes; - - /** - * @param excludeNodes Nodes names. 
- */ - public CacheNodeFilter(Collection excludeNodes) { - this.excludeNodes = excludeNodes; - } - - /** {@inheritDoc} */ - @Override public boolean apply(ClusterNode clusterNode) { - String name = clusterNode.attribute(ATTR_IGNITE_INSTANCE_NAME).toString(); - - return !excludeNodes.contains(name); - } - } - - /** - * - */ static class TestTcpDiscoverySpi extends TcpDiscoverySpi { /** */ private boolean blockCustomEvt; http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java new file mode 100644 index 0000000..c64ed0b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionStateTest.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.affinity.AffinityAssignment; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; +import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; + +/** + * + */ +public class CachePartitionStateTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** */ + private CacheConfiguration ccfg; + + /** {@inheritDoc} */ + protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + cfg.setClientMode(client); + + if (ccfg != null) { + cfg.setCacheConfiguration(ccfg); + + ccfg = null; + } + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionState1_1() throws Exception { + partitionState1(0, true); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionState1_2() throws Exception { + partitionState1(1, true); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionState1_2_NoCacheOnCoordinator() throws Exception { + partitionState1(1, false); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionState1_3() throws Exception { + partitionState1(100, true); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionState2_1() throws Exception { + partitionState2(0, true); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionState2_2() throws Exception { + partitionState2(1, true); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionState2_2_NoCacheOnCoordinator() throws Exception { + partitionState2(1, false); + } + + /** + * @throws Exception If failed. 
+ */ + public void testPartitionState2_3() throws Exception { + partitionState2(100, true); + } + + /** + * @param backups Number of backups. + * @param crdAffNode If {@code false} cache is not created on coordinator. + * @throws Exception If failed. + */ + private void partitionState1(int backups, boolean crdAffNode) throws Exception { + startGrids(3); + + blockSupplySend(DEFAULT_CACHE_NAME); + + CacheConfiguration ccfg = cacheConfiguration(DEFAULT_CACHE_NAME, backups); + + if (!crdAffNode) + ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0))); + + ignite(1).createCache(ccfg); + + AffinityAssignment assign0 = + grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment( + new AffinityTopologyVersion(3, 1)); + + awaitPartitionMapExchange(); + + checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING); + + checkRebalance(DEFAULT_CACHE_NAME, true); + + client = true; + + Ignite clientNode = startGrid(4); + + checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING); + + clientNode.cache(DEFAULT_CACHE_NAME); + + checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING); + + checkRebalance(DEFAULT_CACHE_NAME, true); + + client = false; + + startGrid(5); + + checkRebalance(DEFAULT_CACHE_NAME, false); + + for (int i = 0; i < 3; i++) + checkNodePartitions(assign0, ignite(i).cluster().localNode(), DEFAULT_CACHE_NAME, OWNING); + + AffinityAssignment assign1 = + grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment( + new AffinityTopologyVersion(5, 0)); + + checkNodePartitions(assign1, ignite(5).cluster().localNode(), DEFAULT_CACHE_NAME, MOVING); + + stopBlock(); + + awaitPartitionMapExchange(); + + AffinityAssignment assign2 = + grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment( + new AffinityTopologyVersion(5, 1)); + + checkPartitionsState(assign2, DEFAULT_CACHE_NAME, OWNING); + + checkRebalance(DEFAULT_CACHE_NAME, true); + + if 
(!crdAffNode) + ignite(0).cache(DEFAULT_CACHE_NAME); + + checkPartitionsState(assign2, DEFAULT_CACHE_NAME, OWNING); + + checkRebalance(DEFAULT_CACHE_NAME, true); + + startGrid(6); + + awaitPartitionMapExchange(); + + AffinityAssignment assign3 = + grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment( + new AffinityTopologyVersion(6, 1)); + + checkPartitionsState(assign3, DEFAULT_CACHE_NAME, OWNING); + + checkRebalance(DEFAULT_CACHE_NAME, true); + } + + /** + * @param backups Number of backups. + * @param crdAffNode If {@code false} cache is not created on coordinator. + * @throws Exception If failed. + */ + private void partitionState2(int backups, boolean crdAffNode) throws Exception { + startGrids(3); + + blockSupplySend(DEFAULT_CACHE_NAME); + + ccfg = cacheConfiguration(DEFAULT_CACHE_NAME, backups); + + if (!crdAffNode) + ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(getTestIgniteInstanceName(0))); + + startGrid(4); + + AffinityAssignment assign0 = + grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment( + new AffinityTopologyVersion(4, 0)); + + checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING); + + checkRebalance(DEFAULT_CACHE_NAME, true); + + if (!crdAffNode) + ignite(0).cache(DEFAULT_CACHE_NAME); + + checkPartitionsState(assign0, DEFAULT_CACHE_NAME, OWNING); + + checkRebalance(DEFAULT_CACHE_NAME, true); + + stopBlock(); + + startGrid(5); + + AffinityAssignment assign1 = + grid(1).context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinity().assignment( + new AffinityTopologyVersion(5, 1)); + + awaitPartitionMapExchange(); + + checkPartitionsState(assign1, DEFAULT_CACHE_NAME, OWNING); + + checkRebalance(DEFAULT_CACHE_NAME, true); + } + + /** + * @param assign Assignments. + * @param cacheName Cache name. + * @param expState Expected state. 
+ */ + private void checkPartitionsState(AffinityAssignment assign, String cacheName, GridDhtPartitionState expState) { + for (Ignite node : G.allGrids()) + checkNodePartitions(assign, node.cluster().localNode(), cacheName, expState); + } + + /** + * @param assign Assignments. + * @param clusterNode Node. + * @param cacheName Cache name. + * @param expState Expected partitions state. + */ + private void checkNodePartitions(AffinityAssignment assign, + ClusterNode clusterNode, + String cacheName, + GridDhtPartitionState expState) + { + Affinity aff = ignite(0).affinity(cacheName); + + Set nodeParts = new HashSet<>(); + + nodeParts.addAll(assign.primaryPartitions(clusterNode.id())); + nodeParts.addAll(assign.backupPartitions(clusterNode.id())); + + log.info("Test state [node=" + clusterNode.id() + ", parts=" + nodeParts.size() + ", state=" + expState + ']'); + + if (grid(0).context().discovery().cacheAffinityNode(clusterNode, cacheName)) + assertFalse(nodeParts.isEmpty()); + + boolean check = false; + + for (Ignite node : G.allGrids()) { + GridCacheAdapter cache = + ((IgniteKernal)node).context().cache().internalCache(cacheName); + + if (cache != null) { + check = true; + + GridDhtPartitionTopology top = cache.context().topology(); + + GridDhtPartitionMap partsMap = top.partitions(clusterNode.id()); + + for (int p = 0; p < aff.partitions(); p++) { + if (nodeParts.contains(p)) { + assertNotNull(partsMap); + assertEquals(expState, partsMap.get(p)); + } + else { + if (partsMap != null) { + GridDhtPartitionState state = partsMap.get(p); + + assertTrue("Unexpected state: " + state, state == null || state == EVICTED); + } + } + } + } + else { + assertEquals(0, aff.primaryPartitions(((IgniteKernal)node).localNode()).length); + assertEquals(0, aff.backupPartitions(((IgniteKernal)node).localNode()).length); + } + } + + assertTrue(check); + } + + /** + * @param cacheName Cache name. + * @param expDone Expected rebalance finish flag. 
+ */ + private void checkRebalance(String cacheName, boolean expDone) { + for (Ignite node : G.allGrids()) { + IgniteKernal node0 = (IgniteKernal)node; + + GridCacheAdapter cache = node0.context().cache().internalCache(cacheName); + + AffinityTopologyVersion topVer = node0.context().cache().context().exchange().readyAffinityVersion(); + + if (cache != null) + assertEquals(expDone, cache.context().topology().rebalanceFinished(topVer)); + else /* NOTE(review): the result of this call is discarded, so nothing is checked for nodes without the cache - was an assertion intended? Confirm. */ + node0.context().discovery().cacheAffinityNode(node0.localNode(), cacheName); + } + } + + /** + * @param cacheName Name of the cache whose supply messages are blocked on every node returned by {@code G.allGrids()}. + */ + private void blockSupplySend(String cacheName) { + for (Ignite node : G.allGrids()) + blockSupplySend(TestRecordingCommunicationSpi.spi(node), cacheName); + } + + /** + * @param spi Communication SPI on which {@code GridDhtPartitionSupplyMessage}s are blocked. + * @param cacheName Cache name; its id ({@code CU.cacheId}) is compared with the group id of each supply message. + */ + private void blockSupplySend(TestRecordingCommunicationSpi spi, final String cacheName) { + final int grpId = CU.cacheId(cacheName); + + spi.blockMessages(new IgniteBiPredicate() { + @Override public boolean apply(ClusterNode node, Message msg) { + return msg.getClass().equals(GridDhtPartitionSupplyMessage.class) && + ((GridDhtPartitionSupplyMessage)msg).groupId() == grpId; + } + }); + } + + /** + * Stops message blocking on the communication SPI of every node returned by {@code G.allGrids()}. + */ + private void stopBlock() { + for (Ignite node : G.allGrids()) + TestRecordingCommunicationSpi.spi(node).stopBlock(); + } + + /** + * @param name Cache name. + * @param backups Backups number. + * @return Cache configuration. 
+ */ + private CacheConfiguration cacheConfiguration(String name, int backups) { + CacheConfiguration ccfg = new CacheConfiguration(name); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setBackups(backups); + + return ccfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java new file mode 100644 index 0000000..a3f7d27 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/TestCacheNodeExcludingFilter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.Arrays; +import java.util.Collection; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.lang.IgnitePredicate; + +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; + +/** + * Node predicate that rejects every node whose Ignite instance name is contained in the exclusion list and accepts all others. + */ +public class TestCacheNodeExcludingFilter implements IgnitePredicate { + /** Instance names of the nodes to exclude. */ + private Collection excludeNodes; + + /** + * @param excludeNodes Instance names of the nodes to exclude. + */ + public TestCacheNodeExcludingFilter(Collection excludeNodes) { + this.excludeNodes = excludeNodes; + } + /** + * @param excludeNodes Instance names of the nodes to exclude. + */ + public TestCacheNodeExcludingFilter(String... excludeNodes) { + this.excludeNodes = Arrays.asList(excludeNodes); + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode clusterNode) { + String name = clusterNode.attribute(ATTR_IGNITE_INSTANCE_NAME).toString(); + + return !excludeNodes.contains(name); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java new file mode 100644 index 0000000..25626f4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheRestoreTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db; + +import java.util.Arrays; +import java.util.List; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Tests that data of an existing cache is readable after a node restart when a new cache is added while the node is stopped or on its restart. + */ +public class IgnitePdsCacheRestoreTest extends GridCommonAbstractTest { + /** IP finder shared by all nodes started by this test. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Cache configurations for the next started node; cleared once they have been applied. */ + private CacheConfiguration[] ccfgs; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + 
if (ccfgs != null) { + cfg.setCacheConfiguration(ccfgs); + + ccfgs = null; + } + + MemoryConfiguration memCfg = new MemoryConfiguration(); + memCfg.setPageSize(1024); + memCfg.setDefaultMemoryPolicySize(10 * 1024 * 1024); + + cfg.setMemoryConfiguration(memCfg); + + PersistentStoreConfiguration pCfg = new PersistentStoreConfiguration(); + + pCfg.setWalMode(WALMode.LOG_ONLY); + + cfg.setPersistentStoreConfiguration(pCfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + GridTestUtils.deleteDbFiles(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + GridTestUtils.deleteDbFiles(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testRestoreAndNewCache1() throws Exception { + restoreAndNewCache(false); + } + + /** + * @throws Exception If failed. + */ + public void testRestoreAndNewCache2() throws Exception { + restoreAndNewCache(true); + } + + /** + * @param createNew If {@code true}, the new cache is created while node 2 is stopped; otherwise it is added to the configuration node 2 restarts with. + * @throws Exception If failed. + */ + private void restoreAndNewCache(boolean createNew) throws Exception { + for (int i = 0; i < 3; i++) { + ccfgs = configurations1(); + + startGrid(i); + } + + ignite(0).active(true); + + IgniteCache cache1 = ignite(2).cache("c1"); + + List keys = primaryKeys(cache1, 10); + + for (Integer key : keys) + cache1.put(key, key); + + stopGrid(2); + + if (createNew) { + // New cache is added when node is stopped. + ignite(0).getOrCreateCaches(Arrays.asList(configurations2())); + } + else { + // New cache is added on node restart. 
+ ccfgs = configurations2(); + } + + startGrid(2); + + cache1 = ignite(2).cache("c1"); + + IgniteCache cache2 = ignite(2).cache("c2"); + + for (Integer key : keys) { + assertEquals(key, cache1.get(key)); + + assertNull(cache2.get(key)); + + cache2.put(key, key); + + assertEquals(key, cache2.get(key)); + } + + List nearKeys = nearKeys(cache1, 10, 0); + + for (Integer key : nearKeys) { + assertNull(cache1.get(key)); + assertNull(cache2.get(key)); + + cache2.put(key, key); + assertEquals(key, cache2.get(key)); + + cache1.put(key, key); + assertEquals(key, cache1.get(key)); + } + + startGrid(3); + + awaitPartitionMapExchange(); + + for (Integer key : nearKeys) { + assertEquals(key, cache2.get(key)); + + assertEquals(key, cache1.get(key)); + } + } + + /** + * @return Configurations set 1 (cache {@code c1} only). + */ + private CacheConfiguration[] configurations1() { + CacheConfiguration[] ccfgs = new CacheConfiguration[1]; + + ccfgs[0] = cacheConfiguration("c1"); + + return ccfgs; + } + + /** + * @return Configurations set 2 (caches {@code c1} and {@code c2}). + */ + private CacheConfiguration[] configurations2() { + CacheConfiguration[] ccfgs = new CacheConfiguration[2]; + + ccfgs[0] = cacheConfiguration("c1"); + ccfgs[1] = cacheConfiguration("c2"); + + return ccfgs; + } + + /** + * @param name Cache name. + * @return Cache configuration. 
+ */ + private CacheConfiguration cacheConfiguration(String name) { + CacheConfiguration ccfg = new CacheConfiguration(name); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + return ccfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java new file mode 100644 index 0000000..bb32d24 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testsuites; + +import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest; + +/** + * Test suite. + */ +public class IgniteCacheTestSuite6 extends TestSuite { + /** + * @return IgniteCache test suite. + * @throws Exception Thrown in case of the failure. 
+ */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("IgniteCache Test Suite part 6"); + + suite.addTestSuite(CachePartitionStateTest.class); + + return suite; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/aeb9336b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java index 5b562c3..5762c02 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java @@ -22,6 +22,7 @@ import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactiva import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsClientNearCachePutGetTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsDynamicCacheTest; import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsSingleNodePutGetPersistenceTest; +import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsCacheRestoreTest; import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCheckpointSimulationWithRealCpDisabledTest; import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsEvictionTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.BPlusTreePageMemoryImplTest; @@ -74,6 +75,8 @@ public class IgnitePdsTestSuite extends TestSuite { suite.addTestSuite(IgniteClusterActivateDeactivateTestWithPersistence.class); + suite.addTestSuite(IgnitePdsCacheRestoreTest.class); + return suite; } }