ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [06/18] ignite git commit: ignite-5068 : WIP.
Date Thu, 08 Jun 2017 13:13:40 GMT
ignite-5068 : WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c67dc6f8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c67dc6f8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c67dc6f8

Branch: refs/heads/ignite-5267-merge-ea
Commit: c67dc6f81c3c69fd58c3440fd8ab524edbd18d8d
Parents: a011f57
Author: Ilya Lantukh <ilantukh@gridgain.com>
Authored: Fri Apr 28 14:06:01 2017 +0300
Committer: Ilya Lantukh <ilantukh@gridgain.com>
Committed: Fri Apr 28 14:06:01 2017 +0300

----------------------------------------------------------------------
 .../affinity/GridAffinityAssignmentCache.java   |  2 -
 .../dht/GridDhtPartitionTopologyImpl.java       | 67 ++++++++++----------
 .../GridDhtPartitionsExchangeFuture.java        | 16 +++++
 3 files changed, 50 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c67dc6f8/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 9570e62..2399493 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -199,8 +199,6 @@ public class GridAffinityAssignmentCache {
             }
         }
 
-        ctx.cache().context().cacheContext(cacheId).topology().onAffinityInitialized(assignment);
-
         onHistoryAdded(assignment);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c67dc6f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 11f4e83..898cb55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -959,7 +959,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             List<ClusterNode> nodes = null;
 
-            if (topVer.compareTo(diffFromAffinityVer) != 0) {
+            if (!topVer.equals(diffFromAffinityVer)) {
                 System.out.println("??? node2part [topVer=" + topVer + ", diffVer=" + diffFromAffinityVer + "]");
 
                 nodes = new ArrayList<>();
@@ -1018,44 +1018,44 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         if (node2part == null)
             return;
 
-        for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
-            UUID nodeId = e.getKey();
+//        for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
+//            UUID nodeId = e.getKey();
+//
+//            for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
+//                int p0 = e0.getKey();
+//
+//                GridDhtPartitionState state = e0.getValue();
+//
+//                Set<UUID> ids = diffFromAffinity.get(p0);
+//
+//                if ((state == MOVING || state == OWNING || state == RENTING) && !affAssignment.getIds(p0).contains(nodeId)) {
+//                    if (ids == null)
+//                        diffFromAffinity.put(p0, ids = U.newHashSet(3));
+//
+//                    ids.add(nodeId);
+//                }
+//                else {
+//                    if (ids != null)
+//                        ids.remove(nodeId);
+//                }
+//            }
+//        }
 
-            for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
-                int p0 = e0.getKey();
+        Collection<UUID> affNodes = F.nodeIds(cctx.discovery().cacheAffinityNodes(cctx.cacheId(), affAssignment.topologyVersion()));
 
-                GridDhtPartitionState state = e0.getValue();
+        for (Map.Entry<Integer, Set<UUID>> e : diffFromAffinity.entrySet()) {
+            int p = e.getKey();
 
-                Set<UUID> ids = diffFromAffinity.get(p0);
+            Iterator<UUID> iter = e.getValue().iterator();
 
-                if ((state == MOVING || state == OWNING || state == RENTING) && !affAssignment.getIds(p0).contains(nodeId)) {
-                    if (ids == null)
-                        diffFromAffinity.put(p0, ids = U.newHashSet(3));
+            while (iter.hasNext()) {
+                UUID nodeId = iter.next();
 
-                    ids.add(nodeId);
-                }
-                else {
-                    if (ids != null)
-                        ids.remove(nodeId);
-                }
+                if (!affNodes.contains(nodeId) || affAssignment.getIds(p).contains(nodeId))
+                    iter.remove();
             }
         }
 
-//        Collection<UUID> affNodes = F.nodeIds(cctx.discovery().cacheAffinityNodes(cctx.cacheId(), affAssignment.topologyVersion()));
-//
-//        for (Map.Entry<Integer, Set<UUID>> e : diffFromAffinity.entrySet()) {
-//            int p = e.getKey();
-//
-//            Iterator<UUID> iter = e.getValue().iterator();
-//
-//            while (iter.hasNext()) {
-//                UUID nodeId = iter.next();
-//
-//                if (!affNodes.contains(nodeId) || affAssignment.getIds(p).contains(nodeId))
-//                    iter.remove();
-//            }
-//        }
-
         diffFromAffinityVer = affAssignment.topologyVersion();
     }
 
@@ -1433,7 +1433,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
 
-            if (diffFromAffinityVer.compareTo(affVer) <= 0) {
+            if (diffFromAffinityVer.compareTo(affVer) <= 0 && false) {
                 AffinityAssignment affAssignment = cctx.affinity().assignment(affVer);
 
                 int diffFromAffinitySize = 0;
@@ -1518,7 +1518,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         lock.writeLock().lock();
 
         try {
-            rebuildDiff(assignment);
+            if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0)
+                rebuildDiff(assignment);
         }
         finally {
             lock.writeLock().unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/c67dc6f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f6827ab..328f730 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1215,6 +1215,22 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         if (exchangeOnChangeGlobalState && err == null)
             cctx.kernalContext().state().onExchangeDone();
 
+        if (err == null && realExchange) {
+//            if (discoEvt.type() == EVT_NODE_JOINED || discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED || (discoEvt instanceof DiscoveryCustomEvent && ((DiscoveryCustomEvent)discoEvt).customMessage() instanceof CacheAffinityChangeMessage)) {
+            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                if (cacheCtx.isLocal())
+                    continue;
+
+                try {
+                    cacheCtx.topology().onAffinityInitialized(cacheCtx.affinity().assignment(topologyVersion()));
+                }
+                catch (Exception e) {
+                    System.out.println("???");
+                }
+            }
+//            }
+        }
+
         if (super.onDone(res, err) && realExchange) {
             if (log.isDebugEnabled())
                 log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this +


Mime
View raw message