ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [04/18] ignite git commit: ignite-5872 Fixed backward compatibility
Date Fri, 25 Aug 2017 08:39:46 GMT
ignite-5872 Fixed backward compatibility

(cherry picked from commit cca9117)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/129be29e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/129be29e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/129be29e

Branch: refs/heads/ignite-6149
Commit: 129be29e96cfaba3bb7645c66981de62646f820c
Parents: fa42218
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Aug 21 18:39:12 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Aug 23 12:22:15 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 47 ++++++++---
 .../dht/GridClientPartitionTopology.java        |  9 +++
 .../dht/GridDhtPartitionTopology.java           |  5 ++
 .../dht/GridDhtPartitionTopologyImpl.java       |  5 ++
 .../CachePartitionFullCountersMap.java          | 36 +++++++++
 .../CachePartitionPartialCountersMap.java       | 23 ++++++
 .../GridDhtPartitionsExchangeFuture.java        | 41 +++++++---
 .../preloader/GridDhtPartitionsFullMessage.java | 84 +++++++++++++++++---
 .../GridDhtPartitionsSingleMessage.java         | 23 ++++--
 .../IgniteDhtPartitionCountersMap.java          | 14 ++--
 .../IgniteDhtPartitionCountersMap2.java         | 69 ++++++++++++++++
 11 files changed, 315 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 200f677..984721b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -65,6 +65,8 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCach
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -973,7 +975,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      */
     private void sendAllPartitions(Collection<ClusterNode> nodes,
         AffinityTopologyVersion msgTopVer) {
-        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, null, null, null, null);
+        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, false, null, null, null, null);
 
         m.topologyVersion(msgTopVer);
 
@@ -1000,6 +1002,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /**
      * @param compress {@code True} if possible to compress message (properly work only if prepareMarshall/
      *     finishUnmarshall methods are called).
+     * @param newCntrMap {@code True} if possible to use {@link CachePartitionFullCountersMap}.
      * @param exchId Non-null exchange ID if message is created for exchange.
      * @param lastVer Last version.
      * @param partHistSuppliers Partition history suppliers map.
@@ -1008,6 +1011,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      */
     public GridDhtPartitionsFullMessage createPartitionsFullMessage(
         boolean compress,
+        boolean newCntrMap,
         @Nullable final GridDhtPartitionExchangeId exchId,
         @Nullable GridCacheVersion lastVer,
         @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@@ -1046,8 +1050,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         affCache.similarAffinityKey());
                 }
 
-                if (exchId != null)
-                    m.addPartitionUpdateCounters(grp.groupId(), grp.topology().fullUpdateCounters());
+                if (exchId != null) {
+                    CachePartitionFullCountersMap cntrsMap = grp.topology().fullUpdateCounters();
+
+                    if (newCntrMap)
+                        m.addPartitionUpdateCounters(grp.groupId(), cntrsMap);
+                    else {
+                        m.addPartitionUpdateCounters(grp.groupId(),
+                            CachePartitionFullCountersMap.toCountersMap(cntrsMap));
+                    }
+                }
             }
         }
 
@@ -1064,8 +1076,14 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
                     top.similarAffinityKey());
             }
 
-            if (exchId != null)
-                m.addPartitionUpdateCounters(top.groupId(), top.fullUpdateCounters());
+            if (exchId != null) {
+                CachePartitionFullCountersMap cntrsMap = top.fullUpdateCounters();
+
+                if (newCntrMap)
+                    m.addPartitionUpdateCounters(top.groupId(), cntrsMap);
+                else
+                    m.addPartitionUpdateCounters(top.groupId(), CachePartitionFullCountersMap.toCountersMap(cntrsMap));
+            }
         }
 
         return m;
@@ -1119,6 +1137,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id,
             cctx.kernalContext().clientNode(),
             false,
+            false,
             null);
 
         if (log.isDebugEnabled())
@@ -1141,12 +1160,14 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
      * @param exchangeId Exchange ID.
      * @param clientOnlyExchange Client exchange flag.
      * @param sndCounters {@code True} if need send partition update counters.
