ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: 5578
Date Wed, 02 Aug 2017 08:21:17 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 d6b2420e1 -> 079474297


5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/07947429
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/07947429
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/07947429

Branch: refs/heads/ignite-5578
Commit: 079474297ebd869e27e64f269b8942d2a9d65f9d
Parents: d6b2420
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Aug 2 11:21:10 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Aug 2 11:21:10 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 11 +++-
 .../dht/GridClientPartitionTopology.java        | 40 ++++++++++++++-
 .../dht/GridDhtPartitionTopology.java           |  4 +-
 .../dht/GridDhtPartitionTopologyImpl.java       | 13 +++--
 .../GridDhtPartitionsExchangeFuture.java        | 53 ++++++--------------
 5 files changed, 73 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/07947429/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index b0f2951..2138261 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1053,6 +1053,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     }
 
     /**
+     * @return All registered cache groups.
+     */
+    public Collection<CacheGroupDescriptor> cacheGroups() {
+        return caches.allGroups();
+    }
+
+    /**
      * @param c Cache closure.
      * @throws IgniteCheckedException If failed
      */
@@ -1657,7 +1664,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                             aff.initialize(topVer, assign);
                         }
 
-                        grpHolder.topology(fut).beforeExchange(fut, true);
+                        grpHolder.topology(fut).beforeExchange(fut, true, false);
                     }
                     else {
                         List<GridDhtPartitionsExchangeFuture> exchFuts = cctx.exchange().exchangeFutures();
@@ -1718,7 +1725,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                             aff.initialize(topVer, assign);
                         }
 
-                        grpHolder.topology(fut).beforeExchange(fut, true);
+                        grpHolder.topology(fut).beforeExchange(fut, true, false);
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/07947429/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 37f4fd8..c821382 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -36,6 +36,8 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
+import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -251,8 +253,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean initParts)
-        throws IgniteCheckedException {
+    @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut,
+        boolean initParts,
+        boolean updateMoving)
+        throws IgniteCheckedException
+    {
         ClusterNode loc = cctx.localNode();
 
         U.writeLock(lock);
@@ -262,12 +267,43 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                 return;
 
             beforeExchange0(loc, exchFut);
+
+            if (updateMoving) {
+                ExchangeDiscoveryEvents evts = exchFut.context().events();
+
+                GridAffinityAssignmentCache aff = cctx.affinity().affinity(grpId);
+
+                assert aff.lastVersion().equals(evts.topologyVersion());
+
+                createMovingPartitions(aff.readyAffinity(evts.topologyVersion()));
+            }
         }
         finally {
             lock.writeLock().unlock();
         }
     }
 
+    private void createMovingPartitions(AffinityAssignment aff) {
+        for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
+            GridDhtPartitionMap map = e.getValue();
+
+            addMoving(map, aff.backupPartitions(e.getKey()));
+            addMoving(map, aff.primaryPartitions(e.getKey()));
+        }
+    }
+
+    private void addMoving(GridDhtPartitionMap map, Set<Integer> parts) {
+        if (F.isEmpty(parts))
+            return;
+
+        for (Integer p : parts) {
+            GridDhtPartitionState state = map.get(p);
+
+            if (state == null || state == EVICTED)
+                map.put(p, MOVING);
+        }
+    }
+
     /**
      * @param loc Local node.
      * @param exchFut Exchange future.

http://git-wip-us.apache.org/repos/asf/ignite/blob/07947429/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 8911aa9..1aea453 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -105,7 +105,9 @@ public interface GridDhtPartitionTopology {
      * @param affReady Affinity ready flag.
      * @throws IgniteCheckedException If failed.
      */
-    public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean affReady)
+    public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut,
+        boolean affReady,
+        boolean updateMoving)
         throws IgniteCheckedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/07947429/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 5af4fef..d3f1e94 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -433,7 +433,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean affReady)
