ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-4154
Date Mon, 07 Nov 2016 06:08:37 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4154-2 [created] e291914ca


ignite-4154


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e291914c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e291914c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e291914c

Branch: refs/heads/ignite-4154-2
Commit: e291914ca72762fd4e39867f2d1569f4706e4627
Parents: 6ac5317
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Nov 4 14:31:11 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Nov 4 17:03:31 2016 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 137 ++++++++++++++-----
 .../dht/GridDhtPartitionTopologyImpl.java       |  11 +-
 .../dht/preloader/GridDhtPartitionFullMap.java  |  14 ++
 .../dht/preloader/GridDhtPartitionMap2.java     |  43 +++---
 .../GridDhtPartitionsExchangeFuture.java        |  85 +++++-------
 .../preloader/GridDhtPartitionsFullMessage.java |  82 ++++++++++-
 .../GridDhtPartitionsSingleMessage.java         |  62 ++++++++-
 7 files changed, 315 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e291914c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index a81bf0f..9097934 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridListSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -81,6 +82,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -762,62 +764,129 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param nodes Nodes.
      * @return {@code True} if message was sent, {@code false} if node left grid.
      */
-    private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes) {
-        GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE);
+    private boolean sendAllPartitions(Collection<ClusterNode> nodes) {
+        GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes, null, null, true);
 
-        boolean useOldApi = false;
-        boolean compress = true;
+        if (log.isDebugEnabled())
+            log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
 
         for (ClusterNode node : nodes) {
-            if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
-                useOldApi = true;
-                compress = false;
+            try {
+                cctx.io().sendNoRetry(node, m, SYSTEM_POOL);
+            }
+            catch (ClusterTopologyCheckedException ignore) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" +
+                        node.id() + ", msg=" + m + ']');
+            }
+            catch (IgniteCheckedException e) {
+                U.warn(log, "Failed to send partitions full message [node=" + node + ", err=" + e + ']');
+            }
+        }
 
-                break;
+        return true;
+    }
+    /**
+     * @param nodes Target nodes.
+     * @return Message;
+     */
+    public GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes,
+        GridDhtPartitionExchangeId exchId,
+        GridCacheVersion lastVer,
+        boolean compress) {
+        GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId,
+                lastVer,
+                exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
+
+        boolean useOldApi = false;
+
+        if (nodes != null) {
+            for (ClusterNode node : nodes) {
+                if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
+                    useOldApi = true;
+                    compress = false;
+
+                    break;
+                }
+                else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0)
+                    compress = false;
             }
-            else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0)
-                compress = false;
         }
 
         m.compress(compress);
 
+        Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
+
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal() && cacheCtx.started()) {
-                GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
-
-                if (useOldApi) {
-                    locMap = new GridDhtPartitionFullMap(locMap.nodeId(),
-                        locMap.nodeOrder(),
-                        locMap.updateSequence(),
-                        locMap);
+            if (!cacheCtx.isLocal()) {
+                boolean ready;
+
+                if (exchId != null) {
+                    AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
+
+                    ready = startTopVer == null || startTopVer.compareTo(exchId.topologyVersion()) <= 0;
                 }
+                else
+                    ready = cacheCtx.started();
+
+                if (ready) {
+                    GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
+
+                    if (useOldApi)
+                        locMap = new GridDhtPartitionFullMap(locMap.nodeId(), locMap.nodeOrder(), locMap.updateSequence(), locMap);
 
-                m.addFullPartitionsMap(cacheCtx.cacheId(), locMap);
+                    addFullPartitionsMap(m,
+                        dupData,
+                        compress,
+                        cacheCtx.cacheId(),
+                        locMap,
+                        cacheCtx.affinity().affinityCache().similarAffinityKey());
+
+                    if (exchId != null)
+                        m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
+                }
             }
         }
 
         // It is important that client topologies be added after contexts.
-        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
+        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
             m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true));
 
-        if (log.isDebugEnabled())
-            log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
+            if (exchId != null)
+                m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters());
+        }
 
