ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [15/17] ignite git commit: ignite-4154 Optimize amount of data stored in discovery history Discovery history optimizations: - remove discarded message for discovery pending messages - remove duplicated data from TcpDiscoveryNodeAddedMessage.oldNodesDisco
Date Fri, 11 Nov 2016 14:19:04 GMT
ignite-4154 Optimize amount of data stored in discovery history
Discovery history optimizations:
- remove discarded message for discovery pending messages
- remove duplicated data from TcpDiscoveryNodeAddedMessage.oldNodesDiscoData
- do not store unnecessary data in discovery EnsuredMessageHistory
- use special property for EnsuredMessageHistory size instead of IGNITE_DISCOVERY_HISTORY_SIZE
Affinity history optimizations:
- do not store calculated primary/backup maps in history
- try to save the same assignments instance for caches with similar affinity
Exchange messages optimizations:
- do not send duplicated partition state maps for caches with similar affinity
- use zip compression for data sent in exchange messages


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7128a395
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7128a395
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7128a395

Branch: refs/heads/ignite-4154-opt2
Commit: 7128a395085b60e86436f807b4bdbca83627d41a
Parents: 8bb8bdd
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Nov 11 15:29:38 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Nov 11 15:29:38 2016 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   4 +
 .../processors/affinity/AffinityAssignment.java |  88 +++++
 .../affinity/GridAffinityAssignment.java        |   8 +-
 .../affinity/GridAffinityAssignmentCache.java   |  35 +-
 .../affinity/GridAffinityProcessor.java         |  89 ++++-
 .../processors/affinity/GridAffinityUtils.java  |   8 +-
 .../affinity/HistoryAffinityAssignment.java     | 169 ++++++++
 .../cache/CacheAffinitySharedManager.java       |  57 ++-
 .../cache/DynamicCacheChangeBatch.java          |   7 +
 .../cache/GridCacheAffinityManager.java         |   6 +-
 .../GridCachePartitionExchangeManager.java      | 284 ++++++++++++--
 .../processors/cache/GridCacheProcessor.java    |   5 +-
 .../dht/GridClientPartitionTopology.java        |  33 +-
 .../dht/GridDhtPartitionTopology.java           |   3 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  31 +-
 .../dht/preloader/GridDhtPartitionFullMap.java  |  18 +
 .../dht/preloader/GridDhtPartitionMap2.java     |  53 ++-
 .../GridDhtPartitionsAbstractMessage.java       |  40 +-
 .../GridDhtPartitionsExchangeFuture.java        |  84 +---
 .../preloader/GridDhtPartitionsFullMessage.java | 150 ++++++-
 .../GridDhtPartitionsSingleMessage.java         | 132 ++++++-
 .../GridDhtPartitionsSingleRequest.java         |   4 +-
 .../dht/preloader/GridDhtPreloader.java         |   4 +-
 .../continuous/GridContinuousProcessor.java     |   4 +-
 .../ignite/internal/util/IgniteUtils.java       |  64 +++
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  26 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 234 +++++++++--
 .../TcpDiscoveryNodeAddFinishedMessage.java     |  11 +
 .../messages/TcpDiscoveryNodeAddedMessage.java  |  33 +-
 ...CacheExchangeMessageDuplicatedStateTest.java | 393 +++++++++++++++++++
 .../cache/IgniteCachePeekModesAbstractTest.java |   2 +-
 .../distributed/IgniteCacheGetRestartTest.java  |   3 +
 ...cingDelayedPartitionMapExchangeSelfTest.java |   8 +-
 .../GridCacheRebalancingSyncSelfTest.java       |  18 +-
 .../GridCacheSyncReplicatedPreloadSelfTest.java |   3 -
 .../IgniteCacheSyncRebalanceModeSelfTest.java   |   4 +-
 ...ContinuousQueryFailoverAbstractSelfTest.java |   2 +-
 .../IgniteNoCustomEventsOnNodeStart.java        |   7 +
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 134 ++++++-
 .../junits/common/GridCommonAbstractTest.java   |  25 +-
 .../testsuites/IgniteCacheTestSuite2.java       |   3 +
 .../cache/IgniteCacheOffheapEvictQueryTest.java |   7 +
 ...lientQueryReplicatedNodeRestartSelfTest.java |   7 +
 ...butedQueryStopOnCancelOrTimeoutSelfTest.java |   7 +
 .../query/h2/sql/GridQueryParsingTest.java      |  11 +-
 .../src/test/config/incorrect-store-cache.xml   |   2 +
 .../src/test/config/jdbc-pojo-store-builtin.xml |   3 +
 .../src/test/config/jdbc-pojo-store-obj.xml     |   3 +
 modules/spring/src/test/config/node.xml         |   2 +
 modules/spring/src/test/config/node1.xml        |   2 +
 .../test/config/pojo-incorrect-store-cache.xml  |   2 +
 modules/spring/src/test/config/store-cache.xml  |   2 +
 modules/spring/src/test/config/store-cache1.xml |   2 +
 53 files changed, 2061 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index ab6403f..a75027b 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -383,6 +383,10 @@ public final class IgniteSystemProperties {
     /** Maximum size for discovery messages history. */
     public static final String IGNITE_DISCOVERY_HISTORY_SIZE = "IGNITE_DISCOVERY_HISTORY_SIZE";
 
+    /** Maximum size of the discovery message history used to support client reconnect. */
+    public static final String IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE =
+        "IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE";
+
     /** Number of cache operation retries in case of topology exceptions. */
     public static final String IGNITE_CACHE_RETRIES_COUNT = "IGNITE_CACHE_RETRIES_COUNT";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
new file mode 100644
index 0000000..06207d3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityAssignment.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cluster.ClusterNode;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Cached affinity calculations.
+ */
+public interface AffinityAssignment {
+    /**
+     * @return {@code True} if related discovery event did not cause affinity assignment change and
+     *    this assignment is just reference to the previous one.
+     */
+    public boolean clientEventChange();
+
+    /**
+     * @return Affinity assignment computed by affinity function.
+     */
+    public List<List<ClusterNode>> idealAssignment();
+
+    /**
+     * @return Affinity assignment.
+     */
+    public List<List<ClusterNode>> assignment();
+
+    /**
+     * @return Topology version.
+     */
+    public AffinityTopologyVersion topologyVersion();
+
+    /**
+     * Get affinity nodes for partition.
+     *
+     * @param part Partition.
+     * @return Affinity nodes.
+     */
+    public List<ClusterNode> get(int part);
+
+    /**
+     * Get affinity node IDs for partition.
+     *
+     * @param part Partition.
+     * @return Affinity nodes IDs.
+     */
+    public HashSet<UUID> getIds(int part);
+
+    /**
+     * @return Nodes having primary partitions assignments.
+     */
+    public Set<ClusterNode> primaryPartitionNodes();
+
+    /**
+     * Get primary partitions for specified node ID.
+     *
+     * @param nodeId Node ID to get primary partitions for.
+     * @return Primary partitions for specified node ID.
+     */
+    public Set<Integer> primaryPartitions(UUID nodeId);
+
+    /**
+     * Get backup partitions for specified node ID.
+     *
+     * @param nodeId Node ID to get backup partitions for.
+     * @return Backup partitions for specified node ID.
+     */
+    public Set<Integer> backupPartitions(UUID nodeId);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index 568e4e8..35130a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 /**
  * Cached affinity calculations.
  */
-public class GridAffinityAssignment implements Serializable {
+public class GridAffinityAssignment implements AffinityAssignment, Serializable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -86,7 +86,7 @@ public class GridAffinityAssignment implements Serializable {
 
         this.topVer = topVer;
         this.assignment = assignment;
-        this.idealAssignment = idealAssignment;
+        this.idealAssignment = idealAssignment.equals(assignment) ? assignment : idealAssignment;
 
         primary = new HashMap<>();
         backup = new HashMap<>();
@@ -274,10 +274,10 @@ public class GridAffinityAssignment implements Serializable {
         if (o == this)
             return true;
 
-        if (o == null || getClass() != o.getClass())
+        if (o == null || !(o instanceof AffinityAssignment))
             return false;
 
-        return topVer.equals(((GridAffinityAssignment)o).topVer);
+        return topVer.equals(((AffinityAssignment)o).topologyVersion());
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a81b34d..a388c7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -78,7 +78,7 @@ public class GridAffinityAssignmentCache {
     private final int partsCnt;
 
     /** Affinity calculation results cache: topology version => partition => nodes. */
-    private final ConcurrentNavigableMap<AffinityTopologyVersion, GridAffinityAssignment> affCache;
+    private final ConcurrentNavigableMap<AffinityTopologyVersion, HistoryAffinityAssignment> affCache;
 
     /** */
     private List<List<ClusterNode>> idealAssignment;
@@ -107,6 +107,9 @@ public class GridAffinityAssignmentCache {
     /** Full history size. */
     private final AtomicInteger fullHistSize = new AtomicInteger();
 
+    /** */
+    private final Object similarAffKey;
+
     /**
      * Constructs affinity cached calculations.
      *
@@ -127,6 +130,7 @@ public class GridAffinityAssignmentCache {
     {
         assert ctx != null;
         assert aff != null;
+        assert nodeFilter != null;
 
         this.ctx = ctx;
         this.aff = aff;
@@ -142,6 +146,17 @@ public class GridAffinityAssignmentCache {
         partsCnt = aff.partitions();
         affCache = new ConcurrentSkipListMap<>();
         head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE));
+
+        similarAffKey = ctx.affinity().similaryAffinityKey(aff, nodeFilter, backups, partsCnt);
+
+        assert similarAffKey != null;
+    }
+
+    /**
+     * @return Key to find caches with similar affinity.
+     */
+    public Object similarAffinityKey() {
+        return similarAffKey;
     }
 
     /**
@@ -170,7 +185,7 @@ public class GridAffinityAssignmentCache {
 
         GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment);
 
-        affCache.put(topVer, assignment);
+        affCache.put(topVer, new HistoryAffinityAssignment(assignment));
         head.set(assignment);
 
         for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
@@ -300,7 +315,7 @@ public class GridAffinityAssignmentCache {
 
         GridAffinityAssignment assignmentCpy = new GridAffinityAssignment(topVer, aff);
 
-        affCache.put(topVer, assignmentCpy);
+        affCache.put(topVer, new HistoryAffinityAssignment(assignmentCpy));
         head.set(assignmentCpy);
 
         for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
@@ -328,7 +343,7 @@ public class GridAffinityAssignmentCache {
      * @return Affinity assignment.
      */
     public List<List<ClusterNode>> assignments(AffinityTopologyVersion topVer) {
-        GridAffinityAssignment aff = cachedAffinity(topVer);
+        AffinityAssignment aff = cachedAffinity(topVer);
 
         return aff.assignment();
     }
@@ -427,7 +442,7 @@ public class GridAffinityAssignmentCache {
      * @param topVer Topology version.
      * @return Cached affinity.
      */
-    public GridAffinityAssignment cachedAffinity(AffinityTopologyVersion topVer) {
+    public AffinityAssignment cachedAffinity(AffinityTopologyVersion topVer) {
         if (topVer.equals(AffinityTopologyVersion.NONE))
             topVer = lastVersion();
         else
@@ -435,7 +450,7 @@ public class GridAffinityAssignmentCache {
 
         assert topVer.topologyVersion() >= 0 : topVer;
 
-        GridAffinityAssignment cache = head.get();
+        AffinityAssignment cache = head.get();
 
         if (!cache.topologyVersion().equals(topVer)) {
             cache = affCache.get(topVer);
@@ -463,7 +478,7 @@ public class GridAffinityAssignmentCache {
      * @return {@code True} if primary changed or required affinity version not found in history.
      */
     public boolean primaryChanged(int part, AffinityTopologyVersion startVer, AffinityTopologyVersion endVer) {
-        GridAffinityAssignment aff = affCache.get(startVer);
+        AffinityAssignment aff = affCache.get(startVer);
 
         if (aff == null)
             return false;
@@ -475,7 +490,7 @@ public class GridAffinityAssignmentCache {
 
         ClusterNode primary = nodes.get(0);
 
-        for (GridAffinityAssignment assignment : affCache.tailMap(startVer, false).values()) {
+        for (AffinityAssignment assignment : affCache.tailMap(startVer, false).values()) {
             List<ClusterNode> nodes0 = assignment.assignment().get(part);
 
             if (nodes0.isEmpty())
@@ -549,10 +564,10 @@ public class GridAffinityAssignmentCache {
         }
 
         if (rmvCnt > 0) {
-            Iterator<GridAffinityAssignment> it = affCache.values().iterator();
+            Iterator<HistoryAffinityAssignment> it = affCache.values().iterator();
 
             while (it.hasNext() && rmvCnt > 0) {
-                GridAffinityAssignment aff0 = it.next();
+                AffinityAssignment aff0 = it.next();
 
                 it.remove();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 1726d02..b9182ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -385,10 +386,16 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
             }
 
             try {
+                AffinityAssignment assign0 = cctx.affinity().assignment(topVer);
+
+                GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ?
+                    (GridAffinityAssignment)assign0 :
+                    new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment());
+
                 AffinityInfo info = new AffinityInfo(
                     cctx.config().getAffinity(),
                     cctx.config().getAffinityMapper(),
-                    new GridAffinityAssignment(topVer, cctx.affinity().assignment(topVer)),
+                    assign,
                     cctx.cacheObjectContext());
 
                 IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, new GridFinishedFuture<>(info));
@@ -562,6 +569,20 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
         return nodes.iterator().next();
     }
 
+    /**
+     * @param aff Affinity function.
+     * @param nodeFilter Node filter.
+     * @param backups Number of backups.
+     * @param parts Number of partitions.
+     * @return Key to find caches with similar affinity.
+     */
+    public Object similaryAffinityKey(AffinityFunction aff,
+        IgnitePredicate<ClusterNode> nodeFilter,
+        int backups,
+        int parts) {
+        return new SimilarAffinityKey(aff.getClass(), nodeFilter.getClass(), backups, parts);
+    }
+
     /** {@inheritDoc} */
     @Override public void printMemoryStats() {
         X.println(">>>");
@@ -960,4 +981,70 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
             return aff;
         }
     }
+
+    /**
+     *
+     */
+    private static class SimilarAffinityKey {
+        /** */
+        private final int backups;
+
+        /** */
+        private final Class<?> affFuncCls;
+
+        /** */
+        private final Class<?> filterCls;
+
+        /** */
+        private final int partsCnt;
+
+        /** */
+        private final int hash;
+
+        /**
+         * @param affFuncCls Affinity function class.
+         * @param filterCls Node filter class.
+         * @param backups Number of backups.
+         * @param partsCnt Number of partitions.
+         */
+        SimilarAffinityKey(Class<?> affFuncCls, Class<?> filterCls, int backups, int partsCnt) {
+            this.backups = backups;
+            this.affFuncCls = affFuncCls;
+            this.filterCls = filterCls;
+            this.partsCnt = partsCnt;
+
+            int hash = backups;
+            hash = 31 * hash + affFuncCls.hashCode();
+            hash = 31 * hash + filterCls.hashCode();
+            hash= 31 * hash + partsCnt;
+
+            this.hash = hash;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return hash;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            SimilarAffinityKey key = (SimilarAffinityKey)o;
+
+            return backups == key.backups &&
+                affFuncCls == key.affFuncCls &&
+                filterCls == key.filterCls &&
+                partsCnt == key.partsCnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SimilarAffinityKey.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
index c24dd2d..abd5292 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
@@ -180,10 +180,16 @@ class GridAffinityUtils {
 
             cctx.affinity().affinityReadyFuture(topVer).get();
 
+            AffinityAssignment assign0 = cctx.affinity().assignment(topVer);
+
+            GridAffinityAssignment assign = assign0 instanceof GridAffinityAssignment ?
+                (GridAffinityAssignment)assign0 :
+                new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment());
+
             return F.t(
                 affinityMessage(ctx, cctx.config().getAffinity()),
                 affinityMessage(ctx, cctx.config().getAffinityMapper()),
-                new GridAffinityAssignment(topVer, cctx.affinity().assignment(topVer)));
+                assign);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
new file mode 100644
index 0000000..e502dd5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/HistoryAffinityAssignment.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ *
+ */
+public class HistoryAffinityAssignment implements AffinityAssignment {
+    /** */
+    private final AffinityTopologyVersion topVer;
+
+    /** */
+    private final List<List<ClusterNode>> assignment;
+
+    /** */
+    private final List<List<ClusterNode>> idealAssignment;
+
+    /** */
+    private final boolean clientEvtChange;
+
+    /**
+     * @param assign Assignment.
+     */
+    public HistoryAffinityAssignment(GridAffinityAssignment assign) {
+        this.topVer = assign.topologyVersion();
+        this.assignment = assign.assignment();
+        this.idealAssignment = assign.idealAssignment();
+        this.clientEvtChange = assign.clientEventChange();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean clientEventChange() {
+        return clientEvtChange;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> idealAssignment() {
+        return idealAssignment;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> assignment() {
+        return assignment;
+    }
+
+    /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<ClusterNode> get(int part) {
+        assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" +
+            " [part=" + part + ", partitions=" + assignment.size() + ']';
+
+        return assignment.get(part);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HashSet<UUID> getIds(int part) {
+        assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" +
+            " [part=" + part + ", partitions=" + assignment.size() + ']';
+
+        List<ClusterNode> nodes = assignment.get(part);
+
+        HashSet<UUID> ids = U.newHashSet(nodes.size());
+
+        for (int i = 0; i < nodes.size(); i++)
+            ids.add(nodes.get(i).id());
+
+        return ids;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<ClusterNode> primaryPartitionNodes() {
+        Set<ClusterNode> res = new HashSet<>();
+
+        for (int p = 0; p < assignment.size(); p++) {
+            List<ClusterNode> nodes = assignment.get(p);
+
+            if (!F.isEmpty(nodes))
+                res.add(nodes.get(0));
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<Integer> primaryPartitions(UUID nodeId) {
+        Set<Integer> res = new HashSet<>();
+
+        for (int p = 0; p < assignment.size(); p++) {
+            List<ClusterNode> nodes = assignment.get(p);
+
+            if (!F.isEmpty(nodes) && nodes.get(0).id().equals(nodeId))
+                res.add(p);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<Integer> backupPartitions(UUID nodeId) {
+        Set<Integer> res = new HashSet<>();
+
+        for (int p = 0; p < assignment.size(); p++) {
+            List<ClusterNode> nodes = assignment.get(p);
+
+            for (int i = 1; i < nodes.size(); i++) {
+                ClusterNode node = nodes.get(i);
+
+                if (node.id().equals(nodeId)) {
+                    res.add(p);
+
+                    break;
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return topVer.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("SimplifiableIfStatement")
+    @Override public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        if (o == null || !(o instanceof AffinityAssignment))
+            return false;
+
+        return topVer.equals(((AffinityAssignment)o).topologyVersion());
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HistoryAffinityAssignment.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 1aedf4e..2890887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -127,7 +127,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param node Event node.
      * @param topVer Topology version.
      */
-    public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
+    void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
         if (type == EVT_NODE_JOINED && node.isLocal()) {
             // Clean-up in case of client reconnect.
             registeredCaches.clear();
@@ -153,7 +153,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param msg Customer message.
      * @return {@code True} if minor topology version should be increased.
      */
-    public boolean onCustomEvent(CacheAffinityChangeMessage msg) {
+    boolean onCustomEvent(CacheAffinityChangeMessage msg) {
         assert lateAffAssign : msg;
 
         if (msg.exchangeId() != null) {
@@ -219,7 +219,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param top Topology.
      * @param checkCacheId Cache ID.
      */
-    public void checkRebalanceState(GridDhtPartitionTopology top, Integer checkCacheId) {
+    void checkRebalanceState(GridDhtPartitionTopology top, Integer checkCacheId) {
         if (!lateAffAssign)
             return;
 
@@ -508,6 +508,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         assert assignment != null;
 
+        final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+
         forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                 List<List<ClusterNode>> idealAssignment = aff.idealAssignment();
@@ -527,7 +529,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 else
                     newAssignment = idealAssignment;
 
-                aff.initialize(topVer, newAssignment);
+                aff.initialize(topVer, cachedAssignment(aff, newAssignment, affCache));
             }
         });
     }
@@ -562,6 +564,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
         final Map<Integer, IgniteUuid> deploymentIds = msg.cacheDeploymentIds();
 
+        final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+
         forAllCaches(crd, new IgniteInClosureX<GridAffinityAssignmentCache>() {
             @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
                 AffinityTopologyVersion affTopVer = aff.lastVersion();
@@ -602,7 +606,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         assignment.set(part, nodes);
                     }
 
-                    aff.initialize(topVer, assignment);
+                    aff.initialize(topVer, cachedAssignment(aff, assignment, affCache));
                 }
                 else
                     aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer);
@@ -1206,6 +1210,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         throws IgniteCheckedException {
         AffinityTopologyVersion topVer = fut.topologyVersion();
 
+        final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+
         if (!crd) {
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                 if (cacheCtx.isLocal())
@@ -1213,7 +1219,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                 boolean latePrimary = cacheCtx.rebalanceEnabled();
 
-                initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary);
+                initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary, affCache);
             }
 
             return null;
@@ -1227,7 +1233,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                     boolean latePrimary = cache.rebalanceEnabled;
 
-                    initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary);
+                    initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary, affCache);
                 }
             });
 
@@ -1240,12 +1246,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @param aff Affinity.
      * @param rebalanceInfo Rebalance information.
      * @param latePrimary If {@code true} delays primary assignment if it is not owner.
+     * @param affCache Already calculated assignments (to reduce data stored in history).
      * @throws IgniteCheckedException If failed.
      */
     private void initAffinityOnNodeJoin(GridDhtPartitionsExchangeFuture fut,
         GridAffinityAssignmentCache aff,
         WaitRebalanceInfo rebalanceInfo,
-        boolean latePrimary)
+        boolean latePrimary,
+        Map<Object, List<List<ClusterNode>>> affCache)
         throws IgniteCheckedException
     {
         assert lateAffAssign;
@@ -1292,7 +1300,26 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         if (newAssignment == null)
             newAssignment = idealAssignment;
 
-        aff.initialize(fut.topologyVersion(), newAssignment);
+        aff.initialize(fut.topologyVersion(), cachedAssignment(aff, newAssignment, affCache));
+    }
+
+    /**
+     * @param aff Assignment cache; its {@code similarAffinityKey()} is the lookup key in {@code affCache}.
+     * @param assign Assignment calculated for this cache.
+     * @param affCache Assignments already calculated for other caches, by similar affinity key.
+     * @return Instance already stored in {@code affCache} if it equals {@code assign}, otherwise {@code assign}.
+     */
+    private List<List<ClusterNode>> cachedAssignment(GridAffinityAssignmentCache aff,
+        List<List<ClusterNode>> assign,
+        Map<Object, List<List<ClusterNode>>> affCache) {
+        List<List<ClusterNode>> assign0 = affCache.get(aff.similarAffinityKey());
+
+        if (assign0 != null && assign0.equals(assign))
+            assign = assign0; // Reuse the already cached equal instance.
+        else
+            affCache.put(aff.similarAffinityKey(), assign); // Nothing cached or it differs: store this one.
+
+        return assign;
     }
 
     /**
@@ -1367,7 +1394,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @return Affinity assignment.
      * @throws IgniteCheckedException If failed.
      */
-    public Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut)
+    private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut)
         throws IgniteCheckedException {
         final AffinityTopologyVersion topVer = fut.topologyVersion();
 
@@ -1554,7 +1581,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
          * @param aff Affinity cache.
          * @param initAff Existing affinity cache.
          */
-        public CacheHolder(boolean rebalanceEnabled, GridAffinityAssignmentCache aff, @Nullable GridAffinityAssignmentCache initAff) {
+        CacheHolder(boolean rebalanceEnabled,
+            GridAffinityAssignmentCache aff,
+            @Nullable GridAffinityAssignmentCache initAff) {
             this.aff = aff;
 
             if (initAff != null)
@@ -1606,7 +1635,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     /**
      * Created cache is started on coordinator.
      */
-    class CacheHolder1 extends CacheHolder {
+    private class CacheHolder1 extends CacheHolder {
         /** */
         private final GridCacheContext cctx;
 
@@ -1614,7 +1643,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
          * @param cctx Cache context.
          * @param initAff Current affinity.
          */
-        public CacheHolder1(GridCacheContext cctx, @Nullable GridAffinityAssignmentCache initAff) {
+        CacheHolder1(GridCacheContext cctx, @Nullable GridAffinityAssignmentCache initAff) {
             super(cctx.rebalanceEnabled(), cctx.affinity().affinityCache(), initAff);
 
             assert !cctx.isLocal() : cctx.name();
@@ -1651,7 +1680,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     /**
      * Created if cache is not started on coordinator.
      */
-    static class CacheHolder2 extends CacheHolder {
+    private static class CacheHolder2 extends CacheHolder {
         /** */
         private final GridCacheSharedContext cctx;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index e10e5aa..4dcff9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -62,6 +62,13 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     }
 
     /**
+     * @param id New message ID; {@code null} is accepted by this setter (no check is done).
+     */
+    public void id(IgniteUuid id) {
+        this.id = id;
+    }
+
+    /**
      * @return Collection of change requests.
      */
     public Collection<DynamicCacheChangeRequest> requests() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 71ae5c9..c6e7ee6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -25,8 +25,8 @@ import org.apache.ignite.cache.affinity.AffinityKeyMapper;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -48,7 +48,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
     private static final AffinityTopologyVersion LOC_CACHE_TOP_VER = new AffinityTopologyVersion(1);
 
     /** */
-    public static final String FAILED_TO_FIND_CACHE_ERR_MSG = "Failed to find cache (cache was not started " +
+    private static final String FAILED_TO_FIND_CACHE_ERR_MSG = "Failed to find cache (cache was not started " +
         "yet or cache was already stopped): ";
 
     /** Affinity cached function. */
@@ -265,7 +265,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
      * @param topVer Topology version.
      * @return Affinity assignment.
      */
-    public GridAffinityAssignment assignment(AffinityTopologyVersion topVer) {
+    public AffinityAssignment assignment(AffinityTopologyVersion topVer) {
         if (cctx.isLocal())
             topVer = LOC_CACHE_TOP_VER;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 00d2d16..503b334 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -44,7 +44,9 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
@@ -56,6 +58,7 @@ import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -64,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
@@ -71,6 +75,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridListSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -80,6 +85,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -531,8 +537,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (top != null)
             return top;
 
+        Object affKey = null;
+
+        DynamicCacheDescriptor desc = cctx.cache().cacheDescriptor(cacheId);
+
+        if (desc != null) {
+            CacheConfiguration ccfg = desc.cacheConfiguration();
+
+            AffinityFunction aff = ccfg.getAffinity();
+
+            affKey = cctx.kernalContext().affinity().similaryAffinityKey(aff,
+                ccfg.getNodeFilter(),
+                ccfg.getBackups(),
+                aff.partitions());
+        }
+
         GridClientPartitionTopology old = clientTops.putIfAbsent(cacheId,
-            top = new GridClientPartitionTopology(cctx, cacheId, exchFut));
+            top = new GridClientPartitionTopology(cctx, cacheId, exchFut, affKey));
 
         return old != null ? old : top;
     }
@@ -761,40 +782,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param nodes Nodes.
      * @return {@code True} if message was sent, {@code false} if node left grid.
      */
-    private boolean sendAllPartitions(Collection<? extends ClusterNode> nodes) {
-        GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(null, null, AffinityTopologyVersion.NONE);
-
-        boolean useOldApi = false;
-
-        for (ClusterNode node : nodes) {
-            if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
-                useOldApi = true;
-        }
-
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal() && cacheCtx.started()) {
-                GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
-
-                if (useOldApi) {
-                    locMap = new GridDhtPartitionFullMap(locMap.nodeId(),
-                        locMap.nodeOrder(),
-                        locMap.updateSequence(),
-                        locMap);
-                }
-
-                m.addFullPartitionsMap(cacheCtx.cacheId(), locMap);
-            }
-        }
-
-        // It is important that client topologies be added after contexts.
-        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
-            m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true));
+    private boolean sendAllPartitions(Collection<ClusterNode> nodes) {
+        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(nodes, null, null, true);
 
         if (log.isDebugEnabled())
             log.debug("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']');
 
         for (ClusterNode node : nodes) {
             try {
+                assert !node.equals(cctx.localNode());
+
                 cctx.io().sendNoRetry(node, m, SYSTEM_POOL);
             }
             catch (ClusterTopologyCheckedException ignore) {
@@ -811,31 +808,140 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param node Node.
-     * @param id ID.
+     * @param nodes Target nodes.
+     * @param exchId Non-null exchange ID if message is created for exchange.
+     * @param lastVer Last version.
+     * @param compress {@code True} if it is possible to use compression for message.
+     * @return Message.
      */
-    private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
-        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
-            cctx.kernalContext().clientNode(),
-            cctx.versions().last());
+    public GridDhtPartitionsFullMessage createPartitionsFullMessage(Collection<ClusterNode> nodes,
+        @Nullable GridDhtPartitionExchangeId exchId,
+        @Nullable GridCacheVersion lastVer,
+        boolean compress) {
+        GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId,
+                lastVer,
+                exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
+
+        boolean useOldApi = false;
+
+        if (nodes != null) {
+            for (ClusterNode node : nodes) {
+                if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0) {
+                    useOldApi = true;
+                    compress = false;
+
+                    break;
+                }
+                else if (node.version().compareToIgnoreTimestamp(GridDhtPartitionsAbstractMessage.PART_MAP_COMPRESS_SINCE) < 0)
+                    compress = false;
+            }
+        }
+
+        m.compress(compress);
+
+        Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>();
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal()) {
-                GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap();
+                boolean ready;
 
-                if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
-                    locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map());
+                if (exchId != null) {
+                    AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
+
+                    ready = startTopVer == null || startTopVer.compareTo(exchId.topologyVersion()) <= 0;
+                }
+                else
+                    ready = cacheCtx.started();
+
+                if (ready) {
+                    GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
 
-                m.addLocalPartitionMap(cacheCtx.cacheId(), locMap);
+                    if (useOldApi) {
+                        locMap = new GridDhtPartitionFullMap(locMap.nodeId(),
+                            locMap.nodeOrder(),
+                            locMap.updateSequence(),
+                            locMap);
+                    }
+
+                    addFullPartitionsMap(m,
+                        dupData,
+                        compress,
+                        cacheCtx.cacheId(),
+                        locMap,
+                        cacheCtx.affinity().affinityCache().similarAffinityKey());
+
+                    if (exchId != null)
+                        m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
+                }
             }
         }
 
-        for (GridClientPartitionTopology top : clientTops.values()) {
-            GridDhtPartitionMap2 locMap = top.localPartitionMap();
+        // It is important that client topologies be added after contexts.
+        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
+            GridDhtPartitionFullMap map = top.partitionMap(true);
+
+            addFullPartitionsMap(m,
+                dupData,
+                compress,
+                top.cacheId(),
+                map,
+                top.similarAffinityKey());
+
+            if (exchId != null)
+                m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters(true));
+        }
+
+        return m;
+    }
 
-            m.addLocalPartitionMap(top.cacheId(), locMap);
+    /**
+     * @param m Message the map is added to.
+     * @param dupData First full map added for each affinity key, paired with the ID of its cache.
+     * @param compress {@code True} if duplicated partition state data should be detected.
+     * @param cacheId Cache ID.
+     * @param map Map to add; replaced by a copy with {@code emptyCopy()} entries if its state equals an already added map.
+     * @param affKey Cache affinity key; if {@code null} no duplicate detection is done.
+     */
+    private void addFullPartitionsMap(GridDhtPartitionsFullMessage m,
+        Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData,
+        boolean compress,
+        Integer cacheId,
+        GridDhtPartitionFullMap map,
+        Object affKey) {
+        Integer dupDataCache = null;
+
+        if (compress && affKey != null && !m.containsCache(cacheId)) {
+            T2<Integer, GridDhtPartitionFullMap> state0 = dupData.get(affKey);
+
+            if (state0 != null && state0.get2().partitionStateEquals(map)) {
+                GridDhtPartitionFullMap map0 = new GridDhtPartitionFullMap(map.nodeId(),
+                    map.nodeOrder(),
+                    map.updateSequence());
+
+                for (Map.Entry<UUID, GridDhtPartitionMap2> e : map.entrySet())
+                    map0.put(e.getKey(), e.getValue().emptyCopy());
+
+                map = map0;
+
+                dupDataCache = state0.get1(); // ID of the cache whose map has equal partition state.
+            }
+            else
+                dupData.put(affKey, new T2<>(cacheId, map));
         }
 
+        m.addFullPartitionsMap(cacheId, map, dupDataCache);
+    }
+
+    /**
+     * @param node Node.
+     * @param id ID.
+     */
+    private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
+        GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node,
+            id,
+            cctx.kernalContext().clientNode(),
+            false);
+
         if (log.isDebugEnabled())
             log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']');
 
@@ -853,6 +959,98 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * @param targetNode Target node; its version decides whether compression and the old partition map class are used.
+     * @param exchangeId Exchange ID (may be {@code null}).
+     * @param clientOnlyExchange Client exchange flag.
+     * @param sndCounters {@code True} if partition update counters need to be sent.
+     * @return Message with local partition maps of non-local caches and of client topologies not already added.
+     */
+    public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(ClusterNode targetNode,
+        @Nullable GridDhtPartitionExchangeId exchangeId,
+        boolean clientOnlyExchange,
+        boolean sndCounters)
+    {
+        boolean compress =
+            targetNode.version().compareToIgnoreTimestamp(GridDhtPartitionsSingleMessage.PART_MAP_COMPRESS_SINCE) >= 0;
+
+        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId,
+            clientOnlyExchange,
+            cctx.versions().last(),
+            compress);
+
+        Map<Object, T2<Integer,Map<Integer, GridDhtPartitionState>>> dupData = new HashMap<>();
+
+        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+            if (!cacheCtx.isLocal()) {
+                GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap();
+
+                if (targetNode.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
+                    locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map());
+
+                addPartitionMap(m,
+                    dupData,
+                    compress,
+                    cacheCtx.cacheId(),
+                    locMap,
+                    cacheCtx.affinity().affinityCache().similarAffinityKey());
+
+                if (sndCounters)
+                    m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters(true));
+            }
+        }
+
+        for (GridClientPartitionTopology top : clientTops.values()) {
+            if (m.partitions() != null && m.partitions().containsKey(top.cacheId()))
+                continue; // Message already has a map for this cache ID.
+
+            GridDhtPartitionMap2 locMap = top.localPartitionMap();
+
+            addPartitionMap(m,
+                dupData,
+                compress,
+                top.cacheId(),
+                locMap,
+                top.similarAffinityKey());
+
+            if (sndCounters)
+                m.partitionUpdateCounters(top.cacheId(), top.updateCounters(true));
+        }
+
+        return m;
+    }
+
+    /**
+     * @param m Message the map is added to.
+     * @param dupData First partition state map added for each affinity key, paired with the ID of its cache.
+     * @param compress {@code True} if duplicated partition state data should be detected.
+     * @param cacheId Cache ID.
+     * @param map Map to add; replaced by its {@code emptyCopy()} if its state equals an already added map.
+     * @param affKey Cache affinity key. NOTE(review): can be {@code null} for client topologies and is not checked here - confirm intended.
+     */
+    private void addPartitionMap(GridDhtPartitionsSingleMessage m,
+        Map<Object, T2<Integer, Map<Integer, GridDhtPartitionState>>> dupData,
+        boolean compress,
+        Integer cacheId,
+        GridDhtPartitionMap2 map,
+        Object affKey) {
+        Integer dupDataCache = null;
+
+        if (compress) {
+            T2<Integer, Map<Integer, GridDhtPartitionState>> state0 = dupData.get(affKey);
+
+            if (state0 != null && state0.get2().equals(map.map())) {
+                dupDataCache = state0.get1(); // ID of the cache whose state map is equal.
+
+                map = map.emptyCopy();
+            }
+            else
+                dupData.put(affKey, new T2<>(cacheId, map.map()));
+        }
+
+        m.addLocalPartitionMap(cacheId, map, dupDataCache);
+    }
+
+    /**
      * @param nodeId Cause node ID.
      * @param topVer Topology version.
      * @param evt Event type.
@@ -869,7 +1067,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param affChangeMsg Affinity change message.
      * @return Exchange future.
      */
-    GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
+    private GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
         @Nullable DiscoveryEvent discoEvt,
         @Nullable Collection<DynamicCacheChangeRequest> reqs,
         @Nullable CacheAffinityChangeMessage affChangeMsg) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index fd6abbd..5e777fd 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1958,8 +1958,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 req.template(true);
 
-                req.deploymentId(desc.deploymentId());
-
                 reqs.add(req);
             }
 
@@ -1972,6 +1970,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         batch.clientReconnect(reconnect);
 
+        // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same.
+        batch.id(null);
+
         return batch;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 58933b7..5efb317 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -61,6 +61,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     /** Flag to control amount of output for full map. */
     private static final boolean FULL_MAP_DEBUG = false;
 
+    /** Zero counter value, compared against to filter out zero update counters. */
+    private static final Long ZERO = 0L;
+
     /** Cache shared context. */
     private GridCacheSharedContext cctx;
 
@@ -97,18 +100,24 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     /** Partition update counters. */
     private Map<Integer, Long> cntrMap = new HashMap<>();
 
+    /** Key to find caches with similar affinity. */
+    private final Object similarAffKey;
+
     /**
      * @param cctx Context.
      * @param cacheId Cache ID.
      * @param exchFut Exchange ID.
+     * @param similarAffKey Key to find caches with similar affinity.
      */
     public GridClientPartitionTopology(
         GridCacheSharedContext cctx,
         int cacheId,
-        GridDhtPartitionsExchangeFuture exchFut
+        GridDhtPartitionsExchangeFuture exchFut,
+        Object similarAffKey
     ) {
         this.cctx = cctx;
         this.cacheId = cacheId;
+        this.similarAffKey = similarAffKey;
 
         topVer = exchFut.topologyVersion();
 
@@ -125,6 +134,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /**
+     * @return Key to find caches with similar affinity, as passed to the constructor (may be {@code null}).
+     */
+    @Nullable public Object similarAffinityKey() {
+        return similarAffKey;
+    }
+
+    /**
      * @return Full map string representation.
      */
     @SuppressWarnings( {"ConstantConditions"})
@@ -873,11 +889,22 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, Long> updateCounters() {
+    @Override public Map<Integer, Long> updateCounters(boolean skipZeros) {
         lock.readLock().lock();
 
         try {
-            return new HashMap<>(cntrMap);
+            if (skipZeros) {
+                Map<Integer, Long> res = U.newHashMap(cntrMap.size());
+
+                for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+                    if (!e.getValue().equals(ZERO))
+                        res.put(e.getKey(), e.getValue());
+                }
+
+                return res;
+            }
+            else
+                return new HashMap<>(cntrMap);
         }
         finally {
             lock.readLock().unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 6e9b907..4ae4e47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -220,9 +220,10 @@ public interface GridDhtPartitionTopology {
         @Nullable Map<Integer, Long> cntrMap);
 
     /**
+     * @param skipZeros If {@code true} then filters out zero counters.
      * @return Partition update counters.
      */
-    public Map<Integer, Long> updateCounters();
+    public Map<Integer, Long> updateCounters(boolean skipZeros);
 
     /**
      * @param part Partition to own.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 50f7f0f..f3751ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -35,8 +35,8 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -71,6 +71,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** Flag to control amount of output for full map. */
     private static final boolean FULL_MAP_DEBUG = false;
 
+    /** */
+    private static final Long ZERO = 0L;
+
     /** Context. */
     private final GridCacheContext<?, ?> cctx;
 
@@ -859,7 +862,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
-        GridAffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
+        AffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
 
         List<ClusterNode> affNodes = affAssignment.get(p);
 
@@ -1500,11 +1503,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, Long> updateCounters() {
+    @Override public Map<Integer, Long> updateCounters(boolean skipZeros) {
         lock.readLock().lock();
 
         try {
-            Map<Integer, Long> res = new HashMap<>(cntrMap);
+            Map<Integer, Long> res;
+
+            if (skipZeros) {
+                res = U.newHashMap(cntrMap.size());
+
+                for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+                    Long cntr = e.getValue();
+
+                    if (ZERO.equals(cntr))
+                        continue;
+
+                    res.put(e.getKey(), cntr);
+                }
+            }
+            else
+                res = new HashMap<>(cntrMap);
 
             for (int i = 0; i < locParts.length; i++) {
                 GridDhtLocalPartition part = locParts[i];
@@ -1513,7 +1531,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     continue;
 
                 Long cntr0 = res.get(part.id());
-                Long cntr1 = part.updateCounter();
+                long cntr1 = part.updateCounter();
+
+                if (skipZeros && cntr1 == 0L)
+                    continue;
 
                 if (cntr0 == null || cntr1 > cntr0)
                     res.put(part.id(), cntr1);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
index 498d492..8f5ad17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
@@ -158,6 +158,24 @@ public class GridDhtPartitionFullMap extends HashMap<UUID, GridDhtPartitionMap2>
     }
 
     /**
+     * @param fullMap Map.
+     * @return {@code True} if this map and given map contain the same data.
+     */
+    public boolean partitionStateEquals(GridDhtPartitionFullMap fullMap) {
+        if (size() != fullMap.size())
+            return false;
+
+        for (Map.Entry<UUID, GridDhtPartitionMap2> e : entrySet()) {
+            GridDhtPartitionMap2 m = fullMap.get(e.getKey());
+
+            if (m == null || !m.map().equals(e.getValue().map()))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
      * @param updateSeq New update sequence value.
      * @return Old update sequence value.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
index 15b5a2e..ce36a11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
@@ -61,27 +61,24 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
     private volatile int moving;
 
     /**
-     * @param nodeId Node ID.
-     * @param updateSeq Update sequence number.
+     * Empty constructor required for {@link Externalizable}.
      */
-    public GridDhtPartitionMap2(UUID nodeId, long updateSeq) {
-        assert nodeId != null;
-        assert updateSeq > 0;
-
-        this.nodeId = nodeId;
-        this.updateSeq = updateSeq;
-
-        map = new HashMap<>();
+    public GridDhtPartitionMap2() {
+        // No-op.
     }
 
     /**
      * @param nodeId Node ID.
      * @param updateSeq Update sequence number.
+     * @param top Topology version.
      * @param m Map to copy.
      * @param onlyActive If {@code true}, then only active states will be included.
      */
-    public GridDhtPartitionMap2(UUID nodeId, long updateSeq, AffinityTopologyVersion top,
-        Map<Integer, GridDhtPartitionState> m, boolean onlyActive) {
+    public GridDhtPartitionMap2(UUID nodeId,
+        long updateSeq,
+        AffinityTopologyVersion top,
+        Map<Integer, GridDhtPartitionState> m,
+        boolean onlyActive) {
         assert nodeId != null;
         assert updateSeq > 0;
 
@@ -100,10 +97,33 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
     }
 
     /**
-     * Empty constructor required for {@link Externalizable}.
+     * @param nodeId Node ID.
+     * @param updateSeq Update sequence number.
+     * @param top Topology version.
+     * @param map Map.
+     * @param moving Number of moving partitions.
      */
-    public GridDhtPartitionMap2() {
-        // No-op.
+    private GridDhtPartitionMap2(UUID nodeId,
+        long updateSeq,
+        AffinityTopologyVersion top,
+        Map<Integer, GridDhtPartitionState> map,
+        int moving) {
+        this.nodeId = nodeId;
+        this.updateSeq = updateSeq;
+        this.top = top;
+        this.map = map;
+        this.moving = moving;
+    }
+
+    /**
+     * @return Copy with empty partition state map.
+     */
+    public GridDhtPartitionMap2 emptyCopy() {
+        return new GridDhtPartitionMap2(nodeId,
+            updateSeq,
+            top,
+            U.<Integer, GridDhtPartitionState>newHashMap(0),
+            0);
     }
 
     /**
@@ -277,9 +297,8 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
         long ver = in.readLong();
         int minorVer = in.readInt();
 
-        if (ver != 0) {
+        if (ver != 0)
             top = new AffinityTopologyVersion(ver, minorVer);
-        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 4e714ed..6e69161 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
@@ -29,7 +30,13 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Request for single partition info.
  */
-abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
+public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
+    /** */
+    public static final IgniteProductVersion PART_MAP_COMPRESS_SINCE = IgniteProductVersion.fromString("1.6.11");
+
+    /** */
+    protected static final byte COMPRESSED_FLAG_MASK = 1;
+
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -39,6 +46,9 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
     /** Last used cache version. */
     private GridCacheVersion lastVer;
 
+    /** */
+    private byte flags;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -79,6 +89,20 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
         return lastVer;
     }
 
+    /**
+     * @return {@code True} if message data is compressed.
+     */
+    protected final boolean compressed() {
+        return (flags & COMPRESSED_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @param compressed {@code True} if message data is compressed.
+     */
+    protected final void compressed(boolean compressed) {
+        flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) : (byte)(flags & ~COMPRESSED_FLAG_MASK);
+    }
+
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
@@ -101,6 +125,12 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
                 writer.incrementState();
 
             case 4:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
                 if (!writer.writeMessage("lastVer", lastVer))
                     return false;
 
@@ -131,6 +161,14 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage {
                 reader.incrementState();
 
             case 4:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
                 lastVer = reader.readMessage("lastVer");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/7128a395/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 80b3768..f391265 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -64,6 +65,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -544,7 +546,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     exchId.topologyVersion().equals(cacheCtx.startTopologyVersion());
 
                 if (updateTop && clientTop != null)
-                    cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters());
+                    cacheCtx.topology().update(exchId, clientTop.partitionMap(true), clientTop.updateCounters(false));
             }
 
             top.updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId()));
@@ -668,7 +670,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                             if (top.cacheId() == cacheCtx.cacheId()) {
                                 cacheCtx.topology().update(exchId,
                                     top.partitionMap(true),
-                                    top.updateCounters());
+                                    top.updateCounters(false));
 
                                 break;
                             }
@@ -678,7 +680,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             }
             else {
                 if (!centralizedAff)
-                    sendLocalPartitions(crd, exchId);
+                    sendLocalPartitions(crd);
 
                 initDone();
 
@@ -928,27 +930,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
     /**
      * @param node Node.
-     * @param id ID.
      * @throws IgniteCheckedException If failed.
      */
-    private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id)
+    private void sendLocalPartitions(ClusterNode node)
         throws IgniteCheckedException {
-        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
+        GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage(node,
+            exchangeId(),
             clientOnlyExchange,
-            cctx.versions().last());
-
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal()) {
-                GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap();
-
-                if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
-                    locMap = new GridDhtPartitionMap(locMap.nodeId(), locMap.updateSequence(), locMap.map());
-
-                m.addLocalPartitionMap(cacheCtx.cacheId(), locMap);
-
-                m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
-            }
-        }
+            true);
 
         if (log.isDebugEnabled())
             log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']');
@@ -964,51 +953,16 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
     /**
      * @param nodes Target nodes.
-     * @return Message;
+     * @param compress {@code True} if it is possible to use compression for message.
+     * @return Message.
      */
-    private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes) {
+    private GridDhtPartitionsFullMessage createPartitionsMessage(Collection<ClusterNode> nodes, boolean compress) {
         GridCacheVersion last = lastVer.get();
 
-        GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchangeId(),
+        return cctx.exchange().createPartitionsFullMessage(nodes,
+            exchangeId(),
             last != null ? last : cctx.versions().last(),
-            topologyVersion());
-
-        boolean useOldApi = false;
-
-        if (nodes != null) {
-            for (ClusterNode node : nodes) {
-                if (node.version().compareTo(GridDhtPartitionMap2.SINCE) < 0)
-                    useOldApi = true;
-            }
-        }
-
-        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal()) {
-                AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
-
-                boolean ready = startTopVer == null || startTopVer.compareTo(topologyVersion()) <= 0;
-
-                if (ready) {
-                    GridDhtPartitionFullMap locMap = cacheCtx.topology().partitionMap(true);
-
-                    if (useOldApi)
-                        locMap = new GridDhtPartitionFullMap(locMap.nodeId(), locMap.nodeOrder(), locMap.updateSequence(), locMap);
-
-                    m.addFullPartitionsMap(cacheCtx.cacheId(), locMap);
-
-                    m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
-                }
-            }
-        }
-
-        // It is important that client topologies be added after contexts.
-        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
-            m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true));
-
-            m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters());
-        }
-
-        return m;
+            compress);
     }
 
     /**
@@ -1016,7 +970,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @throws IgniteCheckedException If failed.
      */
     private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException {
-        GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes);
+        GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes, true);
+
+        assert !nodes.contains(cctx.localNode());
 
         if (log.isDebugEnabled())
             log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
@@ -1030,7 +986,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      */
     private void sendPartitions(ClusterNode oldestNode) {
         try {
-            sendLocalPartitions(oldestNode, exchId);
+            sendLocalPartitions(oldestNode);
         }
         catch (ClusterTopologyCheckedException ignore) {
             if (log.isDebugEnabled())
@@ -1234,7 +1190,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
 
-            GridDhtPartitionsFullMessage m = createPartitionsMessage(null);
+            GridDhtPartitionsFullMessage m = createPartitionsMessage(null, false);
 
             CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);
 


Mime
View raw message