ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: 5578
Date Wed, 02 Aug 2017 20:01:48 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 48330d36c -> 72d3c3ad2


5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/72d3c3ad
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/72d3c3ad
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/72d3c3ad

Branch: refs/heads/ignite-5578
Commit: 72d3c3ad25559eff986bd48d62f916d40fa75a48
Parents: 48330d3
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Aug 2 22:06:24 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Aug 2 23:01:35 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       | 25 +++---
 .../processors/cache/ExchangeContext.java       | 16 ++--
 .../cache/ExchangeDiscoveryEvents.java          | 84 +++++++++++++++-----
 .../GridCachePartitionExchangeManager.java      |  7 +-
 .../dht/GridClientPartitionTopology.java        | 44 +++++-----
 .../GridDhtPartitionsExchangeFuture.java        | 26 +++---
 .../distributed/CacheExchangeMergeTest.java     |  2 +-
 7 files changed, 126 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/72d3c3ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index a785477..5b30017 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1677,7 +1677,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                             aff.initialize(topVer, assign);
                         }
 
-                        grpHolder.topology(fut).beforeExchange(fut, true, false);
+                        grpHolder.topology().beforeExchange(fut, true, false);
                     }
                     else {
                         List<GridDhtPartitionsExchangeFuture> exchFuts = cctx.exchange().exchangeFutures();
@@ -1738,7 +1738,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                             aff.initialize(topVer, assign);
                         }
 
-                        grpHolder.topology(fut).beforeExchange(fut, true, false);
+                        grpHolder.topology().beforeExchange(fut, true, false);
                     }
                 }
 
@@ -1828,7 +1828,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             return null;
         }
         else {
-            final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(evts.waitRebalanceEventVersion());
+            final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(evts.lastServerEventVersion());
 
             forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() {
                 @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException {
@@ -1856,7 +1856,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         Map<UUID, GridDhtPartitionMap> map = affinityFullMap(aff);
 
                         for (GridDhtPartitionMap map0 : map.values())
-                            cache.topology(fut).update(fut.exchangeId(), map0, true);
+                            cache.topology().update(fut.exchangeId(), map0, true);
                     }
                 }
             });
