ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [49/50] [abbrv] ignite git commit: Optimizations from ignite-5068.
Date Wed, 03 May 2017 07:22:27 GMT
Optimizations from ignite-5068.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/58b6e05e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/58b6e05e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/58b6e05e

Branch: refs/heads/ignite-gg-8.0.3.ea6-clients-test
Commit: 58b6e05e82978c68ba9e2e53a3b9b866e2c474ca
Parents: 52556f4
Author: Ilya Lantukh <ilantukh@gridgain.com>
Authored: Fri Apr 28 19:57:32 2017 +0300
Committer: Ilya Lantukh <ilantukh@gridgain.com>
Committed: Fri Apr 28 19:57:32 2017 +0300

----------------------------------------------------------------------
 .../dht/GridClientPartitionTopology.java        |   5 +
 .../dht/GridDhtPartitionTopology.java           |   6 +
 .../dht/GridDhtPartitionTopologyImpl.java       | 473 +++++++++++--------
 .../GridDhtPartitionsExchangeFuture.java        |   9 +
 4 files changed, 302 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/58b6e05e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5c5a3c4..e71c18f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -778,6 +778,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
     }
 
     /** {@inheritDoc} */
+    @Override public void onExchangeDone(AffinityAssignment assignment) {
+        // no-op
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean detectLostPartitions(DiscoveryEvent discoEvt) {
         assert false : "detectLostPartitions should never be called on client topology";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/58b6e05e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 2bef267..5df8714 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -326,4 +326,10 @@ public interface GridDhtPartitionTopology {
      * @return Set of node IDs that should reload partitions.
      */
     public Set<UUID> setOwners(int p, Set<UUID> owners, boolean haveHistory,
boolean updateSeq);
+
+    /**
+     * Callback on exchange done.
+     * @param assignment New affinity assignment.
+     */
+    public void onExchangeDone(AffinityAssignment assignment);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/58b6e05e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index fb09b38..d1283c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -79,6 +79,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     private static final boolean FULL_MAP_DEBUG = false;
 
     /** */
+    private static final boolean FAST_DIFF_REBUILD = true;
+
+    /** */
     private static final Long ZERO = 0L;
 
     /** Context. */
@@ -93,8 +96,11 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     /** Node to partition map. */
     private GridDhtPartitionFullMap node2part;
 
-    /** Partition to node map. */
-    private final Map<Integer, Set<UUID>> part2node;
+    /** */
+    private final Map<Integer, Set<UUID>> diffFromAffinity = new HashMap<>();
+
+    /** */
+    private volatile AffinityTopologyVersion diffFromAffinityVer = AffinityTopologyVersion.NONE;
 
     /** */
     private GridDhtPartitionExchangeId lastExchangeId;
@@ -142,8 +148,6 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         log = cctx.logger(getClass());
 
         locParts = new AtomicReferenceArray<>(cctx.config().getAffinity().partitions());
-
-        part2node = new HashMap<>(cctx.config().getAffinity().partitions(), 1.0f);
     }
 
     /** {@inheritDoc} */
@@ -160,7 +164,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         try {
             node2part = null;
 
-            part2node.clear();
+            diffFromAffinity.clear();
 
             lastExchangeId = null;
 
@@ -168,6 +172,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             topReadyFut = null;
 
+            diffFromAffinityVer = AffinityTopologyVersion.NONE;
+
             rebalancedTopVer = AffinityTopologyVersion.NONE;
 
             topVer = AffinityTopologyVersion.NONE;
@@ -863,18 +869,42 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             List<ClusterNode> nodes = null;
 
-            Collection<UUID> nodeIds = part2node.get(p);
+            if (!topVer.equals(diffFromAffinityVer)) {
+                log.error("??? node2part [topVer=" + topVer + ", diffVer=" + diffFromAffinityVer
+ "]");
+
+                nodes = new ArrayList<>();
+
+                nodes.addAll(affNodes);
 
-            if (!F.isEmpty(nodeIds)) {
-                for (UUID nodeId : nodeIds) {
-                    HashSet<UUID> affIds = affAssignment.getIds(p);
+                for (Map.Entry<UUID, GridDhtPartitionMap2> entry : node2part.entrySet())
{
+                    GridDhtPartitionState state = entry.getValue().get(p);
 
-                    if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING))
{
+                    ClusterNode n = cctx.discovery().node(entry.getKey());
+
+                    if (n != null && state != null && (state == MOVING ||
state == OWNING) && !nodes.contains(n)
+                        && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion()))
{
+                        nodes.add(n);
+                    }
+
+                }
+
+                return nodes;
+            }
+
+            Collection<UUID> diffIds = diffFromAffinity.get(p);
+
+            if (!F.isEmpty(diffIds)) {
+                HashSet<UUID> affIds = affAssignment.getIds(p);
+
+                for (UUID nodeId : diffIds) {
+                    assert !affIds.contains(nodeId);
+
+                    if (hasState(p, nodeId, OWNING, MOVING)) {
                         ClusterNode n = cctx.discovery().node(nodeId);
 
                         if (n != null && (topVer.topologyVersion() < 0 || n.order()
<= topVer.topologyVersion())) {
                             if (nodes == null) {
-                                nodes = new ArrayList<>(affNodes.size() + 2);
+                                nodes = new ArrayList<>(affNodes.size() + diffIds.size());
 
                                 nodes.addAll(affNodes);
                             }
@@ -903,7 +933,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         AffinityTopologyVersion topVer,
         GridDhtPartitionState state,
         GridDhtPartitionState... states) {
-        Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId()))
: null;
+        Collection<UUID> allIds = F.nodeIds(discoCache.cacheAffinityNodes(cctx.cacheId()));
 
         lock.readLock().lock();
 
@@ -913,20 +943,10 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                 ", node2part=" + node2part +
                 ", cache=" + cctx.name() + ']';
 
-            Collection<UUID> nodeIds = part2node.get(p);
-
             // Node IDs can be null if both, primary and backup, nodes disappear.
-            int size = nodeIds == null ? 0 : nodeIds.size();
-
-            if (size == 0)
-                return Collections.emptyList();
-
-            List<ClusterNode> nodes = new ArrayList<>(size);
-
-            for (UUID id : nodeIds) {
-                if (topVer.topologyVersion() > 0 && !F.contains(allIds, id))
-                    continue;
+            List<ClusterNode> nodes = new ArrayList<>();
 
+            for (UUID id : allIds) {
                 if (hasState(p, id, state, states)) {
                     ClusterNode n = cctx.discovery().node(id);
 
@@ -1098,30 +1118,39 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             node2part = partMap;
 
-            part2node.clear();
+            AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+
+            if (diffFromAffinityVer.compareTo(affVer) <= 0) {
+                AffinityAssignment affAssignment = cctx.affinity().assignment(affVer);
 
-            for (Map.Entry<UUID, GridDhtPartitionMap2> e : partMap.entrySet()) {
-                for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet())
{
-                    if (e0.getValue() != MOVING && e0.getValue() != OWNING)
-                        continue;
+                for (Map.Entry<UUID, GridDhtPartitionMap2> e : partMap.entrySet())
{
+                    for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet())
{
+                        int p = e0.getKey();
 
-                    int p = e0.getKey();
+                        Set<UUID> diffIds = diffFromAffinity.get(p);
 
-                    Set<UUID> ids = part2node.get(p);
+                        if ((e0.getValue() == MOVING || e0.getValue() == OWNING || e0.getValue()
== RENTING) &&
+                            !affAssignment.getIds(p).contains(e.getKey())) {
 
-                    if (ids == null)
-                        // Initialize HashSet to size 3 in anticipation that there won't
be
-                        // more than 3 nodes per partitions.
-                        part2node.put(p, ids = U.newHashSet(3));
+                            if (diffIds == null)
+                                diffFromAffinity.put(p, diffIds = U.newHashSet(3));
 
-                    ids.add(e.getKey());
+                            diffIds.add(e.getKey());
+                        }
+                        else {
+                            if (diffIds != null && diffIds.remove(e.getKey())) {
+                                if (diffIds.isEmpty())
+                                    diffFromAffinity.remove(p);
+                            }
+                        }
+                    }
                 }
+
+                diffFromAffinityVer = affVer;
             }
 
             boolean changed = false;
 
-            AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
-
             GridDhtPartitionMap2 nodeMap = partMap.get(cctx.localNodeId());
 
             if (nodeMap != null && cctx.shared().database().persistenceEnabled())
{
@@ -1296,37 +1325,52 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             node2part.put(parts.nodeId(), parts);
 
-            // Add new mappings.
-            for (Map.Entry<Integer,GridDhtPartitionState> e : parts.entrySet()) {
-                int p = e.getKey();
+            AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
 
-                Set<UUID> ids = part2node.get(p);
+            if (affVer.compareTo(diffFromAffinityVer) >= 0) {
+                AffinityAssignment affAssignment = cctx.affinity().assignment(affVer);
 
-                if (e.getValue() == MOVING || e.getValue() == OWNING) {
-                    if (ids == null)
-                        // Initialize HashSet to size 3 in anticipation that there won't
be
-                        // more than 3 nodes per partition.
-                        part2node.put(p, ids = U.newHashSet(3));
+                // Add new mappings.
+                for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet())
{
+                    int p = e.getKey();
 
-                    changed |= ids.add(parts.nodeId());
-                }
-                else {
-                    if (ids != null)
-                        changed |= ids.remove(parts.nodeId());
+                    Set<UUID> diffIds = diffFromAffinity.get(p);
+
+                    if ((e.getValue() == MOVING || e.getValue() == OWNING || e.getValue()
== RENTING)
+                        && !affAssignment.getIds(p).contains(parts.nodeId())) {
+                        if (diffIds == null)
+                            diffFromAffinity.put(p, diffIds = U.newHashSet(3));
+
+                        if (diffIds.add(parts.nodeId()))
+                            changed = true;
+                    }
+                    else {
+                        if (diffIds != null && diffIds.remove(parts.nodeId())) {
+                            changed = true;
+
+                            if (diffIds.isEmpty())
+                                diffFromAffinity.remove(p);
+                        }
+
+                    }
                 }
-            }
 
-            // Remove obsolete mappings.
-            if (cur != null) {
-                for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
-                    Set<UUID> ids = part2node.get(p);
+                // Remove obsolete mappings.
+                if (cur != null) {
+                    for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
+                        Set<UUID> ids = diffFromAffinity.get(p);
 
-                    if (ids != null)
-                        changed |= ids.remove(parts.nodeId());
+                        if (ids != null && ids.remove(parts.nodeId())) {
+                            changed = true;
+
+                            if (ids.isEmpty())
+                                diffFromAffinity.remove(p);
+                        }
+                    }
                 }
-            }
 
-            AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+                diffFromAffinityVer = affVer;
+            }
 
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer)
>= 0) {
                 List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
@@ -1352,44 +1396,96 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     }
 
     /** {@inheritDoc} */
-    @Override public boolean detectLostPartitions(DiscoveryEvent discoEvt) {
+    @Override public void onExchangeDone(AffinityAssignment assignment) {
         lock.writeLock().lock();
 
         try {
-            int parts = cctx.affinity().partitions();
+            if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0)
+                rebuildDiff(assignment);
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * @param affAssignment New affinity assignment.
+     */
+    private void rebuildDiff(AffinityAssignment affAssignment) {
+        assert lock.isWriteLockedByCurrentThread();
 
-            Collection<Integer> lost = null;
+        if (node2part == null)
+            return;
 
-            for (int p = 0; p < parts; p++) {
-                boolean foundOwner = false;
+        if (FAST_DIFF_REBUILD) {
+            Collection<UUID> affNodes = F.nodeIds(cctx.discovery().cacheAffinityNodes(cctx.name(),
affAssignment.topologyVersion()));
 
-                Set<UUID> nodeIds = part2node.get(p);
+            for (Map.Entry<Integer, Set<UUID>> e : diffFromAffinity.entrySet())
{
+                int p = e.getKey();
+
+                Iterator<UUID> iter = e.getValue().iterator();
+
+                while (iter.hasNext()) {
+                    UUID nodeId = iter.next();
 
-                if (nodeIds != null) {
-                    for (UUID nodeId : nodeIds) {
-                        GridDhtPartitionMap2 partMap = node2part.get(nodeId);
+                    if (!affNodes.contains(nodeId) || affAssignment.getIds(p).contains(nodeId))
+                        iter.remove();
+                }
+            }
+        }
+        else {
+            for (Map.Entry<UUID, GridDhtPartitionMap2> e : node2part.entrySet()) {
+                UUID nodeId = e.getKey();
 
-                        GridDhtPartitionState state = partMap.get(p);
+                for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet())
{
+                    int p0 = e0.getKey();
 
-                        if (state == OWNING) {
-                            foundOwner = true;
+                    GridDhtPartitionState state = e0.getValue();
 
-                            break;
-                        }
+                    Set<UUID> ids = diffFromAffinity.get(p0);
+
+                    if ((state == MOVING || state == OWNING) && !affAssignment.getIds(p0).contains(nodeId))
{
+                        if (ids == null)
+                            diffFromAffinity.put(p0, ids = U.newHashSet(3));
+
+                        ids.add(nodeId);
+                    }
+                    else {
+                        if (ids != null)
+                            ids.remove(nodeId);
                     }
                 }
+            }
+        }
+
+        diffFromAffinityVer = affAssignment.topologyVersion();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean detectLostPartitions(DiscoveryEvent discoEvt) {
+        lock.writeLock().lock();
+
+        try {
+            if (node2part == null)
+                return false;
+
+            int parts = cctx.affinity().partitions();
 
-                if (!foundOwner) {
-                    if (lost == null)
-                        lost = new HashSet<>(parts - p, 1.0f);
+            Set<Integer> lost = new HashSet<>(parts);
 
-                    lost.add(p);
+            for (int p = 0; p < parts; p++)
+                lost.add(p);
+
+            for (GridDhtPartitionMap2 partMap : node2part.values()) {
+                for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet())
{
+                    if (e.getValue() == OWNING)
+                        lost.remove(e.getKey());
                 }
             }
 
             boolean changed = false;
 
-            if (lost != null) {
+            if (!lost.isEmpty()) {
                 PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy();
 
                 assert plc != null;
@@ -1410,16 +1506,17 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                     }
                     // Update map for remote node.
                     else if (plc != PartitionLossPolicy.IGNORE) {
-                        Set<UUID> nodeIds = part2node.get(part);
-
-                        if (nodeIds != null) {
-                            for (UUID nodeId : nodeIds) {
-                                GridDhtPartitionMap2 nodeMap = node2part.get(nodeId);
-
-                                if (nodeMap.get(part) != EVICTED)
-                                    nodeMap.put(part, LOST);
-                            }
-                        }
+                        // TODO
+//                        Set<UUID> nodeIds = part2node.get(part);
+//
+//                        if (nodeIds != null) {
+//                            for (UUID nodeId : nodeIds) {
+//                                GridDhtPartitionMap nodeMap = node2part.get(nodeId);
+//
+//                                if (nodeMap.get(part) != EVICTED)
+//                                    nodeMap.put(part, LOST);
+//                            }
+//                        }
                     }
 
                     if (cctx.events().isRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST))
@@ -1440,86 +1537,83 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
     /** {@inheritDoc} */
     @Override public void resetLostPartitions() {
-        lock.writeLock().lock();
-
-        try {
-            int parts = cctx.affinity().partitions();
-            long updSeq = updateSeq.incrementAndGet();
-
-            for (int part = 0; part < parts; part++) {
-                Set<UUID> nodeIds = part2node.get(part);
-
-                if (nodeIds != null) {
-                    boolean lost = false;
-
-                    for (UUID node : nodeIds) {
-                        GridDhtPartitionMap2 map = node2part.get(node);
-
-                        if (map.get(part) == LOST) {
-                            lost = true;
-
-                            break;
-                        }
-                    }
-
-                    if (lost) {
-                        GridDhtLocalPartition locPart = localPartition(part, topVer, false);
-
-                        if (locPart != null) {
-                            boolean marked = locPart.own();
-
-                            if (marked)
-                                updateLocal(locPart.id(), locPart.state(), updSeq);
-                        }
-
-                        for (UUID nodeId : nodeIds) {
-                            GridDhtPartitionMap2 nodeMap = node2part.get(nodeId);
-
-                            if (nodeMap.get(part) == LOST)
-                                nodeMap.put(part, OWNING);
-                        }
-                    }
-                }
-            }
-
-            checkEvictions(updSeq, cctx.affinity().assignments(topVer));
-
-            cctx.needsRecovery(false);
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
+        // TODO
+
+//        lock.writeLock().lock();
+//
+//        try {
+//            int parts = cctx.affinity().partitions();
+//            long updSeq = updateSeq.incrementAndGet();
+//
+//            for (int part = 0; part < parts; part++) {
+//                Set<UUID> nodeIds = part2node.get(part);
+//
+//                if (nodeIds != null) {
+//                    boolean lost = false;
+//
+//                    for (UUID node : nodeIds) {
+//                        GridDhtPartitionMap2 map = node2part.get(node);
+//
+//                        if (map.get(part) == LOST) {
+//                            lost = true;
+//
+//                            break;
+//                        }
+//                    }
+//
+//                    if (lost) {
+//                        GridDhtLocalPartition locPart = localPartition(part, topVer, false);
+//
+//                        if (locPart != null) {
+//                            boolean marked = locPart.own();
+//
+//                            if (marked)
+//                                updateLocal(locPart.id(), locPart.state(), updSeq);
+//                        }
+//
+//                        for (UUID nodeId : nodeIds) {
+//                            GridDhtPartitionMap2 nodeMap = node2part.get(nodeId);
+//
+//                            if (nodeMap.get(part) == LOST)
+//                                nodeMap.put(part, OWNING);
+//                        }
+//                    }
+//                }
+//            }
+//
+//            checkEvictions(updSeq, cctx.affinity().assignments(topVer));
+//
+//            cctx.needsRecovery(false);
+//        }
+//        finally {
+//            lock.writeLock().unlock();
+//        }
     }
 
     /** {@inheritDoc} */
     @Override public Collection<Integer> lostPartitions() {
+        if (cctx.config().getPartitionLossPolicy() == PartitionLossPolicy.IGNORE)
+            return Collections.emptySet();
+
         lock.readLock().lock();
 
         try {
-            Collection<Integer> res = null;
+            Set<Integer> res = null;
 
             int parts = cctx.affinity().partitions();
 
-            for (int part = 0; part < parts; part++) {
-                Set<UUID> nodeIds = part2node.get(part);
-
-                if (nodeIds != null) {
-                    for (UUID node : nodeIds) {
-                        GridDhtPartitionMap2 map = node2part.get(node);
-
-                        if (map.get(part) == LOST) {
-                            if (res == null)
-                                res = new ArrayList<>(parts - part);
+            for (GridDhtPartitionMap2 partMap : node2part.values()) {
+                for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet())
{
+                    if (e.getValue() == LOST) {
+                        if (res == null)
+                            res = new HashSet<>(parts);
 
-                            res.add(part);
-
-                            break;
-                        }
+                        res.add(e.getKey());
                     }
                 }
             }
 
-            return res == null ? Collections.<Integer>emptyList() : res;
+            return res == null ? Collections.<Integer>emptySet() : res;
         }
         finally {
             lock.readLock().unlock();
@@ -1750,12 +1844,18 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             map.put(p, state);
 
-            Set<UUID> ids = part2node.get(p);
+            if (state == MOVING || state == OWNING) {
+                    AffinityAssignment assignment = cctx.affinity().assignment(diffFromAffinityVer);
 
-            if (ids == null)
-                part2node.put(p, ids = U.newHashSet(3));
+                    if (!assignment.getIds(p).contains(cctx.localNodeId())) {
+                        Set<UUID> diffIds = diffFromAffinity.get(p);
 
-            ids.add(locNodeId);
+                        if (diffIds == null)
+                            diffFromAffinity.put(p, diffIds = U.newHashSet(3));
+
+                        diffIds.add(cctx.localNodeId());
+                    }
+                }
         }
 
         return updateSeq;
@@ -1787,14 +1887,10 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             if (parts != null) {
                 for (Integer p : parts.keySet()) {
-                    Set<UUID> nodeIds = part2node.get(p);
+                    Set<UUID> diffIds = diffFromAffinity.get(p);
 
-                    if (nodeIds != null) {
-                        nodeIds.remove(nodeId);
-
-                        if (nodeIds.isEmpty())
-                            part2node.remove(p);
-                    }
+                    if (diffIds != null)
+                        diffIds.remove(nodeId);
                 }
             }
 
@@ -1986,7 +2082,25 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                 if (affNodes.isEmpty())
                     continue;
 
-                List<ClusterNode> owners = owners(i);
+                Set<ClusterNode> owners = U.newHashSet(affNodes.size());
+
+                for (ClusterNode node : affNodes) {
+                    if (hasState(i, node.id(), OWNING))
+                        owners.add(node);
+                }
+
+                Set<UUID> diff = diffFromAffinity.get(i);
+
+                if (diff != null) {
+                    for (UUID nodeId : diff) {
+                        if (hasState(i, nodeId, OWNING)) {
+                            ClusterNode node = cctx.discovery().node(nodeId);
+
+                            if (node != null)
+                                owners.add(node);
+                        }
+                    }
+                }
 
                 if (affNodes.size() != owners.size() || !owners.containsAll(affNodes))
                     return;
@@ -2035,30 +2149,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
      * Checks consistency after all operations.
      */
     private void consistencyCheck() {
-        if (CONSISTENCY_CHECK) {
-            if (node2part == null)
-                return;
-
-            for (Map.Entry<UUID, GridDhtPartitionMap2> e : node2part.entrySet()) {
-                for (Integer p : e.getValue().keySet()) {
-                    Set<UUID> nodeIds = part2node.get(p);
-
-                    assert nodeIds != null : "Failed consistency check [part=" + p + ", nodeId="
+ e.getKey() + ']';
-                    assert nodeIds.contains(e.getKey()) : "Failed consistency check [part="
+ p + ", nodeId=" +
-                        e.getKey() + ", nodeIds=" + nodeIds + ']';
-                }
-            }
-
-            for (Map.Entry<Integer, Set<UUID>> e : part2node.entrySet()) {
-                for (UUID nodeId : e.getValue()) {
-                    GridDhtPartitionMap2 map = node2part.get(nodeId);
-
-                    assert map != null : "Failed consistency check [part=" + e.getKey() +
", nodeId=" + nodeId + ']';
-                    assert map.containsKey(e.getKey()) : "Failed consistency check [part="
+ e.getKey() +
-                        ", nodeId=" + nodeId + ']';
-                }
-            }
-        }
+        // no-op
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/58b6e05e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 078e67b5..28c3956 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1342,6 +1342,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         cctx.database().releaseHistoryForExchange();
 
+        if (err == null && realExchange) {
+            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                if (cacheCtx.isLocal())
+                    continue;
+
+                cacheCtx.topology().onExchangeDone(cacheCtx.affinity().assignment(topologyVersion()));
+            }
+        }
+
         if (super.onDone(res, err) && realExchange) {
             exchLog.info("exchange finished [topVer=" + topologyVersion() +
                 ", time1=" + duration() +


Mime
View raw message