ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [02/18] ignite git commit: ignite-5068 : Fixed diffFromAffinity consistency when affinity initialization is deferred (node left)
Date Thu, 08 Jun 2017 13:13:36 GMT
ignite-5068 : Fixed diffFromAffinity consistency when affinity initialization is deferred (node
left)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c094ac5a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c094ac5a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c094ac5a

Branch: refs/heads/ignite-5267-merge-ea
Commit: c094ac5a95e546f03713ec1def9157fe273bb032
Parents: ed5c096
Author: Ilya Lantukh <ilantukh@gridgain.com>
Authored: Mon Apr 24 18:55:34 2017 +0300
Committer: Ilya Lantukh <ilantukh@gridgain.com>
Committed: Mon Apr 24 18:55:34 2017 +0300

----------------------------------------------------------------------
 .../dht/GridClientPartitionTopology.java        |   5 +
 .../dht/GridDhtPartitionTopology.java           |   2 +
 .../dht/GridDhtPartitionTopologyImpl.java       | 170 ++++++++++++-------
 .../GridDhtPartitionsExchangeFuture.java        |   4 +
 4 files changed, 122 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c094ac5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index f3c3a1b..e026fea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -755,6 +755,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
     }
 
     /** {@inheritDoc} */
+    @Override public void onAffinityInitialized(Map<Integer, List<UUID>> assignment)
{
+        // TODO
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean detectLostPartitions(DiscoveryEvent discoEvt) {
         assert false : "detectLostPartitions should never be called on client topology";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c094ac5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index f9fd852..b47321e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -312,4 +312,6 @@ public interface GridDhtPartitionTopology {
      * @param owners Set of new owners.
      */
     public void setOwners(int p, Set<UUID> owners, boolean updateSeq);
+
+    public void onAffinityInitialized(Map<Integer, List<UUID>> assignment);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c094ac5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 23c7caa..fcc934f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -62,6 +62,8 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
@@ -93,12 +95,13 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     /** Node to partition map. */
     private GridDhtPartitionFullMap node2part;
 
-    /** Partition to node map. */
-    private Map<Integer, Set<UUID>> part2node = new HashMap<>();
-
+    /** */
     private final Map<Integer, Set<UUID>> diffFromAffinity = new HashMap<>();
 
     /** */
+    private volatile AffinityTopologyVersion diffFromAffinityVer = AffinityTopologyVersion.NONE;
+
+    /** */
     private GridDhtPartitionExchangeId lastExchangeId;
 
     /** */
@@ -168,6 +171,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             topReadyFut = null;
 
+            diffFromAffinityVer = AffinityTopologyVersion.NONE;
+
             rebalancedTopVer = AffinityTopologyVersion.NONE;
 
             topVer = AffinityTopologyVersion.NONE;
@@ -1178,35 +1183,43 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             node2part = partMap;
 
-            diffFromAffinity.clear();
-
             int diffFromAffinitySize = 0;
 
             AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
 
-            AffinityAssignment affAssignment = cctx.affinity().assignment(affVer);
+            if (diffFromAffinityVer.compareTo(affVer) <= 0) {
+                AffinityAssignment affAssignment = cctx.affinity().assignment(affVer);
 
-            for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
-                for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet())
{
-                    if (e0.getValue() != MOVING && e0.getValue() != OWNING &&
e0.getValue() != RENTING)
-                        continue;
+                for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
+                    for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet())
{
+                        int p = e0.getKey();
 
-                    int p = e0.getKey();
-
-                    if (!affAssignment.getIds(p).contains(partMap.nodeId())) {
                         Set<UUID> diffIds = diffFromAffinity.get(p);
 
-                        if (diffIds == null)
-                            diffFromAffinity.put(p, diffIds = U.newHashSet(3));
+                        if (e0.getValue() != MOVING && e0.getValue() != OWNING &&
e0.getValue() != RENTING &&
+                            !affAssignment.getIds(p).contains(partMap.nodeId())) {
+                            if (diffIds == null)
+                                diffFromAffinity.put(p, diffIds = U.newHashSet(3));
 
-                        if (diffIds.add(partMap.nodeId()))
-                            diffFromAffinitySize++;
+                            if (diffIds.add(partMap.nodeId()))
+                                diffFromAffinitySize++;
+                        }
+                        else {
+                            if (diffIds != null && diffIds.remove(partMap.nodeId()))
{
+                                diffFromAffinitySize--;
+
+                                if (diffIds.isEmpty())
+                                    diffFromAffinity.remove(p);
+                            }
+                        }
                     }
                 }
-            }
 
-            if (diffFromAffinitySize > 0)
-                U.error(log, "??? S diffFromAffinitySize=" + diffFromAffinitySize + " [exchId="
+ exchId + ",cacheId=" + cctx.cacheId() + ",cacheName=" + cctx.name() + "]");
+                diffFromAffinityVer = affVer;
+
+                if (diffFromAffinitySize > 0)
+                    U.error(log, "??? S diffFromAffinitySize=" + diffFromAffinitySize + "
[exchId=" + exchId + ",cacheId=" + cctx.cacheId() + ",cacheName=" + cctx.name() + "]");
+            }
 
             boolean changed = false;
 
@@ -1342,62 +1355,64 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             node2part.put(parts.nodeId(), parts);
 
-            diffFromAffinity.clear();
-
             AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
 
-            AffinityAssignment affAssignment = cctx.affinity().assignment(affVer);
+            if (diffFromAffinityVer.compareTo(affVer) <= 0) {
+                AffinityAssignment affAssignment = cctx.affinity().assignment(affVer);
 
-            int diffFromAffinitySize = 0;
+                int diffFromAffinitySize = 0;
 
-            // Add new mappings.
-            for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) {
-                int p = e.getKey();
+                // Add new mappings.
+                for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet())
{
+                    int p = e.getKey();
 
-                Set<UUID> diffIds = diffFromAffinity.get(p);
+                    Set<UUID> diffIds = diffFromAffinity.get(p);
 
-                if ((e.getValue() == MOVING || e.getValue() == OWNING || e.getValue() ==
RENTING)
-                    && !affAssignment.getIds(p).contains(parts.nodeId())) {
-                    if (diffIds == null)
-                        diffFromAffinity.put(p, diffIds = U.newHashSet(3));
+                    if ((e.getValue() == MOVING || e.getValue() == OWNING || e.getValue()
== RENTING)
+                        && !affAssignment.getIds(p).contains(parts.nodeId())) {
+                        if (diffIds == null)
+                            diffFromAffinity.put(p, diffIds = U.newHashSet(3));
 
-                    if (diffIds.add(parts.nodeId())) {
-                        changed = true;
+                        if (diffIds.add(parts.nodeId())) {
+                            changed = true;
 
-                        diffFromAffinitySize++;
+                            diffFromAffinitySize++;
+                        }
                     }
-                }
-                else {
-                    if (diffIds != null && diffIds.remove(parts.nodeId())) {
-                        changed = true;
+                    else {
+                        if (diffIds != null && diffIds.remove(parts.nodeId())) {
+                            changed = true;
 
-                        diffFromAffinitySize--;
+                            diffFromAffinitySize--;
 
-                        if (diffIds.isEmpty())
-                            diffFromAffinity.remove(p);
-                    }
+                            if (diffIds.isEmpty())
+                                diffFromAffinity.remove(p);
+                        }
 
+                    }
                 }
-            }
 
-            // Remove obsolete mappings.
-            if (cur != null) {
-                for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
-                    Set<UUID> ids = diffFromAffinity.get(p);
+                // Remove obsolete mappings.
+                if (cur != null) {
+                    for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
+                        Set<UUID> ids = diffFromAffinity.get(p);
 
-                    if (ids != null && ids.remove(parts.nodeId())) {
-                        changed = true;
+                        if (ids != null && ids.remove(parts.nodeId())) {
+                            changed = true;
 
-                        diffFromAffinitySize--;
+                            diffFromAffinitySize--;
 
-                        if (ids.isEmpty())
-                            diffFromAffinity.remove(p);
+                            if (ids.isEmpty())
+                                diffFromAffinity.remove(p);
+                        }
                     }
                 }
-            }
 
-            if (diffFromAffinitySize > 0)
-                U.error(log, "??? diffFromAffinitySize=" + diffFromAffinitySize + " [exchId="
+ exchId + ",cacheId=" + cctx.cacheId() + ",cacheName=" + cctx.name() + "]");
+                diffFromAffinityVer = affVer;
+
+                if (diffFromAffinitySize > 0)
+                    U.error(log, "??? diffFromAffinitySize=" + diffFromAffinitySize + " [exchId="
+ exchId + ",cacheId=" + cctx.cacheId() + ",cacheName=" + cctx.name() + "]");
+            }
 
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer)
>= 0) {
                 List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
@@ -1423,6 +1438,45 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     }
 
     /** {@inheritDoc} */
