ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [05/18] ignite git commit: ignite-5068 : WIP.
Date Thu, 08 Jun 2017 13:13:39 GMT
ignite-5068 : WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a011f575
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a011f575
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a011f575

Branch: refs/heads/ignite-5267-merge-ea
Commit: a011f575e7fa974f33d6adbf18a3a4fa32c197d0
Parents: 401b7a0
Author: Ilya Lantukh <ilantukh@gridgain.com>
Authored: Thu Apr 27 16:50:00 2017 +0300
Committer: Ilya Lantukh <ilantukh@gridgain.com>
Committed: Thu Apr 27 16:50:00 2017 +0300

----------------------------------------------------------------------
 .../affinity/GridAffinityAssignmentCache.java   |  2 +
 .../dht/GridClientPartitionTopology.java        |  2 +-
 .../dht/GridDhtPartitionTopology.java           |  2 +-
 .../dht/GridDhtPartitionTopologyImpl.java       | 86 ++++++++++++--------
 .../GridDhtPartitionsExchangeFuture.java        |  4 -
 5 files changed, 57 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a011f575/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 2399493..9570e62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -199,6 +199,8 @@ public class GridAffinityAssignmentCache {
             }
         }
 
+        ctx.cache().context().cacheContext(cacheId).topology().onAffinityInitialized(assignment);
+
         onHistoryAdded(assignment);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a011f575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index e026fea..4dc6b5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -755,7 +755,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void onAffinityInitialized(Map<Integer, List<UUID>> assignment) {
+    @Override public void onAffinityInitialized(AffinityAssignment assignment) {
         // TODO
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a011f575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index b47321e..c1933c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -313,5 +313,5 @@ public interface GridDhtPartitionTopology {
      */
     public void setOwners(int p, Set<UUID> owners, boolean updateSeq);
 
-    public void onAffinityInitialized(Map<Integer, List<UUID>> assignment);
+    public void onAffinityInitialized(AffinityAssignment assignment);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a011f575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 80b83fa..11f4e83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -959,8 +959,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             List<ClusterNode> nodes = null;
 
-            if (!topVer.equals(diffFromAffinityVer)) {
-                System.out.println("??? node2part");
+            if (topVer.compareTo(diffFromAffinityVer) != 0) {
+                System.out.println("??? node2part [topVer=" + topVer + ", diffVer=" + diffFromAffinityVer + "]");
 
                 nodes = new ArrayList<>();
 
@@ -1012,6 +1012,53 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         }
     }
 
+    private void rebuildDiff(AffinityAssignment affAssignment) {
+        assert lock.isWriteLockedByCurrentThread();
+
+        if (node2part == null)
+            return;
+
+        for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
+            UUID nodeId = e.getKey();
+
+            for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
+                int p0 = e0.getKey();
+
+                GridDhtPartitionState state = e0.getValue();
+
+                Set<UUID> ids = diffFromAffinity.get(p0);
+
+                if ((state == MOVING || state == OWNING || state == RENTING) && !affAssignment.getIds(p0).contains(nodeId)) {
+                    if (ids == null)
+                        diffFromAffinity.put(p0, ids = U.newHashSet(3));
+
+                    ids.add(nodeId);
+                }
+                else {
+                    if (ids != null)
+                        ids.remove(nodeId);
+                }
+            }
+        }
+
+//        Collection<UUID> affNodes = F.nodeIds(cctx.discovery().cacheAffinityNodes(cctx.cacheId(), affAssignment.topologyVersion()));
+//
+//        for (Map.Entry<Integer, Set<UUID>> e : diffFromAffinity.entrySet()) {
+//            int p = e.getKey();
+//
+//            Iterator<UUID> iter = e.getValue().iterator();
+//
+//            while (iter.hasNext()) {
+//                UUID nodeId = iter.next();
+//
+//                if (!affNodes.contains(nodeId) || affAssignment.getIds(p).contains(nodeId))
+//                    iter.remove();
+//            }
+//        }
+
+        diffFromAffinityVer = affAssignment.topologyVersion();
+    }
+
     /**
      * @param p Partition.
      * @param topVer Topology version ({@code -1} for all nodes).
@@ -1247,7 +1294,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                 diffFromAffinityVer = affVer;
 
                 if (diffFromAffinitySize > 0)
-                    U.error(log, "??? S diffFromAffinitySize=" + diffFromAffinitySize + " [exchId=" + exchId + ",cacheId=" + cctx.cacheId() + ",cacheName=" + cctx.name() + "]");
+                    U.error(log, "??? F diffFromAffinitySize=" + diffFromAffinitySize + " [exchId=" + exchId + ",cacheId=" + cctx.cacheId() + ",cacheName=" + cctx.name() + "]");
             }
 
             boolean changed = false;
@@ -1440,7 +1487,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                 diffFromAffinityVer = affVer;
 
                 if (diffFromAffinitySize > 0)
-                    U.error(log, "??? diffFromAffinitySize=" + diffFromAffinitySize + " [exchId=" + exchId + ",cacheId=" + cctx.cacheId() + ",cacheName=" + cctx.name() + "]");
+                    U.error(log, "??? S diffFromAffinitySize=" + diffFromAffinitySize + " [exchId=" + exchId + ",cacheId=" + cctx.cacheId() + ",cacheName=" + cctx.name() + "]");
             }
 
             if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
@@ -1467,38 +1514,11 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     }
 
     /** {@inheritDoc} */
-    @Override public void onAffinityInitialized(Map<Integer, List<UUID>> assignment) {
+    @Override public void onAffinityInitialized(AffinityAssignment assignment) {
         lock.writeLock().lock();
 
         try {
-            AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
-
-            if (diffFromAffinityVer.compareTo(affVer) >= 0)
-                return;
-
-            for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
-                UUID nodeId = e.getKey();
-
-                for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
-                    int p = e0.getKey();
-                    GridDhtPartitionState state = e0.getValue();
-
-                    Set<UUID> ids = diffFromAffinity.get(p);
-
-                    if ((state == MOVING || state == OWNING || state == RENTING) && !assignment.get(p).contains(nodeId)) {
-                        if (ids == null)
-                            diffFromAffinity.put(p, ids = U.newHashSet(3));
-
-                        ids.add(nodeId);
-                    }
-                    else {
-                        if (ids != null)
-                            ids.remove(nodeId);
-                    }
-                }
-            }
-
-            diffFromAffinityVer = affVer;
+            rebuildDiff(assignment);
         }
         finally {
             lock.writeLock().unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a011f575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 119ceee..f6827ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1486,10 +1486,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
 
-            for (Map.Entry<Integer, Map<Integer, List<UUID>>> e : assignmentChange.entrySet()) {
-                cctx.cacheContext(e.getKey()).topology().onAffinityInitialized(e.getValue());
-            }
-
             GridDhtPartitionsFullMessage m = createPartitionsMessage(null, false);
 
             CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);


Mime
View raw message