+     * @param newCntrMap {@code True} if possible to use {@link CachePartitionPartialCountersMap}.
      * @return Message.
      */
     public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(
         @Nullable GridDhtPartitionExchangeId exchangeId,
         boolean clientOnlyExchange,
         boolean sndCounters,
+        boolean newCntrMap,
         ExchangeActions exchActions
     ) {
         GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId,
@@ -1167,8 +1188,12 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
                     locMap,
                     grp.affinity().similarAffinityKey());
 
-                if (sndCounters)
-                    m.partitionUpdateCounters(grp.groupId(), grp.topology().localUpdateCounters(true));
+                if (sndCounters) {
+                    CachePartitionPartialCountersMap cntrsMap = grp.topology().localUpdateCounters(true);
+
+                    m.addPartitionUpdateCounters(grp.groupId(),
+                        newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
+                }
             }
         }
 
@@ -1185,8 +1210,12 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
                 locMap,
                 top.similarAffinityKey());
 
-            if (sndCounters)
-                m.partitionUpdateCounters(top.groupId(), top.localUpdateCounters(true));
+            if (sndCounters) {
+                CachePartitionPartialCountersMap cntrsMap = top.localUpdateCounters(true);
+
+                m.addPartitionUpdateCounters(top.groupId(),
+                    newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
+            }
         }
 
         return m;

http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 77792c7..c8856fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -115,6 +115,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
     /** */
     private volatile DiscoCache discoCache;
 
+    /** */
+    private final int parts;
+
     /**
      * @param cctx Context.
      * @param grpId Group ID.
@@ -130,6 +133,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
         this.cctx = cctx;
         this.grpId = grpId;
         this.similarAffKey = similarAffKey;
+        this.parts = parts;
 
         topVer = AffinityTopologyVersion.NONE;
 
@@ -142,6 +146,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
         cntrMap = new CachePartitionFullCountersMap(parts);
     }
 
+    /** {@inheritDoc} */
+    @Override public int partitions() {
+        return parts;
+    }
+
     /**
      * @return Key to find caches with similar affinity.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 22205ea..4ae68ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -43,6 +43,11 @@ import org.jetbrains.annotations.Nullable;
 @GridToStringExclude
 public interface GridDhtPartitionTopology {
     /**
+     * @return  Total cache partitions.
+     */
+    public int partitions();
+
+    /**
      * Locks the topology, usually during mapping on locks or transactions.
      */
     public void readLock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 16fe012..f25ae21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -161,6 +161,11 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
     }
 
     /** {@inheritDoc} */