+    @Override public void onAffinityInitialized(Map<Integer, List<UUID>> assignment)
{
+        lock.writeLock().lock();
+
+        try {
+            AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+
+            if (diffFromAffinityVer.compareTo(affVer) >= 0)
+                return;
+
+            for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
+                UUID nodeId = e.getKey();
+
+                for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet())
{
+                    int p = e0.getKey();
+                    GridDhtPartitionState state = e0.getValue();
+
+                    Set<UUID> ids = diffFromAffinity.get(p);
+
+                    if ((state == MOVING || state == OWNING || state == RENTING) &&
!assignment.get(p).contains(nodeId)) {
+                        if (ids == null)
+                            diffFromAffinity.put(p, ids = U.newHashSet(3));
+
+                        ids.add(nodeId);
+                    }
+                    else {
+                        if (ids != null)
+                            ids.remove(nodeId);
+                    }
+                }
+            }
+
+            diffFromAffinityVer = affVer;
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean detectLostPartitions(DiscoveryEvent discoEvt) {
         lock.writeLock().lock();
 
@@ -1991,9 +2045,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                 if (affNodes.isEmpty())
                     continue;
 
-                List<ClusterNode> owners = owners(i);
-
-                if (affNodes.size() != owners.size() || !owners.containsAll(affNodes))
+                if (!F.isEmpty(diffFromAffinity.get(i)))
                     return;
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c094ac5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f6827ab..119ceee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1486,6 +1486,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
 
+            for (Map.Entry<Integer, Map<Integer, List<UUID>>> e : assignmentChange.entrySet())
{
+                cctx.cacheContext(e.getKey()).topology().onAffinityInitialized(e.getValue());
+            }
+
             GridDhtPartitionsFullMessage m = createPartitionsMessage(null, false);
 
             CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);


Mime
View raw message