+    @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut,
+        boolean affReady,
+        boolean updateMoving)
         throws IgniteCheckedException {
         ClusterNode loc = ctx.localNode();
 
@@ -538,8 +540,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                     consistencyCheck();
 
-                    if (affReady && oldest != null && oldest.isLocal())
+                    if (updateMoving) {
+                        assert grp.affinity().lastVersion().equals(evts.topologyVersion());
+
                         createMovingPartitions(grp.affinity().readyAffinity(evts.topologyVersion()));
+                    }
 
                     if (log.isDebugEnabled()) {
                         log.debug("Partition map after beforeExchange [exchId=" + exchFut.exchangeId() +
@@ -1272,7 +1277,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             node2part = partMap;
 
-            if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) {
+            if (exchangeResVer == null && (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) {
                 AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
 
                 for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
@@ -1622,7 +1627,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         }
     }
 
-    public void createMovingPartitions(AffinityAssignment aff) {
+    private void createMovingPartitions(AffinityAssignment aff) {
         for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
             GridDhtPartitionMap map = e.getValue();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/07947429/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 406b2b9..575161d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -38,6 +38,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -58,6 +59,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
@@ -710,7 +712,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     if (grp.isLocal())
                         continue;
 
-                    grp.topology().beforeExchange(this, !centralizedAff);
+                    grp.topology().beforeExchange(this, !centralizedAff, false);
                 }
             }
         }
@@ -985,7 +987,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
                 // It is possible affinity is not initialized yet if node joins to cluster.
                 if (grp.affinity().lastVersion().topologyVersion() > 0)
-                    grp.topology().beforeExchange(this, !centralizedAff);
+                    grp.topology().beforeExchange(this, !centralizedAff, false);
             }
         }
 
@@ -2110,7 +2112,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (!crd.equals(discoCache.serverNodes().get(0)) && !exchCtx.mergeExchanges()) {
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                     if (!grp.isLocal())
-                        grp.topology().beforeExchange(this, !centralizedAff);
+                        grp.topology().beforeExchange(this, !centralizedAff, false);
                 }
             }
 
@@ -2165,11 +2167,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 else
                     cctx.affinity().mergeExchangesOnServerJoin(this, true);
 
-                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                    if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
+                for (CacheGroupDescriptor desc : cctx.affinity().cacheGroups()) {
+                    if (desc.config().getCacheMode() == CacheMode.LOCAL)
                         continue;
 
-                    grp.topology().beforeExchange(this, true);
+                    CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
+
+                    GridDhtPartitionTopology top = grp != null ? grp.topology() :
+                        cctx.exchange().clientTopology(desc.groupId(), this);
+
+                    top.beforeExchange(this, true, true);
                 }
             }
 
@@ -2200,38 +2207,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         resTopVer,
                         affReq,
                         joinedNodeAff);
-
-                    UUID nodeId = e.getKey();
-
-                    // If node requested affinity on join and partitions are not created, then
-                    // all affinity partitions should be in MOVING state.
-                    for (Integer grpId : affReq) {
-                        GridDhtPartitionMap partMap = msg.partitions().get(grpId);
-
-                        if (partMap == null || F.isEmpty(partMap.map())) {
-                            CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
-
-                            GridDhtPartitionTopology top = grp != null ? grp.topology() :
-                                cctx.exchange().clientTopology(grpId, this);
-
-                            if (partMap == null) {
-                                partMap = new GridDhtPartitionMap(nodeId,
-                                    1L,
-                                    resTopVer,
-                                    new GridPartitionStateMap(),
-                                    false);
-                            }
-
-                            AffinityAssignment aff = cctx.affinity().affinity(grpId).cachedAffinity(resTopVer);
-
-                            for (int p = 0; p < aff.assignment().size(); p++) {
-                                if (aff.getIds(p).contains(nodeId))
-                                    partMap.put(p, GridDhtPartitionState.MOVING);
-                            }
-
-                            top.update(exchId, partMap, true);
-                        }
-                    }
                 }
             }
 
@@ -2679,7 +2654,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
                             continue;
 
-                        grp.topology().beforeExchange(this, true);
+                        grp.topology().beforeExchange(this, true, false);
                     }
                 }
             }


Mime
View raw message