@@ -2060,7 +2060,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         final IgniteClosure<ClusterNode, T> c,
         final boolean initAff)
         throws IgniteCheckedException {
-        final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(fut.context().events().waitRebalanceEventVersion());
+        final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(fut.context().events().lastServerEventVersion());
 
         final Collection<ClusterNode> aliveNodes = fut.context().events().discoveryCache().serverNodes();
 
@@ -2085,7 +2085,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 List<List<ClusterNode>> newAssignment0 = initAff ? new ArrayList<>(newAssignment) : null;
 
-                GridDhtPartitionTopology top = grpHolder.topology(fut);
+                GridDhtPartitionTopology top = grpHolder.topology();
 
                 Map<Integer, List<T>> cacheAssignment = null;
 
@@ -2194,8 +2194,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     /**
      * @return All registered cache groups.
      */
-    public Collection<CacheGroupDescriptor> cacheGroups() {
-        return caches.allGroups();
+    public Map<Integer, CacheGroupDescriptor> cacheGroups() {
+        return caches.registeredGrps;
     }
 
     /**
@@ -2293,10 +2293,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         /**
-         * @param fut Exchange future.
          * @return Cache topology.
          */
-        abstract GridDhtPartitionTopology topology(GridDhtPartitionsExchangeFuture fut);
+        abstract GridDhtPartitionTopology topology();
 
         /**
          * @return Affinity.
@@ -2331,7 +2330,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         /** {@inheritDoc} */
-        @Override public GridDhtPartitionTopology topology(GridDhtPartitionsExchangeFuture fut) {
+        @Override public GridDhtPartitionTopology topology() {
             return grp.topology();
         }
     }
@@ -2407,8 +2406,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         }
 
         /** {@inheritDoc} */
-        @Override public GridDhtPartitionTopology topology(GridDhtPartitionsExchangeFuture fut) {
-            return cctx.exchange().clientTopology(groupId(), fut);
+        @Override public GridDhtPartitionTopology topology() {
+            return cctx.exchange().clientTopology(groupId());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/72d3c3ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
index ab0ff1e..94040ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java
@@ -19,12 +19,13 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.HashSet;
 import java.util.Set;
-import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager.exchangeProtocolVersion;
 
@@ -35,20 +36,20 @@ public class ExchangeContext {
     /** */
     public static final String IGNITE_EXCHANGE_COMPATIBILITY_VER_1 = "IGNITE_EXCHANGE_COMPATIBILITY_VER_1";
 
-    /** */
+    /** Cache groups to request affinity for during local join exchange. */
     private Set<Integer> requestGrpsAffOnJoin;
 
-    /** */
+    /** Per-group affinity fetch on join (old protocol). */
     private boolean fetchAffOnJoin;
 
-    /** */
+    /** Merges allowed flag. */
     private final boolean merge;
 
     /** */
     private final ExchangeDiscoveryEvents evts;
 
     /** */
-    private final boolean compatibilityNode = IgniteSystemProperties.getBoolean(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, false);
+    private final boolean compatibilityNode = getBoolean(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, false);
 
     /**
      * @param crd Coordinator flag.
@@ -122,4 +123,9 @@ public class ExchangeContext {
     public boolean mergeExchanges() {
         return merge;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ExchangeContext.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/72d3c3ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
index 5be869f..ff61701 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeDiscoveryEvents.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -38,42 +39,43 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 
 /**
- *
+ * Discovery events processed in single exchange (contain multiple events if exchanges for multiple
+ * discovery events are merged into single exchange).
  */
 public class ExchangeDiscoveryEvents {
-    /** */
+    /** Last event version. */
     private AffinityTopologyVersion topVer;
 
-    /** */
+    /** Last server join/fail event version. */
     private AffinityTopologyVersion srvEvtTopVer;
 
-    /** */
+    /** Discovery data cache for last event. */
     private DiscoCache discoCache;
 
-    /** */
+    /** Last event. */
     private DiscoveryEvent lastEvt;
 
-    /** */
+    /** Last server join/fail event. */
     private DiscoveryEvent lastSrvEvt;
 
-    /** */
+    /** All events. */
     private List<DiscoveryEvent> evts = new ArrayList<>();
 
-    /** */
+    /** Server join flag. */
     private boolean srvJoin;
 
-    /** */
+    /** Server left flag. */
     private boolean srvLeft;
 
     /**
-     * @param fut Future.
+     * @param fut Current exchange future.
      */
     ExchangeDiscoveryEvents(GridDhtPartitionsExchangeFuture fut) {
         addEvent(fut.initialVersion(), fut.discoveryEvent(), fut.discoCache());
     }
 
     /**
-     * @param fut Current future.
+     * @param fut Current exchange future.
      */
     public void processEvents(GridDhtPartitionsExchangeFuture fut) {
         for (DiscoveryEvent evt : evts) {
@@ -85,8 +87,14 @@ public class ExchangeDiscoveryEvents {
             warnNoAffinityNodes(fut.sharedContext());
     }
 
+    /**
+     * @param nodeId Node ID.
+     * @return {@code True} if has join event for given node.
+     */
     public boolean nodeJoined(UUID nodeId) {
-        for (DiscoveryEvent evt : evts) {
+        for (int i = 0; i < evts.size(); i++) {
+            DiscoveryEvent evt = evts.get(i);
+
             if (evt.type() == EVT_NODE_JOINED && nodeId.equals(evt.eventNode().id()))
                 return true;
         }
@@ -94,11 +102,23 @@ public class ExchangeDiscoveryEvents {
         return false;
     }
 
-    public AffinityTopologyVersion waitRebalanceEventVersion() {
-        return srvEvtTopVer != null ? srvEvtTopVer : topVer;
+    /**
+     * @return Last server join/fail event version.
+     */
+    AffinityTopologyVersion lastServerEventVersion() {
+        assert srvEvtTopVer != null;
+
+        return srvEvtTopVer;
     }
 
+    /**
+     * @param topVer Event version.
+     * @param evt Event.
+     * @param cache Discovery data cache for given topology version.
+     */
     void addEvent(AffinityTopologyVersion topVer, DiscoveryEvent evt, DiscoCache cache) {
+        assert evts.isEmpty() || topVer.compareTo(this.topVer) > 0 : topVer;
+
         evts.add(evt);
 
         this.topVer = topVer;
@@ -124,39 +144,60 @@ public class ExchangeDiscoveryEvents {
         }
     }
 
+    /**
+     * @return All events.
+     */
     public List<DiscoveryEvent> events() {
         return evts;
     }
 
-    public boolean serverLeftEvent(DiscoveryEvent evt) {
+    /**
+     * @param evt Event.
+     * @return {@code True} if given event is {@link EventType#EVT_NODE_FAILED} or {@link EventType#EVT_NODE_LEFT}.
+     */
+    public static boolean serverLeftEvent(DiscoveryEvent evt) {
         return  ((evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) && !CU.clientNode(evt.eventNode()));
     }
 
-
+    /**
+     * @return Discovery data cache for last event.
+     */
     public DiscoCache discoveryCache() {
         return discoCache;
     }
 
-    public DiscoveryEvent lastEvent() {
+    /**
+     * @return Last event.
+     */
+    DiscoveryEvent lastEvent() {
         return lastSrvEvt != null ? lastSrvEvt : lastEvt;
     }
 
+    /**
+     * @return Last event version.
+     */
     public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
+    /**
+     * @return {@code True} if has event for server join.
+     */
     public boolean hasServerJoin() {
         return srvJoin;
     }
 
+    /**
+     * @return {@code True} if has event for server leave.
+     */
     public boolean hasServerLeft() {
         return srvLeft;
     }
 
     /**
-     *
+     * @param cctx Context.
      */
-    public void warnNoAffinityNodes(GridCacheSharedContext cctx) {
+    public void warnNoAffinityNodes(GridCacheSharedContext<?, ?> cctx) {
         List<String> cachesWithoutNodes = null;
 
         for (DynamicCacheDescriptor cacheDesc : cctx.cache().cacheDescriptors().values()) {
@@ -213,4 +254,9 @@ public class ExchangeDiscoveryEvents {
             U.quietAndWarn(log, "Must have server nodes for caches to operate.");
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ExchangeDiscoveryEvents.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/72d3c3ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index fb91a6d..c8235ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -707,10 +707,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
     /**
      * @param grpId Cache group ID.
-     * @param exchFut Exchange future.
      * @return Topology.
      */
-    public GridDhtPartitionTopology clientTopology(int grpId, GridDhtPartitionsExchangeFuture exchFut) {
+    public GridDhtPartitionTopology clientTopology(int grpId) {
         GridClientPartitionTopology top = clientTops.get(grpId);
 
         if (top != null)
@@ -718,7 +717,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         Object affKey = null;
 
-        CacheGroupDescriptor grpDesc = cctx.cache().cacheGroupDescriptors().get(grpId);
+        CacheGroupDescriptor grpDesc = cctx.affinity().cacheGroups().get(grpId);
 
         if (grpDesc != null) {
             CacheConfiguration<?, ?> ccfg = grpDesc.config();
@@ -732,7 +731,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         }
 
         GridClientPartitionTopology old = clientTops.putIfAbsent(grpId,
-            top = new GridClientPartitionTopology(cctx, grpId, exchFut, affKey));
+            top = new GridClientPartitionTopology(cctx, grpId, affKey));
 
         return old != null ? old : top;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/72d3c3ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index c821382..f4b8971 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -117,33 +117,24 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     /**
      * @param cctx Context.
      * @param grpId Group ID.
-     * @param exchFut Exchange ID.
      * @param similarAffKey Key to find caches with similar affinity.
      */
     public GridClientPartitionTopology(
-        GridCacheSharedContext cctx,
+        GridCacheSharedContext<?, ?> cctx,
         int grpId,
-        GridDhtPartitionsExchangeFuture exchFut,
         Object similarAffKey
     ) {
         this.cctx = cctx;
         this.grpId = grpId;
         this.similarAffKey = similarAffKey;
 
-        topVer = exchFut.initialVersion();
-
-        discoCache = exchFut.discoCache();
+        topVer = AffinityTopologyVersion.NONE;
 
         log = cctx.logger(getClass());
 
-        lock.writeLock().lock();
-
-        try {
-            beforeExchange0(cctx.localNode(), exchFut);
-        }
-        finally {
-            lock.writeLock().unlock();
-        }
+        node2part = new GridDhtPartitionFullMap(cctx.localNode().id(),
+            cctx.localNode().order(),
+            updateSeq.get());
     }
 
     /**
@@ -198,7 +189,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         try {
             AffinityTopologyVersion exchTopVer = exchFut.initialVersion();
 
-            assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [topVer=" + topVer +
+            assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [grp=" + grpId +
+                ", topVer=" + topVer +
                 ", exchVer=" + exchTopVer + ']';
 
             this.stopping = stopping;
@@ -283,6 +275,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         }
     }
 
+    /**
+     * @param aff Affinity.
+     */
     private void createMovingPartitions(AffinityAssignment aff) {
         for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
             GridDhtPartitionMap map = e.getValue();
@@ -292,6 +287,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         }
     }
 
+    /**
+     * @param map Node partition state map.
+     * @param parts Partitions assigned to node.
+     */
     private void addMoving(GridDhtPartitionMap map, Set<Integer> parts) {
         if (F.isEmpty(parts))
             return;
@@ -311,11 +310,12 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     private void beforeExchange0(ClusterNode loc, GridDhtPartitionsExchangeFuture exchFut) {
         GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
 
-        assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
-            topVer + ", exchId=" + exchId + ']';
-
-        if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent())
-            removeNode(exchId.nodeId());
+        if (exchFut.context().events().hasServerLeft()) {
+            for (DiscoveryEvent evt : exchFut.context().events().events()) {
+                if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
+                    removeNode(evt.eventNode().id());
+            }
+        }
 
         // In case if node joins, get topology at the time of joining node.
         ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
@@ -980,12 +980,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         assert nodeId != null;
         assert lock.writeLock().isHeldByCurrentThread();
 
-        ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache();
-
         ClusterNode loc = cctx.localNode();
 
         if (node2part != null) {
-            if (oldest.equals(loc) && !node2part.nodeId().equals(loc.id())) {
+            if (!node2part.nodeId().equals(loc.id())) {
                 updateSeq.setIfGreater(node2part.updateSequence());
 
                 node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq.incrementAndGet(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/72d3c3ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index df72d30..6f77f96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -121,6 +121,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
     /** */
     @GridToStringExclude
+    private final Object mux = new Object();
+
+    /** */
+    @GridToStringExclude
     private volatile DiscoCache discoCache;
 
     /** Discovery event. */
@@ -130,10 +134,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     @GridToStringExclude
     private final Set<UUID> remaining = new HashSet<>();
 
-    /** */
-    @GridToStringExclude
-    private final Object mux = new Object();
-
     /** Guarded by this */
     @GridToStringExclude
     private int pendingSingleUpdates;
@@ -174,6 +174,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** Last committed cache version before next topology version use. */
     private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
 
+    /** */
+    @GridToStringExclude
+    private GridDhtPartitionsSingleMessage pendingJoinMsg;
+
     /**
      * Messages received on non-coordinator are stored in case if this node
      * becomes coordinator.
@@ -259,10 +263,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     @GridToStringExclude
     private GridDhtPartitionsExchangeFuture mergedWith;
 
-    /** */
-    @GridToStringExclude
-    private GridDhtPartitionsSingleMessage pendingJoinMsg;
-
 
     /**
      * @param cctx Cache context.
@@ -2179,14 +2179,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 else
                     cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, true);
 
-                for (CacheGroupDescriptor desc : cctx.affinity().cacheGroups()) {
+                for (CacheGroupDescriptor desc : cctx.affinity().cacheGroups().values()) {
                     if (desc.config().getCacheMode() == CacheMode.LOCAL)
                         continue;
 
                     CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
 
                     GridDhtPartitionTopology top = grp != null ? grp.topology() :
-                        cctx.exchange().clientTopology(desc.groupId(), this);
+                        cctx.exchange().clientTopology(desc.groupId());
 
                     top.beforeExchange(this, true, true);
                 }
@@ -2204,7 +2204,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
                     GridDhtPartitionTopology top = grp != null ? grp.topology() :
-                        cctx.exchange().clientTopology(grpId, this);
+                        cctx.exchange().clientTopology(grpId);
 
                     Map<Integer, T2<Long, Long>> cntrs = msg.partitionUpdateCounters(grpId);
 
@@ -2721,7 +2721,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
                 if (oldest != null && oldest.isLocal()) {
-                    cctx.exchange().clientTopology(grpId, this).update(resTopVer,
+                    cctx.exchange().clientTopology(grpId).update(resTopVer,
                         entry.getValue(),
                         cntrMap,
                         Collections.<Integer>emptySet(),
@@ -2745,7 +2745,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
             GridDhtPartitionTopology top = grp != null ? grp.topology() :
-                cctx.exchange().clientTopology(grpId, this);
+                cctx.exchange().clientTopology(grpId);
 
             top.update(exchId, entry.getValue(), false);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/72d3c3ad/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 20371be..61b8b44 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -475,7 +475,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest {
 
                 return null;
             }
-        }, 1, "start-srv");
+        }, 1, "start-client");
 
         fut1.get();
         fut2.get();


Mime
View raw message