ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [46/50] [abbrv] ignite git commit: IGNITE-6434 Fixed error in checkpointer during topology change. Fixes #2718
Date Tue, 26 Sep 2017 10:48:59 GMT
IGNITE-6434 Fixed error in checkpointer during topology change. Fixes #2718


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/21de1c56
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/21de1c56
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/21de1c56

Branch: refs/heads/ignite-gg-12822
Commit: 21de1c56268c18685dd3620b7e3dc776ca2cf532
Parents: ae9c6d6
Author: Eduard Shangareev <eduard.shangareev@gmail.com>
Authored: Fri Sep 22 16:17:42 2017 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Fri Sep 22 16:18:36 2017 +0300

----------------------------------------------------------------------
 .../dht/GridDhtPartitionTopologyImpl.java       | 495 ++++++++++---------
 .../GridCacheDatabaseSharedManager.java         |   5 +-
 .../IgnitePdsExchangeDuringCheckpointTest.java  | 135 +++++
 .../IgniteCacheDataStructuresSelfTestSuite.java |   4 +-
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |   4 +
 5 files changed, 407 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/21de1c56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index cad21d3..5a1e050 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -298,20 +298,27 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         GridDhtPartitionsExchangeFuture exchFut)
         throws IgniteInterruptedCheckedException
     {
-        U.writeLock(lock);
+        ctx.database().checkpointReadLock();
 
         try {
-            if (stopping)
-                return;
+            U.writeLock(lock);
 
-            long updateSeq = this.updateSeq.incrementAndGet();
+            try {
+                if (stopping)
+                    return;
 
-            initPartitions0(affVer, exchFut, updateSeq);
+                long updateSeq = this.updateSeq.incrementAndGet();
 
-            consistencyCheck();
+                initPartitions0(affVer, exchFut, updateSeq);
+
+                consistencyCheck();
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
         }
         finally {
-            lock.writeLock().unlock();
+            ctx.database().checkpointReadUnlock();
         }
     }
 
@@ -589,101 +596,109 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             ", affVer=" + grp.affinity().lastVersion() +
             ", fut=" + exchFut + ']';
 
-        lock.writeLock().lock();
+        ctx.database().checkpointReadLock();
 
         try {
-            if (stopping)
-                return false;
 
-            assert readyTopVer.initialized() : readyTopVer;
-            assert lastTopChangeVer.equals(readyTopVer);
+            lock.writeLock().lock();
 
-            if (log.isDebugEnabled())
-                log.debug("Partition map before afterExchange [exchId=" + exchFut.exchangeId() + ", fullMap=" +
-                    fullMapString() + ']');
+            try {
+                if (stopping)
+                    return false;
 
-            long updateSeq = this.updateSeq.incrementAndGet();
+                assert readyTopVer.initialized() : readyTopVer;
+                assert lastTopChangeVer.equals(readyTopVer);
 
-            for (int p = 0; p < num; p++) {
-                GridDhtLocalPartition locPart = localPartition0(p, topVer, false, false, false);
+                if (log.isDebugEnabled())
+                    log.debug("Partition map before afterExchange [exchId=" + exchFut.exchangeId() + ", fullMap=" +
+                        fullMapString() + ']');
 
-                if (partitionLocalNode(p, topVer)) {
-                    // This partition will be created during next topology event,
-                    // which obviously has not happened at this point.
-                    if (locPart == null) {
-                        if (log.isDebugEnabled())
-                            log.debug("Skipping local partition afterExchange (will not create): " + p);
+                long updateSeq = this.updateSeq.incrementAndGet();
 
-                        continue;
-                    }
+                for (int p = 0; p < num; p++) {
+                    GridDhtLocalPartition locPart = localPartition0(p, topVer, false, false, false);
 
-                    GridDhtPartitionState state = locPart.state();
+                    if (partitionLocalNode(p, topVer)) {
+                        // This partition will be created during next topology event,
+                        // which obviously has not happened at this point.
+                        if (locPart == null) {
+                            if (log.isDebugEnabled())
+                                log.debug("Skipping local partition afterExchange (will not create): " + p);
 
-                    if (state == MOVING) {
-                        if (grp.rebalanceEnabled()) {
-                            Collection<ClusterNode> owners = owners(p);
+                            continue;
+                        }
 
-                            // If there are no other owners, then become an owner.
-                            if (F.isEmpty(owners)) {
-                                boolean owned = locPart.own();
+                        GridDhtPartitionState state = locPart.state();
 
-                                assert owned : "Failed to own partition [grp=" + grp.cacheOrGroupName() + ", locPart=" +
-                                    locPart + ']';
+                        if (state == MOVING) {
+                            if (grp.rebalanceEnabled()) {
+                                Collection<ClusterNode> owners = owners(p);
 
-                                updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
+                                // If there are no other owners, then become an owner.
+                                if (F.isEmpty(owners)) {
+                                    boolean owned = locPart.own();
 
-                                changed = true;
+                                    assert owned : "Failed to own partition [grp=" + grp.cacheOrGroupName() + ", locPart=" +
+                                        locPart + ']';
 
-                                if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
-                                    DiscoveryEvent discoEvt = exchFut.events().lastEvent();
+                                    updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
 
-                                    grp.addRebalanceEvent(p,
-                                        EVT_CACHE_REBALANCE_PART_DATA_LOST,
-                                        discoEvt.eventNode(),
-                                        discoEvt.type(),
-                                        discoEvt.timestamp());
-                                }
+                                    changed = true;
 
-                                if (log.isDebugEnabled())
-                                    log.debug("Owned partition: " + locPart);
+                                    if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                                        DiscoveryEvent discoEvt = exchFut.events().lastEvent();
+
+                                        grp.addRebalanceEvent(p,
+                                            EVT_CACHE_REBALANCE_PART_DATA_LOST,
+                                            discoEvt.eventNode(),
+                                            discoEvt.type(),
+                                            discoEvt.timestamp());
+                                    }
+
+                                    if (log.isDebugEnabled())
+                                        log.debug("Owned partition: " + locPart);
+                                }
+                                else if (log.isDebugEnabled())
+                                    log.debug("Will not own partition (there are owners to rebalance from) [locPart=" +
+                                        locPart + ", owners = " + owners + ']');
                             }
-                            else if (log.isDebugEnabled())
-                                log.debug("Will not own partition (there are owners to rebalance from) [locPart=" +
-                                    locPart + ", owners = " + owners + ']');
+                            else
+                                updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
                         }
-                        else
-                            updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
                     }
-                }
-                else {
-                    if (locPart != null) {
-                        GridDhtPartitionState state = locPart.state();
+                    else {
+                        if (locPart != null) {
+                            GridDhtPartitionState state = locPart.state();
 
-                        if (state == MOVING) {
-                            locPart.rent(false);
+                            if (state == MOVING) {
+                                locPart.rent(false);
 
-                            updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
+                                updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
 
-                            changed = true;
+                                changed = true;
 
-                            if (log.isDebugEnabled())
-                                log.debug("Evicting moving partition (it does not belong to affinity): " + locPart);
+                                if (log.isDebugEnabled())
+                                    log.debug("Evicting moving partition (it does not belong to affinity): " + locPart);
+                            }
                         }
                     }
                 }
-            }
 
-            AffinityAssignment aff = grp.affinity().readyAffinity(topVer);
+                AffinityAssignment aff = grp.affinity().readyAffinity(topVer);
 
-            if (node2part != null && node2part.valid())
-                changed |= checkEvictions(updateSeq, aff);
+                if (node2part != null && node2part.valid())
+                    changed |= checkEvictions(updateSeq, aff);
 
-            updateRebalanceVersion(aff.assignment());
+                updateRebalanceVersion(aff.assignment());
 
-            consistencyCheck();
+                consistencyCheck();
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
         }
         finally {
-            lock.writeLock().unlock();
+            ctx.database().checkpointReadUnlock();
         }
 
         return changed;
@@ -709,6 +724,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private GridDhtLocalPartition createPartition(int p) {
         assert lock.isWriteLockedByCurrentThread();
 
+        assert ctx.database().checkpointLockIsHeldByThread();
+
         GridDhtLocalPartition loc = locParts.get(p);
 
         if (loc == null || loc.state() == EVICTED) {
@@ -1183,232 +1200,239 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         assert partMap != null;
 
-        lock.writeLock().lock();
+        ctx.database().checkpointReadLock();
 
         try {
-            if (stopping || !lastTopChangeVer.initialized() ||
-                // Ignore message not-related to exchange if exchange is in progress.
-                (exchangeVer == null && !lastTopChangeVer.equals(readyTopVer)))
-                return false;
+            lock.writeLock().lock();
 
-            if (incomeCntrMap != null) {
-                // update local counters in partitions
-                for (int i = 0; i < locParts.length(); i++) {
-                    GridDhtLocalPartition part = locParts.get(i);
+            try {
+                if (stopping || !lastTopChangeVer.initialized() ||
+                    // Ignore message not-related to exchange if exchange is in progress.
+                    (exchangeVer == null && !lastTopChangeVer.equals(readyTopVer)))
+                    return false;
 
-                    if (part == null)
-                        continue;
+                if (incomeCntrMap != null) {
+                    // update local counters in partitions
+                    for (int i = 0; i < locParts.length(); i++) {
+                        GridDhtLocalPartition part = locParts.get(i);
+
+                        if (part == null)
+                            continue;
 
-                    if (part.state() == OWNING || part.state() == MOVING) {
-                        long updCntr = incomeCntrMap.updateCounter(part.id());
+                        if (part.state() == OWNING || part.state() == MOVING) {
+                            long updCntr = incomeCntrMap.updateCounter(part.id());
 
-                        if (updCntr != 0 && updCntr > part.updateCounter())
-                            part.updateCounter(updCntr);
+                            if (updCntr != 0 && updCntr > part.updateCounter())
+                                part.updateCounter(updCntr);
+                        }
                     }
                 }
-            }
 
-            if (exchangeVer != null) {
-                // Ignore if exchange already finished or new exchange started.
-                if (readyTopVer.compareTo(exchangeVer) > 0 || lastTopChangeVer.compareTo(exchangeVer) > 0) {
-                    U.warn(log, "Stale exchange id for full partition map update (will ignore) [" +
+                if (exchangeVer != null) {
+                    // Ignore if exchange already finished or new exchange started.
+                    if (readyTopVer.compareTo(exchangeVer) > 0 || lastTopChangeVer.compareTo(exchangeVer) > 0) {
+                        U.warn(log, "Stale exchange id for full partition map update (will ignore) [" +
+                            "lastTopChange=" + lastTopChangeVer +
+                            ", readTopVer=" + readyTopVer +
+                            ", exchVer=" + exchangeVer + ']');
+
+                        return false;
+                    }
+                }
+
+                if (msgTopVer != null && lastTopChangeVer.compareTo(msgTopVer) > 0) {
+                    U.warn(log, "Stale version for full partition map update message (will ignore) [" +
                         "lastTopChange=" + lastTopChangeVer +
                         ", readTopVer=" + readyTopVer +
-                        ", exchVer=" + exchangeVer + ']');
+                        ", msgVer=" + msgTopVer + ']');
 
                     return false;
                 }
-            }
-
-            if (msgTopVer != null && lastTopChangeVer.compareTo(msgTopVer) > 0) {
-                U.warn(log, "Stale version for full partition map update message (will ignore) [" +
-                    "lastTopChange=" + lastTopChangeVer +
-                    ", readTopVer=" + readyTopVer +
-                    ", msgVer=" + msgTopVer + ']');
 
-                return false;
-            }
+                boolean fullMapUpdated = (node2part == null);
 
-            boolean fullMapUpdated = (node2part == null);
+                if (node2part != null) {
+                    for (GridDhtPartitionMap part : node2part.values()) {
+                        GridDhtPartitionMap newPart = partMap.get(part.nodeId());
 
-            if (node2part != null) {
-                for (GridDhtPartitionMap part : node2part.values()) {
-                    GridDhtPartitionMap newPart = partMap.get(part.nodeId());
+                        if (shouldOverridePartitionMap(part, newPart)) {
+                            fullMapUpdated = true;
 
-                    if (shouldOverridePartitionMap(part, newPart)) {
-                        fullMapUpdated = true;
+                            if (log.isDebugEnabled()) {
+                                log.debug("Overriding partition map in full update map [exchVer=" + exchangeVer +
+                                    ", curPart=" + mapString(part) +
+                                    ", newPart=" + mapString(newPart) + ']');
+                            }
 
-                        if (log.isDebugEnabled()) {
-                            log.debug("Overriding partition map in full update map [exchVer=" + exchangeVer +
-                                ", curPart=" + mapString(part) +
-                                ", newPart=" + mapString(newPart) + ']');
+                            if (newPart.nodeId().equals(ctx.localNodeId()))
+                                updateSeq.setIfGreater(newPart.updateSequence());
+                        }
+                        else {
+                            // If for some nodes current partition has a newer map,
+                            // then we keep the newer value.
+                            partMap.put(part.nodeId(), part);
                         }
-
-                        if (newPart.nodeId().equals(ctx.localNodeId()))
-                            updateSeq.setIfGreater(newPart.updateSequence());
-                    }
-                    else {
-                        // If for some nodes current partition has a newer map,
-                        // then we keep the newer value.
-                        partMap.put(part.nodeId(), part);
                     }
-                }
 
-                // Check that we have new nodes.
-                for (GridDhtPartitionMap part : partMap.values()) {
-                    if (fullMapUpdated)
-                        break;
+                    // Check that we have new nodes.
+                    for (GridDhtPartitionMap part : partMap.values()) {
+                        if (fullMapUpdated)
+                            break;
 
-                    fullMapUpdated = !node2part.containsKey(part.nodeId());
-                }
+                        fullMapUpdated = !node2part.containsKey(part.nodeId());
+                    }
 
-                // Remove entry if node left.
-                for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) {
-                    UUID nodeId = it.next();
+                    // Remove entry if node left.
+                    for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) {
+                        UUID nodeId = it.next();
 
-                    if (!ctx.discovery().alive(nodeId)) {
-                        if (log.isDebugEnabled())
-                            log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" +
-                                partMap + ']');
+                        if (!ctx.discovery().alive(nodeId)) {
+                            if (log.isDebugEnabled())
+                                log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" +
+                                    partMap + ']');
 
-                        it.remove();
+                            it.remove();
+                        }
                     }
                 }
-            }
-            else {
-                GridDhtPartitionMap locNodeMap = partMap.get(ctx.localNodeId());
-
-                if (locNodeMap != null)
-                    updateSeq.setIfGreater(locNodeMap.updateSequence());
-            }
+                else {
+                    GridDhtPartitionMap locNodeMap = partMap.get(ctx.localNodeId());
 
-            if (!fullMapUpdated) {
-                if (log.isDebugEnabled()) {
-                    log.debug("No updates for full partition map (will ignore) [lastExch=" + lastTopChangeVer +
-                        ", exchVer=" + exchangeVer +
-                        ", curMap=" + node2part +
-                        ", newMap=" + partMap + ']');
+                    if (locNodeMap != null)
+                        updateSeq.setIfGreater(locNodeMap.updateSequence());
                 }
 
-                return false;
-            }
+                if (!fullMapUpdated) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("No updates for full partition map (will ignore) [lastExch=" + lastTopChangeVer +
+                            ", exchVer=" + exchangeVer +
+                            ", curMap=" + node2part +
+                            ", newMap=" + partMap + ']');
+                    }
 
-            if (exchangeVer != null) {
-                assert exchangeVer.compareTo(readyTopVer) >= 0 && exchangeVer.compareTo(lastTopChangeVer) >= 0;
+                    return false;
+                }
 
-                lastTopChangeVer = readyTopVer = exchangeVer;
-            }
+                if (exchangeVer != null) {
+                    assert exchangeVer.compareTo(readyTopVer) >= 0 && exchangeVer.compareTo(lastTopChangeVer) >= 0;
 
-            node2part = partMap;
+                    lastTopChangeVer = readyTopVer = exchangeVer;
+                }
 
-            if (exchangeVer == null && !grp.isReplicated() &&
-                    (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) {
-                AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
+                node2part = partMap;
 
-                for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
-                    for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
-                        int p = e0.getKey();
+                if (exchangeVer == null && !grp.isReplicated() &&
+                        (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) {
+                    AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
 
-                        Set<UUID> diffIds = diffFromAffinity.get(p);
+                    for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
+                        for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
+                            int p = e0.getKey();
 
-                        if ((e0.getValue() == MOVING || e0.getValue() == OWNING || e0.getValue() == RENTING) &&
-                            !affAssignment.getIds(p).contains(e.getKey())) {
+                            Set<UUID> diffIds = diffFromAffinity.get(p);
 
-                            if (diffIds == null)
-                                diffFromAffinity.put(p, diffIds = U.newHashSet(3));
+                            if ((e0.getValue() == MOVING || e0.getValue() == OWNING || e0.getValue() == RENTING) &&
+                                !affAssignment.getIds(p).contains(e.getKey())) {
 
-                            diffIds.add(e.getKey());
-                        }
-                        else {
-                            if (diffIds != null && diffIds.remove(e.getKey())) {
-                                if (diffIds.isEmpty())
-                                    diffFromAffinity.remove(p);
+                                if (diffIds == null)
+                                    diffFromAffinity.put(p, diffIds = U.newHashSet(3));
+
+                                diffIds.add(e.getKey());
+                            }
+                            else {
+                                if (diffIds != null && diffIds.remove(e.getKey())) {
+                                    if (diffIds.isEmpty())
+                                        diffFromAffinity.remove(p);
+                                }
                             }
                         }
                     }
-                }
 
-                diffFromAffinityVer = readyTopVer;
-            }
+                    diffFromAffinityVer = readyTopVer;
+                }
 
-            boolean changed = false;
+                boolean changed = false;
 
-            GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId());
+                GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId());
 
-            if (nodeMap != null && ctx.database().persistenceEnabled() && readyTopVer.initialized()) {
-                for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) {
-                    int p = e.getKey();
-                    GridDhtPartitionState state = e.getValue();
+                if (nodeMap != null && ctx.database().persistenceEnabled() && readyTopVer.initialized()) {
+                    for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) {
+                        int p = e.getKey();
+                        GridDhtPartitionState state = e.getValue();
 
-                    if (state == OWNING) {
-                        GridDhtLocalPartition locPart = locParts.get(p);
+                        if (state == OWNING) {
+                            GridDhtLocalPartition locPart = locParts.get(p);
 
-                        assert locPart != null : grp.cacheOrGroupName();
+                            assert locPart != null : grp.cacheOrGroupName();
 
-                        if (locPart.state() == MOVING) {
-                            boolean success = locPart.own();
+                            if (locPart.state() == MOVING) {
+                                boolean success = locPart.own();
 
-                            assert success : locPart;
+                                assert success : locPart;
 
-                            changed |= success;
+                                changed |= success;
+                            }
                         }
-                    }
-                    else if (state == MOVING) {
-                        GridDhtLocalPartition locPart = locParts.get(p);
+                        else if (state == MOVING) {
+                            GridDhtLocalPartition locPart = locParts.get(p);
 
-                        if (locPart == null || locPart.state() == EVICTED)
-                            locPart = createPartition(p);
+                            if (locPart == null || locPart.state() == EVICTED)
+                                locPart = createPartition(p);
 
-                        if (locPart.state() == OWNING) {
-                            locPart.moving();
+                            if (locPart.state() == OWNING) {
+                                locPart.moving();
 
-                            changed = true;
+                                changed = true;
+                            }
                         }
-                    }
-                    else if (state == RENTING && partsToReload.contains(p)) {
-                        GridDhtLocalPartition locPart = locParts.get(p);
+                        else if (state == RENTING && partsToReload.contains(p)) {
+                            GridDhtLocalPartition locPart = locParts.get(p);
 
-                        if (locPart == null || locPart.state() == EVICTED) {
-                            createPartition(p);
+                            if (locPart == null || locPart.state() == EVICTED) {
+                                createPartition(p);
 
-                            changed = true;
-                        }
-                        else if (locPart.state() == OWNING || locPart.state() == MOVING) {
-                            locPart.reload(true);
+                                changed = true;
+                            }
+                            else if (locPart.state() == OWNING || locPart.state() == MOVING) {
+                                locPart.reload(true);
 
-                            locPart.rent(false);
+                                locPart.rent(false);
 
-                            changed = true;
+                                changed = true;
+                            }
+                            else
+                                locPart.reload(true);
                         }
-                        else
-                            locPart.reload(true);
                     }
                 }
-            }
 
-            long updateSeq = this.updateSeq.incrementAndGet();
+                long updateSeq = this.updateSeq.incrementAndGet();
 
-            if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) {
-                AffinityAssignment  aff = grp.affinity().readyAffinity(readyTopVer);
+                if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) {
+                    AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer);
 
-                if (exchangeVer == null)
-                    changed |= checkEvictions(updateSeq, aff);
+                    if (exchangeVer == null)
+                        changed |= checkEvictions(updateSeq, aff);
 
-                updateRebalanceVersion(aff.assignment());
-            }
+                    updateRebalanceVersion(aff.assignment());
+                }
 
-            consistencyCheck();
+                consistencyCheck();
 
-            if (log.isDebugEnabled())
-                log.debug("Partition map after full update: " + fullMapString());
+                if (log.isDebugEnabled())
+                    log.debug("Partition map after full update: " + fullMapString());
 
-            if (changed)
-                ctx.exchange().scheduleResendPartitions();
+                if (changed)
+                    ctx.exchange().scheduleResendPartitions();
 
-            return changed;
+                return changed;
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
         }
         finally {
-            lock.writeLock().unlock();
+            ctx.database().checkpointReadUnlock();
         }
     }
 
@@ -2188,27 +2212,34 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public void onEvicted(GridDhtLocalPartition part, boolean updateSeq) {
-        lock.writeLock().lock();
+        ctx.database().checkpointReadLock();
 
         try {
-            if (stopping)
-                return;
+            lock.writeLock().lock();
 
-            assert part.state() == EVICTED;
+            try {
+                if (stopping)
+                    return;
 
-            long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get();
+                assert part.state() == EVICTED;
 
-            if (part.reload())
-                part = createPartition(part.id());
+                long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get();
 
-            assert lastTopChangeVer.initialized() : lastTopChangeVer;
+                if (part.reload())
+                    part = createPartition(part.id());
 
-            updateLocal(part.id(), part.state(), seq, lastTopChangeVer);
+                assert lastTopChangeVer.initialized() : lastTopChangeVer;
 
-            consistencyCheck();
+                updateLocal(part.id(), part.state(), seq, lastTopChangeVer);
+
+                consistencyCheck();
+            }
+            finally {
+                lock.writeLock().unlock();
+            }
         }
         finally {
-            lock.writeLock().unlock();
+            ctx.database().checkpointReadUnlock();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/21de1c56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 1b5dae6..85e3baa 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2281,6 +2281,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 for (DbCheckpointListener lsnr : lsnrs)
                     lsnr.onCheckpointBegin(ctx0);
 
+                if (curr.nextSnapshot)
+                    snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map);
+
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                     if (grp.isLocal())
                         continue;
@@ -2300,8 +2303,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                     cpRec.addCacheGroupState(grp.groupId(), state);
                 }
 
-                if (curr.nextSnapshot)
-                    snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map);
 
                 cpPagesTuple = beginAllCheckpoints();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/21de1c56/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java
new file mode 100644
index 0000000..3969fb6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsExchangeDuringCheckpointTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class IgnitePdsExchangeDuringCheckpointTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /**
+     *
+     */
+    public void testExchangeOnNodeLeft() throws Exception {
+        for (int i = 0; i < 5; i++) {
+            startGrids(3);
+            IgniteEx ignite = grid(1);
+            ignite.active(true);
+
+            awaitPartitionMapExchange();
+
+            stopGrid(0, true);
+
+            awaitPartitionMapExchange();
+
+            ignite.context().cache().context().database().wakeupForCheckpoint("test").get(10000);
+
+            afterTest();
+        }
+    }
+
+    /**
+     *
+     */
+    public void testExchangeOnNodeJoin() throws Exception {
+        for (int i = 0; i < 5; i++) {
+            startGrids(2);
+            IgniteEx ignite = grid(1);
+            ignite.active(true);
+
+            awaitPartitionMapExchange();
+
+            IgniteEx ex = startGrid(2);
+
+            awaitPartitionMapExchange();
+
+            ex.context().cache().context().database().wakeupForCheckpoint("test").get(10000);
+
+            afterTest();
+        }
+    }
+
+    /**
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        MemoryConfiguration memCfg = new MemoryConfiguration();
+
+        MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration();
+
+        memPlcCfg.setName("dfltMemPlc");
+        memPlcCfg.setInitialSize(100 * 1024 * 1024);
+        memPlcCfg.setMaxSize(1000 * 1024 * 1024);
+
+        memCfg.setDefaultMemoryPolicyName("dfltMemPlc");
+        memCfg.setMemoryPolicies(memPlcCfg);
+
+        cfg.setMemoryConfiguration(memCfg);
+
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 4096));
+
+        cfg.setCacheConfiguration(ccfg);
+
+        PersistentStoreConfiguration psiCfg = new PersistentStoreConfiguration()
+            .setCheckpointingThreads(1)
+            .setCheckpointingFrequency(1)
+            .setWalMode(WALMode.LOG_ONLY);
+
+        cfg.setPersistentStoreConfiguration(psiCfg);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+        deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/21de1c56/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
index 568af94..6e16d2e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheDataStructuresSelfTestSuite.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.datastructures.IgniteClientDa
 import org.apache.ignite.internal.processors.cache.datastructures.IgniteClientDiscoveryDataStructuresTest;
 import org.apache.ignite.internal.processors.cache.datastructures.IgniteDataStructureUniqueNameTest;
 import org.apache.ignite.internal.processors.cache.datastructures.IgniteDataStructureWithJobTest;
+import org.apache.ignite.internal.processors.cache.datastructures.SemaphoreFailoverSafeReleasePermitsTest;
 import org.apache.ignite.internal.processors.cache.datastructures.local.GridCacheLocalAtomicQueueApiSelfTest;
 import org.apache.ignite.internal.processors.cache.datastructures.local.GridCacheLocalAtomicSetSelfTest;
 import org.apache.ignite.internal.processors.cache.datastructures.local.GridCacheLocalQueueApiSelfTest;
@@ -129,8 +130,7 @@ public class IgniteCacheDataStructuresSelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(IgnitePartitionedCountDownLatchSelfTest.class));
         suite.addTest(new TestSuite(IgniteDataStructureWithJobTest.class));
         suite.addTest(new TestSuite(IgnitePartitionedSemaphoreSelfTest.class));
-        // TODO https://issues.apache.org/jira/browse/IGNITE-4173, enable when fixed.
-        // suite.addTest(new TestSuite(SemaphoreFailoverSafeReleasePermitsTest.class));
+        suite.addTest(new TestSuite(SemaphoreFailoverSafeReleasePermitsTest.class));
         // TODO IGNITE-3141, enabled when fixed.
         // suite.addTest(new TestSuite(IgnitePartitionedLockSelfTest.class));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/21de1c56/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index ab8ff81..29ea64c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsExchangeDuringCheckpointTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPageSizesTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistenceMetricsSelfTest;
@@ -73,6 +74,9 @@ public class IgnitePdsTestSuite2 extends TestSuite {
         suite.addTestSuite(IgniteWalFlushFailoverTest.class);
 
         suite.addTestSuite(IgniteWalReaderTest.class);
+
+        suite.addTestSuite(IgnitePdsExchangeDuringCheckpointTest.class);
+
         return suite;
     }
 }


Mime
View raw message