-        for (ClusterNode node : nodes) {
-            try {
-                cctx.io().sendNoRetry(node, m, SYSTEM_POOL);
-            }
-            catch (ClusterTopologyCheckedException ignore) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" +
-                        node.id() + ", msg=" + m + ']');
-            }
-            catch (IgniteCheckedException e) {
-                U.warn(log, "Failed to send partitions full message [node=" + node + ", err=" + e + ']');
+        return m;
+    }
+
+    private void addFullPartitionsMap(GridDhtPartitionsFullMessage m,
+        Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData,
+        boolean compress,
+        Integer cacheId,
+        GridDhtPartitionFullMap locMap,
+        Object affKey) {
+        Integer dupDataCache = null;
+
+        if (compress) {
+            T2<Integer, GridDhtPartitionFullMap> state0 = dupData.get(affKey);
+
+            if (state0 != null && state0.get2().partitionStateEquals(locMap)) {
+                GridDhtPartitionFullMap locMap0 = new GridDhtPartitionFullMap(locMap.nodeId(),
+                    locMap.nodeOrder(),
+                    locMap.updateSequence());
+
+                for (Map.Entry<UUID, GridDhtPartitionMap2> e : locMap.entrySet())
+                    locMap0.put(e.getKey(), e.getValue().emptyCopy());
+
+                locMap = locMap0;
+
+                dupDataCache = state0.get1();
             }
+            else
+                dupData.put(affKey, new T2<>(cacheId, locMap));
         }
 
-        return true;
+        m.addFullPartitionsMap(cacheId, locMap, dupDataCache);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e291914c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 871a084..71458fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -71,6 +71,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** Flag to control amount of output for full map. */
     private static final boolean FULL_MAP_DEBUG = false;
 
+    /** */
+    private static final Long ZERO = 0L;
+
     /** Context. */
     private final GridCacheContext<?, ?> cctx;
 
@@ -1029,7 +1032,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
                     Long cntr = this.cntrMap.get(e.getKey());
 
-                    if (cntr == null || cntr < e.getValue())
+                    if ((cntr == null || cntr < e.getValue()) && !e.getValue().equals(ZERO))
                         this.cntrMap.put(e.getKey(), e.getValue());
                 }
 
@@ -1169,7 +1172,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
                     Long cntr = this.cntrMap.get(e.getKey());
 
-                    if (cntr == null || cntr < e.getValue())
+                    if ((cntr == null || cntr < e.getValue()) && !e.getValue().equals(ZERO))
                         this.cntrMap.put(e.getKey(), e.getValue());
                 }
 
@@ -1513,9 +1516,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     continue;
 
                 Long cntr0 = res.get(part.id());
-                Long cntr1 = part.updateCounter();
+                long cntr1 = part.updateCounter();
 
-                if (cntr0 == null || cntr1 > cntr0)
+                if ((cntr0 == null || cntr1 > cntr0) && cntr1 != 0)
                     res.put(part.id(), cntr1);
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e291914c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
index 498d492..4253cc2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
@@ -157,6 +157,20 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
         return updateSeq;
     }
 
+    public boolean partitionStateEquals(GridDhtPartitionFullMap fullMap) {
+        if (size() != fullMap.size())
+            return false;
+
+        for (Map.Entry<UUID, GridDhtPartitionMap2> e : entrySet()) {
+            GridDhtPartitionMap2 m = fullMap.get(e.getKey());
+
+            if (m == null || !m.map().equals(e.getValue().map()))
+                return false;
+        }
+
+        return true;
+    }
+
     /**
      * @param updateSeq New update sequence value.
      * @return Old update sequence value.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e291914c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
index 15b5a2e..5cdafa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
@@ -63,25 +63,15 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
     /**
      * @param nodeId Node ID.
      * @param updateSeq Update sequence number.
-     */
-    public GridDhtPartitionMap2(UUID nodeId, long updateSeq) {
-        assert nodeId != null;
-        assert updateSeq > 0;
-
-        this.nodeId = nodeId;
-        this.updateSeq = updateSeq;
-
-        map = new HashMap<>();
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param updateSeq Update sequence number.
+     * @param top Topology version.
      * @param m Map to copy.
      * @param onlyActive If {@code true}, then only active states will be included.
      */
-    public GridDhtPartitionMap2(UUID nodeId, long updateSeq, AffinityTopologyVersion top,
-        Map<Integer, GridDhtPartitionState> m, boolean onlyActive) {
+    public GridDhtPartitionMap2(UUID nodeId,
+        long updateSeq,
+        AffinityTopologyVersion top,
+        Map<Integer, GridDhtPartitionState> m,
+        boolean onlyActive) {
         assert nodeId != null;
         assert updateSeq > 0;
 
@@ -99,6 +89,20 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
         }
     }
 
+    private GridDhtPartitionMap2(UUID nodeId, long updateSeq, AffinityTopologyVersion top, Map<Integer, GridDhtPartitionState> map, int moving) {
+        this.nodeId = nodeId;
+        this.updateSeq = updateSeq;
+        this.top = top;
+        this.map = map;
+        this.moving = moving;
+    }
+
+    public GridDhtPartitionMap2 emptyCopy() {
+        Map<Integer, GridDhtPartitionState> map = new HashMap<>();
+
+        return new GridDhtPartitionMap2(nodeId, updateSeq, top, map, moving);
+    }
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -174,6 +178,13 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
     }
 
     /**
+     * @param map Partition states map.
+     */
+    public void map(Map<Integer, GridDhtPartitionState> map) {
+        this.map = map;
+    }
+
+    /**
      * @return Node ID.
      */
     public UUID nodeId() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e291914c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 6a17583..52ed262 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -64,6 +65,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -933,10 +935,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      */
     private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id)
         throws IgniteCheckedException {
+        boolean compress =
+            node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0;
+
         GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
             clientOnlyExchange,
             cctx.versions().last(),
-            node.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0);
+            compress);
+
+        Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData = new HashMap<>();
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal()) {
@@ -945,7 +952,23 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
                     locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map());
 