+    @Override public int partitions() {
+        return grp.affinityFunction().partitions();
+    }
+
+    /** {@inheritDoc} */
     @Override public int groupId() {
         return grp.groupId();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
index 1384a55..ebc993c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionFullCountersMap.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  *
@@ -96,4 +99,37 @@ public class CachePartitionFullCountersMap implements Serializable {
         Arrays.fill(initialUpdCntrs, 0);
         Arrays.fill(updCntrs, 0);
     }
+
+    /**
+     * @param map Full counters map.
+     * @return Regular java map with counters.
+     */
+    public static Map<Integer, T2<Long, Long>> toCountersMap(CachePartitionFullCountersMap map) {
+        int partsCnt = map.updCntrs.length;
+
+        Map<Integer, T2<Long, Long>> map0 = U.newHashMap(partsCnt);
+
+        for (int p = 0; p < partsCnt; p++)
+            map0.put(p, new T2<>(map.initialUpdCntrs[p], map.updCntrs[p]));
+
+        return map0;
+    }
+
+    /**
+     * @param map Regular java map with counters.
+     * @param partsCnt Total cache partitions.
+     * @return Full counters map.
+     */
+    static CachePartitionFullCountersMap fromCountersMap(Map<Integer, T2<Long, Long>> map, int partsCnt) {
+        CachePartitionFullCountersMap map0 = new CachePartitionFullCountersMap(partsCnt);
+
+        for (Map.Entry<Integer, T2<Long, Long>> e : map.entrySet()) {
+            T2<Long, Long> cntrs = e.getValue();
+
+            map0.initialUpdCntrs[e.getKey()] = cntrs.get1();
+            map0.updCntrs[e.getKey()] = cntrs.get2();
+        }
+
+        return map0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
index 851ffed..83c0231 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CachePartitionPartialCountersMap.java
@@ -21,8 +21,10 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
+import java.util.TreeMap;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
 
 /**
  *
@@ -32,6 +34,9 @@ public class CachePartitionPartialCountersMap implements Serializable {
     private static final long serialVersionUID = 0L;
 
     /** */
+    static final IgniteProductVersion PARTIAL_COUNTERS_MAP_SINCE = IgniteProductVersion.fromString("2.1.4");
+
+    /** */
     public static final CachePartitionPartialCountersMap EMPTY = new CachePartitionPartialCountersMap();
 
     /** */
@@ -158,4 +163,22 @@ public class CachePartitionPartialCountersMap implements Serializable
{
 
         return res;
     }
+
+    /**
+     * @param map Partition ID to partition counters map.
+     * @param partsCnt Total cache partitions.
+     * @return Partial local counters map.
+     */
+    static CachePartitionPartialCountersMap fromCountersMap(Map<Integer, T2<Long, Long>> map, int partsCnt) {
+        CachePartitionPartialCountersMap map0 = new CachePartitionPartialCountersMap(partsCnt);
+
+        TreeMap<Integer, T2<Long, Long>> sorted = new TreeMap<>(map);
+
+        for (Map.Entry<Integer, T2<Long, Long>> e : sorted.entrySet())
+            map0.add(e.getKey(), e.getValue().get1(), e.getValue().get2());
+
+        map0.trim();
+
+        return map0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index ceb5abc..8e0deb9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -92,6 +92,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -106,6 +107,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.PARTIAL_COUNTERS_MAP_SINCE;
 
 /**
  * Future for exchanging partition maps.
@@ -1231,6 +1233,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(),
                 false,
                 true,
+                node.version().compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0,
                 exchActions);
 
             Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
@@ -1258,13 +1261,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
     /**
      * @param compress Message compress flag.
+     * @param newCntrMap {@code True} if possible to use {@link CachePartitionFullCountersMap}.
      * @return Message.
      */
-    private GridDhtPartitionsFullMessage createPartitionsMessage(boolean compress) {
+    private GridDhtPartitionsFullMessage createPartitionsMessage(boolean compress,
+        boolean newCntrMap) {
         GridCacheVersion last = lastVer.get();
 
         GridDhtPartitionsFullMessage m = cctx.exchange().createPartitionsFullMessage(
             compress,
+            newCntrMap,
             exchangeId(),
             last != null ? last : cctx.versions().last(),
             partHistSuppliers,
@@ -1797,9 +1803,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 if (finishState0 == null) {
                     assert firstDiscoEvt.type() == EVT_NODE_JOINED && CU.clientNode(firstDiscoEvt.eventNode())
: this;
 
+                    ClusterNode node = cctx.node(nodeId);
+
+                    if (node == null)
+                        return;
+
                     finishState0 = new FinishState(cctx.localNodeId(),
                         initialVersion(),
-                        createPartitionsMessage(true));
+                        createPartitionsMessage(true, node.version().compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0));
                 }
 
                 sendAllPartitionsToNode(finishState0, msg, nodeId);
@@ -1937,7 +1948,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
 
-            GridDhtPartitionsFullMessage m = createPartitionsMessage(false);
+            GridDhtPartitionsFullMessage m = createPartitionsMessage(false, false);
 
             CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);
 
@@ -1959,7 +1970,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         Map<Integer, Long> minCntrs = new HashMap<>();
 
         for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
-            CachePartitionPartialCountersMap nodeCntrs = e.getValue().partitionUpdateCounters(top.groupId());
+            CachePartitionPartialCountersMap nodeCntrs = e.getValue().partitionUpdateCounters(top.groupId(),
+                top.partitions());
 
             assert nodeCntrs != null;
 
@@ -2235,7 +2247,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     GridDhtPartitionTopology top = grp != null ? grp.topology() :
                         cctx.exchange().clientTopology(grpId);
 
-                    CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId);
+                    CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId,
+                        top.partitions());
 
                     if (cntrs != null)
                         top.collectUpdateCounters(cntrs);
@@ -2283,7 +2296,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
             cctx.versions().onExchange(lastVer.get().order());
 
-            GridDhtPartitionsFullMessage msg = createPartitionsMessage(true);
+            IgniteProductVersion minVer = exchCtx.events().discoveryCache().minimumNodeVersion();
+
+            GridDhtPartitionsFullMessage msg = createPartitionsMessage(true,
+                minVer.compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0);
 
             if (exchCtx.mergeExchanges()) {
                 assert !centralizedAff;
@@ -2571,6 +2587,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     msg.restoreExchangeId(),
                     cctx.kernalContext().clientNode(),
                     true,
+                    node.version().compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0,
                     exchActions);
 
                 if (localJoinExchange() && finishState0 == null)
@@ -2745,11 +2762,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet())
{
             Integer grpId = entry.getKey();
 
-            CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId);
-
             CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
             if (grp != null) {
+                CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId,
+                    grp.topology().partitions());
+
                 grp.topology().update(resTopVer,
                     entry.getValue(),
                     cntrMap,
@@ -2760,7 +2778,12 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
                 if (oldest != null && oldest.isLocal()) {
-                    cctx.exchange().clientTopology(grpId).update(resTopVer,
+                    GridDhtPartitionTopology top = cctx.exchange().clientTopology(grpId);
+
+                    CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId,
+                        top.partitions());
+
+                    top.update(resTopVer,
                         entry.getValue(),
                         cntrMap,
                         Collections.<Integer>emptySet(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 2bb19cd..edbfc23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -68,6 +69,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     /** Serialized partitions counters. */
     private byte[] partCntrsBytes;
 
+    /** Partitions update counters. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private IgniteDhtPartitionCountersMap2 partCntrs2;
+
+    /** Serialized partitions counters. */
+    private byte[] partCntrsBytes2;
+
     /** Partitions history suppliers. */
     @GridToStringInclude
     @GridDirectTransient
@@ -149,6 +158,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         cp.partsBytes = partsBytes;
         cp.partCntrs = partCntrs;
         cp.partCntrsBytes = partCntrsBytes;
+        cp.partCntrs2 = partCntrs2;
+        cp.partCntrsBytes2 = partCntrsBytes2;
         cp.partHistSuppliers = partHistSuppliers;
         cp.partHistSuppliersBytes = partHistSuppliersBytes;
         cp.partsToReload = partsToReload;
@@ -275,7 +286,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
      * @param grpId Cache group ID.
      * @param cntrMap Partition update counters.
      */
-    public void addPartitionUpdateCounters(int grpId, CachePartitionFullCountersMap cntrMap) {
+    public void addPartitionUpdateCounters(int grpId, Map<Integer, T2<Long, Long>> cntrMap) {
         if (partCntrs == null)
             partCntrs = new IgniteDhtPartitionCountersMap();
 
@@ -284,10 +295,30 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
     /**
      * @param grpId Cache group ID.
+     * @param cntrMap Partition update counters.
+     */
+    public void addPartitionUpdateCounters(int grpId, CachePartitionFullCountersMap cntrMap) {
+        if (partCntrs2 == null)
+            partCntrs2 = new IgniteDhtPartitionCountersMap2();
+
+        partCntrs2.putIfAbsent(grpId, cntrMap);
+    }
+
+    /**
+     * @param grpId Cache group ID.
+     * @param partsCnt Total cache partitions.
      * @return Partition update counters.
      */
-    public CachePartitionFullCountersMap partitionUpdateCounters(int grpId) {
-        return partCntrs == null ? null : partCntrs.get(grpId);
+    public CachePartitionFullCountersMap partitionUpdateCounters(int grpId, int partsCnt) {
+        if (partCntrs2 != null)
+            return partCntrs2.get(grpId);
+
+        if (partCntrs == null)
+            return null;
+
+        Map<Integer, T2<Long, Long>> map = partCntrs.get(grpId);
+
+        return map != null ? CachePartitionFullCountersMap.fromCountersMap(map, partsCnt) : null;
     }
 
     /**
@@ -327,6 +358,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         boolean marshal = (!F.isEmpty(parts) && partsBytes == null) ||
             (partCntrs != null && !partCntrs.empty() && partCntrsBytes == null) ||
+            (partCntrs2 != null && !partCntrs2.empty() && partCntrsBytes2 == null) ||
             (partHistSuppliers != null && partHistSuppliersBytes == null) ||
             (partsToReload != null && partsToReloadBytes == null) ||
             (!F.isEmpty(errs) && errsBytes == null);
@@ -334,6 +366,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         if (marshal) {
             byte[] partsBytes0 = null;
             byte[] partCntrsBytes0 = null;
+            byte[] partCntrsBytes20 = null;
             byte[] partHistSuppliersBytes0 = null;
             byte[] partsToReloadBytes0 = null;
             byte[] errsBytes0 = null;
@@ -344,6 +377,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             if (partCntrs != null && !partCntrs.empty() && partCntrsBytes
== null)
                 partCntrsBytes0 = U.marshal(ctx, partCntrs);
 
+            if (partCntrs2 != null && !partCntrs2.empty() && partCntrsBytes2
== null)
+                partCntrsBytes20 = U.marshal(ctx, partCntrs2);
+
             if (partHistSuppliers != null && partHistSuppliersBytes == null)
                 partHistSuppliersBytes0 = U.marshal(ctx, partHistSuppliers);
 
@@ -359,12 +395,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 try {
                     byte[] partsBytesZip = U.zip(partsBytes0);
                     byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
+                    byte[] partCntrsBytes2Zip = U.zip(partCntrsBytes20);
                     byte[] partHistSuppliersBytesZip = U.zip(partHistSuppliersBytes0);
                     byte[] partsToReloadBytesZip = U.zip(partsToReloadBytes0);
                     byte[] exsBytesZip = U.zip(errsBytes0);
 
                     partsBytes0 = partsBytesZip;
                     partCntrsBytes0 = partCntrsBytesZip;
+                    partCntrsBytes20 = partCntrsBytes2Zip;
                     partHistSuppliersBytes0 = partHistSuppliersBytesZip;
                     partsToReloadBytes0 = partsToReloadBytesZip;
                     errsBytes0 = exsBytesZip;
@@ -378,6 +416,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
             partsBytes = partsBytes0;
             partCntrsBytes = partCntrsBytes0;
+            partCntrsBytes2 = partCntrsBytes20;
             partHistSuppliersBytes = partHistSuppliersBytes0;
             partsToReloadBytes = partsToReloadBytes0;
             errsBytes = errsBytes0;
@@ -446,6 +485,13 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
         }
 
+        if (partCntrsBytes2 != null && partCntrs2 == null) {
+            if (compressed())
+                partCntrs2 = U.unmarshalZip(ctx.marshaller(), partCntrsBytes2, U.resolveClassLoader(ldr, ctx.gridConfig()));
+            else
+                partCntrs2 = U.unmarshal(ctx, partCntrsBytes2, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        }
+
         if (partHistSuppliersBytes != null && partHistSuppliers == null) {
             if (compressed())
                 partHistSuppliers = U.unmarshalZip(ctx.marshaller(), partHistSuppliersBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
@@ -520,30 +566,36 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes))
+                if (!writer.writeByteArray("partCntrsBytes2", partCntrsBytes2))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeByteArray("partsBytes", partsBytes))
+                if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes))
+                if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeMessage("resTopVer", resTopVer))
+                if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes))
                     return false;
 
                 writer.incrementState();
 
             case 14:
+                if (!writer.writeMessage("resTopVer", resTopVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 15:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -606,7 +658,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 10:
-                partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes");
+                partCntrsBytes2 = reader.readByteArray("partCntrsBytes2");
 
                 if (!reader.isLastRead())
                     return false;
@@ -614,7 +666,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 11:
-                partsBytes = reader.readByteArray("partsBytes");
+                partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -622,7 +674,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 12:
-                partsToReloadBytes = reader.readByteArray("partsToReloadBytes");
+                partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -630,7 +682,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 13:
-                resTopVer = reader.readMessage("resTopVer");
+                partsToReloadBytes = reader.readByteArray("partsToReloadBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -638,6 +690,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 14:
+                resTopVer = reader.readMessage("resTopVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 15:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -657,7 +717,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 15;
+        return 16;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 44815ca..215152d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -61,7 +62,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     /** Partitions update counters. */
     @GridToStringInclude
     @GridDirectTransient
-    private Map<Integer, CachePartitionPartialCountersMap> partCntrs;
+    private Map<Integer, Object> partCntrs;
 
     /** Serialized partitions counters. */
     private byte[] partCntrsBytes;
@@ -189,7 +190,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
      * @param grpId Cache group ID.
      * @param cntrMap Partition update counters.
      */
-    public void partitionUpdateCounters(int grpId, CachePartitionPartialCountersMap cntrMap) {
+    public void addPartitionUpdateCounters(int grpId, Object cntrMap) {
         if (partCntrs == null)
             partCntrs = new HashMap<>();
 
@@ -198,12 +199,24 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /**
      * @param grpId Cache group ID.
+     * @param partsCnt Total cache partitions.
      * @return Partition update counters.
      */
-    public CachePartitionPartialCountersMap partitionUpdateCounters(int grpId) {
-        CachePartitionPartialCountersMap res = partCntrs == null ? null : partCntrs.get(grpId);
+    @SuppressWarnings("unchecked")
+    public CachePartitionPartialCountersMap partitionUpdateCounters(int grpId, int partsCnt) {
+        Object res = partCntrs == null ? null : partCntrs.get(grpId);
 
-        return res == null ? CachePartitionPartialCountersMap.EMPTY : res;
+        if (res == null)
+            return CachePartitionPartialCountersMap.EMPTY;
+
+        if (res instanceof CachePartitionPartialCountersMap)
+            return (CachePartitionPartialCountersMap)res;
+
+        assert res instanceof Map : res;
+
+        Map<Integer, T2<Long, Long>> map = (Map<Integer, T2<Long, Long>>)res;
+
+        return CachePartitionPartialCountersMap.fromCountersMap(map, partsCnt);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
index e7954d9..dc2fbf8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
@@ -19,8 +19,10 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.ignite.internal.util.typedef.T2;
 
 /**
  * Partition counters map.
@@ -30,7 +32,7 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
     private static final long serialVersionUID = 0L;
 
     /** */
-    private Map<Integer, CachePartitionFullCountersMap> map;
+    private Map<Integer, Map<Integer, T2<Long, Long>>> map;
 
     /**
      * @return {@code True} if map is empty.
@@ -43,7 +45,7 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
      * @param cacheId Cache ID.
      * @param cntrMap Counters map.
      */
-    public synchronized void putIfAbsent(int cacheId, CachePartitionFullCountersMap cntrMap) {
+    public synchronized void putIfAbsent(int cacheId, Map<Integer, T2<Long, Long>> cntrMap) {
         if (map == null)
             map = new HashMap<>();
 
@@ -55,14 +57,14 @@ public class IgniteDhtPartitionCountersMap implements Serializable {
      * @param cacheId Cache ID.
      * @return Counters map.
      */
-    public synchronized CachePartitionFullCountersMap get(int cacheId) {
+    public synchronized Map<Integer, T2<Long, Long>> get(int cacheId) {
         if (map == null)
-            return null;
+            map = new HashMap<>();
 
-        CachePartitionFullCountersMap cntrMap = map.get(cacheId);
+        Map<Integer, T2<Long, Long>> cntrMap = map.get(cacheId);
 
         if (cntrMap == null)
-            return null;
+            return Collections.emptyMap();
 
         return cntrMap;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/129be29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap2.java
new file mode 100644
index 0000000..d1e6d99
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap2.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Partition counters map.
+ */
+public class IgniteDhtPartitionCountersMap2 implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private Map<Integer, CachePartitionFullCountersMap> map;
+
+    /**
+     * @return {@code True} if map is empty.
+     */
+    public synchronized boolean empty() {
+        return map == null || map.isEmpty();
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param cntrMap Counters map.
+     */
+    public synchronized void putIfAbsent(int cacheId, CachePartitionFullCountersMap cntrMap) {
+        if (map == null)
+            map = new HashMap<>();
+
+        if (!map.containsKey(cacheId))
+            map.put(cacheId, cntrMap);
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @return Counters map.
+     */
+    public synchronized CachePartitionFullCountersMap get(int cacheId) {
+        if (map == null)
+            return null;
+
+        CachePartitionFullCountersMap cntrMap = map.get(cacheId);
+
+        if (cntrMap == null)
+            return null;
+
+        return cntrMap;
+    }
+}


Mime
View raw message