-                m.addLocalPartitionMap(cacheCtx.cacheId(), locMap);
+                Integer dupDataCache = null;
+
+                if (compress) {
+                    Object affKey = cacheCtx.affinity().affinityCache().similarAffinityKey();
+
+                    T2<Integer, Map<Integer, GridDhtPartitionState>> state0 = dupData.get(affKey);
+
+                    if (state0 != null && state0.get2().equals(locMap.map())) {
+                        dupDataCache = state0.get1();
+
+                        locMap.map(Collections.<Integer, GridDhtPartitionState>emptyMap());
+                    }
+                    else
+                        dupData.put(affKey, new T2<>(cacheCtx.cacheId(), locMap.map()));
+                }
+
+                m.addLocalPartitionMap(cacheCtx.cacheId(), locMap, dupDataCache);
 
                 m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
             }
@@ -967,58 +990,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @param nodes Target nodes.
      * @return Message;
      */
-    private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes) {
+    private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes, boolean compress) {
         GridCacheVersion last = lastVer.get();
 
-        GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchangeId(),
-            last != null ? last : cctx.versions().last(),
-            topologyVersion());
-
-        boolean useOldApi = false;
-        boolean compress = true;
-
-        if (nodes != null) {
-            for (ClusterNode node : nodes) {
-                if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
-                    useOldApi = true;
-                    compress = false;
-
-                    break;
-                }
-                else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0)
-                    compress = false;
-            }
-        }
-
-        m.compress(compress);
-
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal()) {
-                AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
-
-                boolean ready = startTopVer == null || startTopVer.compareTo(topologyVersion()) <= 0;
-
-                if (ready) {
-                    GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
-
-                    if (useOldApi)
-                        locMap = new GridDhtPartitionFullMap(locMap.nodeId(), locMap.nodeOrder(), locMap.updateSequence(), locMap);
-
-                    m.addFullPartitionsMap(cacheCtx.cacheId(), locMap);
-
-                    m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
-                }
-            }
-        }
-
-        // It is important that client topologies be added after contexts.
-        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
-            m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true));
-
-            m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters());
-        }
-
-        return m;
+        return cctx.exchange().createPartitionsMessage(nodes,
+            exchangeId(), last != null ? last : cctx.versions().last(),
+            compress);
     }
 
     /**
@@ -1026,7 +1003,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @throws IgniteCheckedException If failed.
      */
     private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException {
-        GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes);
+        GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes, true);
 
         if (log.isDebugEnabled())
             log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
@@ -1244,7 +1221,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
 
-            GridDhtPartitionsFullMessage m = createPartitionsMessage(null);
+            GridDhtPartitionsFullMessage m = createPartitionsMessage(null, false);
 
             CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e291914c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 63d63e2..e5a2828 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -22,14 +22,20 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
+
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
@@ -48,6 +54,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     private Map<Integer, GridDhtPartitionFullMap> parts;
 
     /** */
+    @GridDirectMap(keyType = Integer.class, valueType = Integer.class)
+    private Map<Integer, Integer> dupPartsData;
+
+    /** */
     private byte[] partsBytes;
 
     /** Partitions update counters. */
@@ -63,7 +73,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
     /** */
     @GridDirectTransient
-    private boolean compress;
+    private transient boolean compress;
 
     /**
      * Required by {@link Externalizable}.
@@ -103,11 +113,29 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
      * @param fullMap Full partitions map.
      */
     public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap) {
+        addFullPartitionsMap(cacheId, fullMap, null);
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param fullMap Full partitions map.
+     */
+    public void addFullPartitionsMap(int cacheId, GridDhtPartitionFullMap fullMap, Integer dupDataCache) {
         if (parts == null)
             parts = new HashMap<>();
 
-        if (!parts.containsKey(cacheId))
+        if (!parts.containsKey(cacheId)) {
             parts.put(cacheId, fullMap);
+
+            if (dupDataCache != null) {
+                assert parts.containsKey(dupDataCache);
+
+                if (dupPartsData == null)
+                    dupPartsData = new HashMap<>();
+
+                dupPartsData.put(cacheId, dupDataCache);
+            }
+        }
     }
 
     /**
@@ -197,6 +225,32 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 parts = U.unmarshalZip(ctx.marshaller(), partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
             else
                 parts = U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+
+            if (dupPartsData != null) {
+                assert parts != null;
+
+                for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) {
+                    GridDhtPartitionFullMap map1 = parts.get(e.getKey());
+
+                    assert map1 != null : e.getKey();
+
+                    GridDhtPartitionFullMap map2 = parts.get(e.getValue());
+
+                    assert map2 != null : e.getValue();
+
+                    for (Map.Entry<UUID, GridDhtPartitionMap2> e0 : map1.entrySet()) {
+                        GridDhtPartitionMap2 partMap1 = e0.getValue();
+
+                        assert partMap1.map().isEmpty();
+
+                        GridDhtPartitionMap2 partMap2 = map1.get(e0.getKey());
+
+                        assert partMap2 != null;
+
+                        partMap1.map(new HashMap<>(partMap2.map()));
+                    }
+                }
+            }
         }
 
         if (parts == null)
@@ -229,18 +283,24 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         switch (writer.state()) {
             case 6:
-                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+                if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeByteArray("partsBytes", partsBytes))
+                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 8:
+                if (!writer.writeByteArray("partsBytes", partsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -263,7 +323,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         switch (reader.state()) {
             case 6:
-                partCntrsBytes = reader.readByteArray("partCntrsBytes");
+                dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -271,7 +331,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 7:
-                partsBytes = reader.readByteArray("partsBytes");
+                partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -279,6 +339,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 8:
+                partsBytes = reader.readByteArray("partsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -298,7 +366,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 9;
+        return 10;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e291914c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index a37e092..134a3b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -23,12 +23,16 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
@@ -45,6 +49,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     @GridDirectTransient
     private Map<Integer, GridDhtPartitionMap2> parts;
 
+    /** */
+    @GridDirectMap(keyType = Integer.class, valueType = Integer.class)
+    private Map<Integer, Integer> dupPartsData;
+
     /** Serialized partitions. */
     private byte[] partsBytes;
 
@@ -60,7 +68,8 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     private boolean client;
 
     /** */
-    private boolean compress;
+    @GridDirectTransient
+    private transient boolean compress;
 
     /**
      * Required by {@link Externalizable}.
@@ -104,6 +113,19 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
         parts.put(cacheId, locMap);
     }
 
+    public void addLocalPartitionMap(int cacheId, GridDhtPartitionMap2 locMap, Integer dupDataCache) {
+        addLocalPartitionMap(cacheId, locMap);
+
+        if (dupDataCache != null) {
+            assert F.isEmpty(locMap.map());
+
+            if (dupPartsData == null)
+                dupPartsData = new HashMap<>();
+
+            dupPartsData.put(cacheId, dupDataCache);
+        }
+    }
+
     /**
      * @param cacheId Cache ID.
      * @param cntrMap Partition update counters.
@@ -192,6 +214,24 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
             else
                 partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
         }
+
+        if (dupPartsData != null) {
+            assert parts != null;
+
+            for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) {
+                GridDhtPartitionMap2 map1 = parts.get(e.getKey());
+
+                assert map1 != null : e.getKey();
+                assert F.isEmpty(map1.map());
+
+                GridDhtPartitionMap2 map2 = parts.get(e.getValue());
+
+                assert map2 != null : e.getValue();
+                assert map2.map() != null;
+
+                map1.map(new HashMap<>(map2.map()));
+            }
+        }
     }
 
     /** {@inheritDoc} */
@@ -216,12 +256,18 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+                if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 8:
+                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
@@ -252,7 +298,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 7:
-                partCntrsBytes = reader.readByteArray("partCntrsBytes");
+                dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -260,6 +306,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 8:
+                partCntrsBytes = reader.readByteArray("partCntrsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -279,7 +333,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 9;
+        return 10;
     }
 
     /** {@inheritDoc} */


Mime
View raw message