ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [02/49] ignite git commit: Multiple optimizations from ignite-gg-8.0.3.ea5-atomicbench.
Date Fri, 02 Jun 2017 17:13:15 GMT
Multiple optimizations from ignite-gg-8.0.3.ea5-atomicbench.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/52556f4b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/52556f4b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/52556f4b

Branch: refs/heads/ignite-5398
Commit: 52556f4bf6d544a44bfd49b02d84aa32f741813f
Parents: a3ad6e0
Author: Ilya Lantukh <ilantukh@gridgain.com>
Authored: Fri Apr 28 19:40:31 2017 +0300
Committer: Ilya Lantukh <ilantukh@gridgain.com>
Committed: Fri Apr 28 19:40:31 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   3 +
 .../rendezvous/RendezvousAffinityFunction.java  | 263 +++++++++++--------
 .../discovery/GridDiscoveryManager.java         |   1 +
 .../cache/CacheAffinitySharedManager.java       |  68 +++++
 .../cache/DynamicCacheChangeBatch.java          |  17 ++
 .../GridCachePartitionExchangeManager.java      |  22 +-
 .../processors/cache/GridCacheProcessor.java    |  25 +-
 .../dht/GridClientPartitionTopology.java        |  88 ++++---
 .../dht/GridDhtPartitionTopology.java           |   9 +
 .../dht/GridDhtPartitionTopologyImpl.java       | 104 +++++---
 .../dht/preloader/GridDhtPartitionMap2.java     |  45 +---
 .../GridDhtPartitionsExchangeFuture.java        |  61 ++++-
 .../internal/util/GridPartitionStateMap.java    | 174 ++++++++++++
 .../communication/tcp/TcpCommunicationSpi.java  |  17 +-
 .../resources/META-INF/classnames.properties    |   1 +
 .../GridCachePartitionedAffinitySpreadTest.java |   8 +-
 16 files changed, 669 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 0b2fe65..ce2666b 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -552,6 +552,9 @@ public final class IgniteSystemProperties {
      */
     public static final String IGNITE_MAX_INDEX_PAYLOAD_SIZE = "IGNITE_MAX_INDEX_PAYLOAD_SIZE";
 
+    /** */
+    public static final String IGNITE_START_CACHES_ON_JOIN = "IGNITE_START_CACHES_ON_JOIN";
+
     /**
      * Enforces singleton.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index 25fa3a1..021f4e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -22,18 +22,16 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.UUID;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cache.affinity.AffinityFunctionContext;
@@ -48,7 +46,6 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
 import org.jetbrains.annotations.Nullable;
 
@@ -84,20 +81,6 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
     /** Comparator. */
     private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = new HashComparator();
 
-    /** Thread local message digest. */
-    private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() {
-        @Override protected MessageDigest initialValue() {
-            try {
-                return MessageDigest.getInstance("MD5");
-            }
-            catch (NoSuchAlgorithmException e) {
-                assert false : "Should have failed in constructor";
-
-                throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", e);
-            }
-        }
-    };
-
     /** Number of partitions. */
     private int parts;
 
@@ -118,10 +101,6 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
     /** Hash ID resolver. */
     private AffinityNodeHashResolver hashIdRslvr = null;
 
-    /** Ignite instance. */
-    @IgniteInstanceResource
-    private Ignite ignite;
-
     /** Logger instance. */
     @LoggerResource
     private transient IgniteLogger log;
@@ -186,18 +165,10 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
     private RendezvousAffinityFunction(boolean exclNeighbors, int parts,
         IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
         A.ensure(parts > 0, "parts > 0");
-        A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT, "parts <=" + CacheConfiguration.MAX_PARTITIONS_COUNT);
 
         this.exclNeighbors = exclNeighbors;
         this.parts = parts;
         this.backupFilter = backupFilter;
-
-        try {
-            MessageDigest.getInstance("MD5");
-        }
-        catch (NoSuchAlgorithmException e) {
-            throw new IgniteException("Failed to obtain MD5 message digest instance.", e);
-        }
     }
 
     /**
@@ -222,7 +193,8 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
      * @param parts Total number of partitions.
      */
     public void setPartitions(int parts) {
-        A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT, "parts <= " + CacheConfiguration.MAX_PARTITIONS_COUNT);
+        A.ensure(parts <= CacheConfiguration.MAX_PARTITIONS_COUNT,
+            "parts <= " + CacheConfiguration.MAX_PARTITIONS_COUNT);
 
         this.parts = parts;
     }
@@ -355,116 +327,90 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
     /**
      * Returns collection of nodes (primary first) for specified partition.
      *
-     * @param d Message digest.
      * @param part Partition.
      * @param nodes Nodes.
-     * @param nodesHash Serialized nodes hashes.
      * @param backups Number of backups.
      * @param neighborhoodCache Neighborhood.
      * @return Assignment.
      */
-    public List<ClusterNode> assignPartition(MessageDigest d,
-        int part,
+    public List<ClusterNode> assignPartition(int part,
         List<ClusterNode> nodes,
-        Map<ClusterNode, byte[]> nodesHash,
         int backups,
         @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) {
         if (nodes.size() <= 1)
             return nodes;
 
-        if (d == null)
-            d = digest.get();
-
-        List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size());
-
-        try {
-            for (int i = 0; i < nodes.size(); i++) {
-                ClusterNode node = nodes.get(i);
-
-                byte[] nodeHashBytes = nodesHash.get(node);
-
-                if (nodeHashBytes == null) {
-                    Object nodeHash = resolveNodeHash(node);
-
-                    byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
-
-                    // Add 4 bytes for partition bytes.
-                    nodeHashBytes = new byte[nodeHashBytes0.length + 4];
-
-                    System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length);
-
-                    nodesHash.put(node, nodeHashBytes);
-                }
+        IgniteBiTuple<Long, ClusterNode> [] hashArr =
+            (IgniteBiTuple<Long, ClusterNode> [])new IgniteBiTuple[nodes.size()];
 
-                U.intToBytes(part, nodeHashBytes, 0);
+        for (int i = 0; i < nodes.size(); i++) {
+            ClusterNode node = nodes.get(i);
 
-                d.reset();
+            Object nodeHash = resolveNodeHash(node);
 
-                byte[] bytes = d.digest(nodeHashBytes);
+            long hash = hash(nodeHash.hashCode(), part);
 
-                long hash =
-                    (bytes[0] & 0xFFL)
-                        | ((bytes[1] & 0xFFL) << 8)
-                        | ((bytes[2] & 0xFFL) << 16)
-                        | ((bytes[3] & 0xFFL) << 24)
-                        | ((bytes[4] & 0xFFL) << 32)
-                        | ((bytes[5] & 0xFFL) << 40)
-                        | ((bytes[6] & 0xFFL) << 48)
-                        | ((bytes[7] & 0xFFL) << 56);
-
-                lst.add(F.t(hash, node));
-            }
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
+            hashArr[i] = F.t(hash, node);
         }
 
-        Collections.sort(lst, COMPARATOR);
+        final int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size());
 
-        int primaryAndBackups = backups == Integer.MAX_VALUE ? nodes.size() : Math.min(backups + 1, nodes.size());
+        Iterable<ClusterNode> sortedNodes = new LazyLinearSortedContainer(hashArr, primaryAndBackups);
+
+        Iterator<ClusterNode> it = sortedNodes.iterator();
 
         List<ClusterNode> res = new ArrayList<>(primaryAndBackups);
 
-        ClusterNode primary = lst.get(0).get2();
+        Collection<ClusterNode> allNeighbors = new HashSet<>();
+
+        ClusterNode primary = it.next();
 
         res.add(primary);
 
+        if (exclNeighbors)
+            allNeighbors.addAll(neighborhoodCache.get(primary.id()));
+
         // Select backups.
         if (backups > 0) {
-            for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
-                IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
-
-                ClusterNode node = next.get2();
+            while (it.hasNext() && res.size() < primaryAndBackups) {
+                ClusterNode node = it.next();
 
                 if (exclNeighbors) {
-                    Collection<ClusterNode> allNeighbors = GridCacheUtils.neighborsForNodes(neighborhoodCache, res);
-
-                    if (!allNeighbors.contains(node))
+                    if (!allNeighbors.contains(node)) {
                         res.add(node);
+
+                        allNeighbors.addAll(neighborhoodCache.get(node.id()));
+                    }
+                }
+                else if ((backupFilter != null && backupFilter.apply(primary, node))
+                    || (affinityBackupFilter != null && affinityBackupFilter.apply(node, res))
+                    || (affinityBackupFilter == null && backupFilter == null) ) {
+                    res.add(node);
+
+                    if (exclNeighbors)
+                        allNeighbors.addAll(neighborhoodCache.get(node.id()));
                 }
-                else if (affinityBackupFilter != null && affinityBackupFilter.apply(node, res))
-                    res.add(next.get2());
-                else if (backupFilter != null && backupFilter.apply(primary, node))
-                    res.add(next.get2());
-                else if (affinityBackupFilter == null && backupFilter == null)
-                    res.add(next.get2());
             }
         }
 
         if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) {
             // Need to iterate again in case if there are no nodes which pass exclude neighbors backups criteria.
-            for (int i = 1; i < lst.size() && res.size() < primaryAndBackups; i++) {
-                IgniteBiTuple<Long, ClusterNode> next = lst.get(i);
+            it = sortedNodes.iterator();
 
-                ClusterNode node = next.get2();
+            it.next();
+
+            while (it.hasNext() && res.size() < primaryAndBackups) {
+                ClusterNode node = it.next();
 
                 if (!res.contains(node))
-                    res.add(next.get2());
+                    res.add(node);
             }
 
             if (!exclNeighborsWarn) {
                 LT.warn(log, "Affinity function excludeNeighbors property is ignored " +
-                    "because topology has no enough nodes to assign backups.");
+                        "because topology has no enough nodes to assign backups.",
+                    "Affinity function excludeNeighbors property is ignored " +
+                        "because topology has no enough nodes to assign backups.");
 
                 exclNeighborsWarn = true;
             }
@@ -475,6 +421,31 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
         return res;
     }
 
+    /**
+     * Packs two 32-bit keys into one long and mixes it with Thomas Wang's 64-bit integer hash
+     * (Wang/Jenkins mix). In assignPartition, key0 is the node hash code and key1 is the partition.
+     *
+     * @param key0 Key placed in the low 32 bits of the packed value.
+     * @param key1 Key placed in the high 32 bits of the packed value.
+     * @see <a href="https://gist.github.com/badboy/6267743#64-bit-mix-functions">64 bit mix functions</a>
+     * @return Mixed 64-bit hash of the packed keys.
+     */
+    private static long hash(int key0, int key1) {
+        long key = (key0 & 0xFFFFFFFFL)
+            | ((key1 & 0xFFFFFFFFL) << 32);
+
+        key = (~key) + (key << 21); // key = (key << 21) - key - 1;
+        key ^= (key >>> 24);
+        key += (key << 3) + (key << 8); // key * 265
+        key ^= (key >>> 14);
+        key += (key << 2) + (key << 4); // key * 21
+        key ^= (key >>> 28);
+        key += (key << 31);
+
+        return key;
+    }
+
+
     /** {@inheritDoc} */
     @Override public void reset() {
         // No-op.
@@ -501,19 +472,10 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
         Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
             GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null;
 
-        MessageDigest d = digest.get();
-
         List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
 
-        Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size());
-
         for (int i = 0; i < parts; i++) {
-            List<ClusterNode> partAssignment = assignPartition(d,
-                i,
-                nodes,
-                nodesHash,
-                affCtx.backups(),
-                neighborhoodCache);
+            List<ClusterNode> partAssignment = assignPartition(i, nodes, affCtx.backups(), neighborhoodCache);
 
             assignments.add(partAssignment);
         }
@@ -556,4 +518,83 @@ public class RendezvousAffinityFunction implements AffinityFunction, Externaliza
                 o1.get2().id().compareTo(o2.get2().id());
         }
     }
+
+    /**
+     * Yields nodes ordered by COMPARATOR; the array is sorted eagerly or lazily via on-demand selection sort.
+     */
+    private static class LazyLinearSortedContainer implements Iterable<ClusterNode> {
+        /** Initial node-hash array (reordered in place). */
+        private final IgniteBiTuple<Long, ClusterNode>[] arr;
+
+        /** Length of the prefix of arr whose elements are already in their final sorted position. */
+        private int sorted;
+
+        /**
+         * @param arr Node / partition hash array; it is sorted in place.
+         * @param needFirstSortedCnt Expected count of elements to read; above ln(arr.length) the array is sorted eagerly.
+         */
+        LazyLinearSortedContainer(IgniteBiTuple<Long, ClusterNode>[] arr, int needFirstSortedCnt) {
+            this.arr = arr;
+
+            if (needFirstSortedCnt > (int)Math.log(arr.length)) {
+                Arrays.sort(arr, COMPARATOR);
+
+                sorted = arr.length;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<ClusterNode> iterator() {
+            return new SortIterator();
+        }
+
+        /**
+         * Returns nodes in order, growing the sorted prefix by one selection step per element read beyond it.
+         */
+        private class SortIterator implements Iterator<ClusterNode> {
+            /** Index of the next element to return. */
+            private int cur;
+
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                return cur < arr.length;
+            }
+
+            /** {@inheritDoc} */
+            @Override public ClusterNode next() {
+                if (!hasNext())
+                    throw new NoSuchElementException();
+
+                if (cur < sorted)
+                    return arr[cur++].get2();
+
+                IgniteBiTuple<Long, ClusterNode> min = arr[cur];
+
+                int minIdx = cur;
+
+                for (int i = cur + 1; i < arr.length; i++) {
+                    if (COMPARATOR.compare(arr[i], min) < 0) {
+                        minIdx = i;
+
+                        min = arr[i];
+                    }
+                }
+
+                if (minIdx != cur) {
+                    arr[minIdx] = arr[cur];
+
+                    arr[cur] = min;
+                }
+
+                sorted = ++cur; // Index cur now holds its final element, so the sorted prefix grows by one.
+
+                return min.get2();
+            }
+
+            /** {@inheritDoc} */
+            @Override public void remove() {
+                throw new UnsupportedOperationException("Remove doesn't supported");
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 960a064..8703e29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -88,6 +88,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;

http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index c1dde13..814d7e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -91,6 +92,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private WaitRebalanceInfo waitInfo;
 
     /** */
+    private IgniteLogger exchLog;
+
+    /** */
     private final Object mux = new Object();
 
     /** Pending affinity assignment futures. */
@@ -123,6 +127,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         lateAffAssign = cctx.kernalContext().config().isLateAffinityAssignment() || cctx.database().persistenceEnabled();
 
         cctx.kernalContext().event().addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
+
+        exchLog = cctx.logger(GridDhtPartitionsExchangeFuture.EXCHANGE_LOG);
     }
 
     /**
@@ -342,6 +348,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         throws IgniteCheckedException {
         assert !F.isEmpty(reqs) : fut;
 
+        exchLog.info("onCacheChangeRequest start [topVer=" + fut.topologyVersion() + ", crd=" + crd + ']');
+
         for (DynamicCacheChangeRequest req : reqs) {
             Integer cacheId = CU.cacheId(req.cacheName());
 
@@ -384,8 +392,12 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             Integer cacheId = CU.cacheId(req.cacheName());
 
             if (req.start()) {
+                exchLog.info("prepareCacheStart start [topVer=" + fut.topologyVersion() + ", cache=" + req.cacheName() + ']');
+
                 cctx.cache().prepareCacheStart(req, fut.topologyVersion());
 
+                exchLog.info("prepareCacheStart end [topVer=" + fut.topologyVersion() + ", cache=" + req.cacheName() + ']');
+
                 if (fut.isCacheAdded(cacheId, fut.topologyVersion())) {
                     if (fut.discoCache().cacheAffinityNodes(req.cacheName()).isEmpty())
                         U.quietAndWarn(log, "No server nodes found for cache client: " + req.cacheName());
@@ -401,6 +413,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                         if (clientCacheStarted)
                             initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
                         else if (!req.clientStartOnly()) {
+                            exchLog.info("calculateAff start [topVer=" + fut.topologyVersion() + ", cache=" + req.cacheName() + ']');
+
                             assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion());
 
                             GridAffinityAssignmentCache aff = cacheCtx.affinity().affinityCache();
@@ -411,6 +425,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                                 fut.discoveryEvent(), fut.discoCache());
 
                             aff.initialize(fut.topologyVersion(), assignment);
+
+                            exchLog.info("calculateAff end [topVer=" + fut.topologyVersion() + ", cache=" + req.cacheName() + ']');
                         }
                     }
                 }
@@ -487,6 +503,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
         }
 
+        exchLog.info("onCacheChangeRequest end [topVer=" + fut.topologyVersion() + ", crd=" + crd + ']');
+
         return clientOnly;
     }
 
@@ -564,6 +582,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         assert msg.topologyVersion() != null && msg.exchangeId() == null : msg;
         assert affCalcVer == null || affCalcVer.equals(msg.topologyVersion());
 
+        exchLog.info("onChangeAffinityMessage start [topVer=" + exchFut.topologyVersion() + ", crd=" + crd + ']');
+
         final AffinityTopologyVersion topVer = exchFut.topologyVersion();
 
         if (log.isDebugEnabled()) {
@@ -631,6 +651,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             if (affCalcVer == null)
                 affCalcVer = msg.topologyVersion();
         }
+
+        exchLog.info("onChangeAffinityMessage end [topVer=" + exchFut.topologyVersion() + ", crd=" + crd + ']');
     }
 
     /**
@@ -832,11 +854,17 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
     private void initAffinity(GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut, boolean fetch)
         throws IgniteCheckedException {
         if (!fetch && canCalculateAffinity(aff, fut)) {
+            exchLog.info("initAffinity start [topVer=" + fut.topologyVersion() + ", cache=" + aff.cacheName() + ']');
+
             List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
 
             aff.initialize(fut.topologyVersion(), assignment);
+
+            exchLog.info("initAffinity end [topVer=" + fut.topologyVersion() + ", cache=" + aff.cacheName() + ']');
         }
         else {
+            exchLog.info("fetchAffinity start [topVer=" + fut.topologyVersion() + ", cache=" + aff.cacheName() + ']');
+
             GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx,
                 aff.cacheName(),
                 fut.topologyVersion(),
@@ -845,6 +873,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             fetchFut.init();
 
             fetchAffinity(fut, aff, fetchFut);
+
+            exchLog.info("fetchAffinity end [topVer=" + fut.topologyVersion() + ", cache=" + aff.cacheName() + ']');
         }
     }
 
@@ -879,6 +909,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      */
     public void onServerJoin(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException {
+        exchLog.info("onServerJoin start [topVer=" + fut.topologyVersion() + ", crd=" + crd + ']');
+
         assert !fut.discoveryEvent().eventNode().isClient();
 
         boolean locJoin = fut.discoveryEvent().eventNode().isLocal();
@@ -894,9 +926,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
                             CacheHolder cache = cache(fut, cacheDesc);
 
+                            exchLog.info("onServerJoin calc aff start [topVer=" + fut.topologyVersion() + ", cache=" + cache.name() + ']');
+
                             List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent(), fut.discoCache());
 
                             cache.affinity().initialize(topVer, newAff);
+
+                            exchLog.info("onServerJoin calc aff end [topVer=" + fut.topologyVersion() + ", cache=" + cache.name() + ']');
                         }
                     });
                 }
@@ -923,6 +959,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 }
             }
         }
+
+        exchLog.info("onServerJoin end [topVer=" + fut.topologyVersion() + ", crd=" + crd + ']');
     }
 
     /**
@@ -949,6 +987,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      */
     private void fetchAffinityOnJoin(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
+        exchLog.info("fetchAffinityOnJoin start [topVer=" + fut.topologyVersion() + ']');
+
         AffinityTopologyVersion topVer = fut.topologyVersion();
 
         List<GridDhtAssignmentFetchFuture> fetchFuts = new ArrayList<>();
@@ -984,6 +1024,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
 
             fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut);
         }
+
+        exchLog.info("fetchAffinityOnJoin end [topVer=" + fut.topologyVersion() + ']');
     }
 
     /**
@@ -1034,6 +1076,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      * @return {@code True} if affinity should be assigned by coordinator.
      */
     public boolean onServerLeft(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
+        exchLog.info("onServerLeft start [topVer=" + fut.topologyVersion() + ']');
+
         ClusterNode leftNode = fut.discoveryEvent().eventNode();
 
         assert !leftNode.isClient() : leftNode;
@@ -1045,7 +1089,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 if (cacheCtx.isLocal())
                     continue;
 
+                exchLog.info("onServerLeft calc aff start [topVer=" + fut.topologyVersion() + ", cache=" + cacheCtx.name() + ']');
+
                 cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache());
+
+                exchLog.info("onServerLeft calc aff end [topVer=" + fut.topologyVersion() + ", cache=" + cacheCtx.name() + ']');
             }
 
             centralizedAff = true;
@@ -1062,6 +1110,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             this.waitInfo = null;
         }
 
+        exchLog.info("onServerLeft end [topVer=" + fut.topologyVersion() + ']');
+
         return centralizedAff;
     }
 
@@ -1231,6 +1281,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
 
         if (!crd) {
+            exchLog.info("initAffinityOnNodeJoin start [topVer=" + fut.topologyVersion() + ", crd=" + crd + ']');
+
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                 if (cacheCtx.isLocal())
                     continue;
@@ -1240,9 +1292,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary, affCache);
             }
 
+            exchLog.info("initAffinityOnNodeJoin end [topVer=" + fut.topologyVersion() + ", crd=" + crd + ']');
+
             return null;
         }
         else {
+            exchLog.info("initAffinityOnNodeJoin start [topVer=" + fut.topologyVersion() + ", crd=" + crd + ']');
+
             final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer);
 
             forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
@@ -1255,6 +1311,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
                 }
             });
 
+            exchLog.info("initAffinityOnNodeJoin end [topVer=" + fut.topologyVersion() + ", crd=" + crd + ']');
+
             return waitRebalanceInfo;
         }
     }
@@ -1273,6 +1331,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         boolean latePrimary,
         Map<Object, List<List<ClusterNode>>> affCache)
         throws IgniteCheckedException {
+        exchLog.info("initAffinityOnNodeJoin start [topVer=" + fut.topologyVersion() +
+            ", cache=" + aff.cacheName() + ']');
+
         assert lateAffAssign;
 
         AffinityTopologyVersion topVer = fut.topologyVersion();
@@ -1318,6 +1379,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             newAssignment = idealAssignment;
 
         aff.initialize(fut.topologyVersion(), cachedAssignment(aff, newAssignment, affCache));
+
+        exchLog.info("initAffinityOnNodeJoin end [topVer=" + fut.topologyVersion() + ", cache=" +
+            aff.cacheName() + ']');
     }
 
     /**
@@ -1413,6 +1477,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
      */
     private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut)
         throws IgniteCheckedException {
+        exchLog.info("initAffinityOnNodeLeft start [topVer=" + fut.topologyVersion() + ']');
+
         final AffinityTopologyVersion topVer = fut.topologyVersion();
 
         final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer);
@@ -1531,6 +1597,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
             }
         }
 
+        exchLog.info("initAffinityOnNodeLeft end [topVer=" + fut.topologyVersion() + ']');
+
         return assignment;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 4dcff9b..0e4373c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -47,6 +47,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     /** */
     private boolean clientReconnect;
 
+    /** */
+    private boolean startCaches;
+
     /**
      * @param reqs Requests.
      */
@@ -113,6 +116,20 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
         return clientReconnect;
     }
 
+    /**
+     * @return {@code True} if required to start all caches on client node.
+     */
+    public boolean startCaches() {
+        return startCaches;
+    }
+
+    /**
+     * @param startCaches {@code True} if required to start all caches on client node.
+     */
+    public void startCaches(boolean startCaches) {
+        this.startCaches = startCaches;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DynamicCacheChangeBatch.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index ff7feb8..95cb452 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
@@ -118,7 +119,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.preloa
  */
 public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
     /** Exchange history size. */
-    private static final int EXCHANGE_HISTORY_SIZE = 1000;
+    private static final int EXCHANGE_HISTORY_SIZE =
+        IgniteSystemProperties.getInteger("IGNITE_EXCHANGE_HISTORY_SIZE", 1000);
 
     /** Atomic reference for pending timeout object. */
     private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>();
@@ -221,7 +223,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                     assert evt.type() != EVT_NODE_JOINED || n.order() > loc.order() :
                         "Node joined with smaller-than-local " +
-                            "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
+                        "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
 
                     exchId = exchangeId(n.id(),
                         affinityTopologyVersion(evt),
@@ -230,10 +232,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     exchFut = exchangeFuture(exchId, evt, cache,null, null);
                 }
                 else {
-                    DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt;
+                    DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage();
 
-                    if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
-                        DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();
+                    if (customMsg instanceof DynamicCacheChangeBatch) {
+                        DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
 
                         Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
 
@@ -266,8 +268,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             exchFut = exchangeFuture(exchId, evt, cache, valid, null);
                         }
                     }
-                    else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) {
-                        CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customEvt.customMessage();
+                    else if (customMsg instanceof CacheAffinityChangeMessage) {
+                        CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg;
 
                         if (msg.exchangeId() == null) {
                             if (msg.exchangeNeeded()) {
@@ -277,10 +279,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             }
                         }
                         else
-                            exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
+                            exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(evt.eventNode(), msg);
                     }
-                    else if (customEvt.customMessage() instanceof StartSnapshotOperationAckDiscoveryMessage
-                        && !((StartSnapshotOperationAckDiscoveryMessage)customEvt.customMessage()).hasError()) {
+                    else if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage
+                        && !((StartSnapshotOperationAckDiscoveryMessage)customMsg).hasError()) {
                         exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
 
                         exchFut = exchangeFuture(exchId, evt, null, null, null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 2bb1e42..8ad3c8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -43,6 +43,7 @@ import javax.management.JMException;
 import javax.management.MBeanServer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
 import org.apache.ignite.cache.CacheExistsException;
 import org.apache.ignite.cache.CacheMode;
@@ -147,6 +148,8 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.internal.IgniteComponentType.JTA;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.affinityNode;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.clientNode;
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled;
 
 /**
@@ -154,6 +157,10 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearE
  */
 @SuppressWarnings({"unchecked", "TypeMayBeWeakened", "deprecation"})
 public class GridCacheProcessor extends GridProcessorAdapter {
+    /** */
+    private static final boolean startCaches =
+        IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN, false);
+
     /** Null cache name. */
     private static final String NULL_NAME = U.id8(UUID.randomUUID());
 
@@ -816,7 +823,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 boolean loc = desc.locallyConfigured();
 
-                if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) {
+                if (loc || (desc.receivedOnDiscovery() && (CU.affinityNode(locNode, filter) ||
+                    startAllCachesOnClientStart()))) {
                     boolean started = desc.onStart();
 
                     assert started : "Failed to change started flag for locally configured cache: " + desc;
@@ -2165,6 +2173,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         batch.clientReconnect(reconnect);
 
+        if (!reconnect)
+            batch.startCaches(startAllCachesOnClientStart());
+
         //todo check
         // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same.
         batch.id(null);
@@ -2172,6 +2183,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         return batch;
     }
 
+    /**
+     * @return {@code True} if need locally start all existing caches on client node start.
+     */
+    private boolean startAllCachesOnClientStart() {
+        return startCaches && ctx.clientNode();
+    }
+
     /** {@inheritDoc} */
     @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) {
         if (data instanceof DynamicCacheChangeBatch) {
@@ -2276,6 +2294,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                             ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
                     }
                 }
+
+                if (batch.startCaches()) {
+                    for (Map.Entry<String, DynamicCacheDescriptor> entry : registeredCaches.entrySet())
+                        ctx.discovery().addClientNode(entry.getKey(), joiningNodeId, false);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index a96cc43..5c5a3c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -34,6 +34,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@ -59,6 +60,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
  */
 @GridToStringExclude
 public class GridClientPartitionTopology implements GridDhtPartitionTopology {
+    /** */
+    private static final GridDhtPartitionState[] MOVING_STATES = new GridDhtPartitionState[] {MOVING};
+
     /** If true, then check consistency. */
     private static final boolean CONSISTENCY_CHECK = false;
 
@@ -81,7 +85,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     private GridDhtPartitionFullMap node2part;
 
     /** Partition to node map. */
-    private Map<Integer, Set<UUID>> part2node = new HashMap<>();
+    private final Map<Integer, Set<UUID>> part2node = new HashMap<>();
 
     /** */
     private GridDhtPartitionExchangeId lastExchangeId;
@@ -93,7 +97,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     private volatile boolean stopping;
 
     /** A future that will be completed when topology with version topVer will be ready to use. */
-    private GridDhtTopologyFuture topReadyFut;
+    private volatile GridDhtTopologyFuture topReadyFut;
 
     /** */
     private final GridAtomicLong updateSeq = new GridAtomicLong(1);
@@ -225,16 +229,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public GridDhtTopologyFuture topologyVersionFuture() {
-        lock.readLock().lock();
-
-        try {
-            assert topReadyFut != null;
+        assert topReadyFut != null;
 
-            return topReadyFut;
-        }
-        finally {
-            lock.readLock().unlock();
-        }
+        return topReadyFut;
     }
 
     /** {@inheritDoc} */
@@ -425,6 +422,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public List<ClusterNode> nodes(int p,
+        AffinityAssignment affAssignment,
+        List<ClusterNode> affNodes) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
     @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
         lock.readLock().lock();
 
@@ -503,7 +507,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) {
-        return nodes(p, topVer, OWNING);
+        return nodes(p, topVer, OWNING, null);
     }
 
     /** {@inheritDoc} */
@@ -513,7 +517,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public List<ClusterNode> moving(int p) {
-        return nodes(p, AffinityTopologyVersion.NONE, MOVING);
+        return nodes(p, AffinityTopologyVersion.NONE, MOVING, null);
     }
 
     /**
@@ -522,7 +526,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
      * @return List of nodes in state OWNING or MOVING.
      */
     private List<ClusterNode> ownersAndMoving(int p, AffinityTopologyVersion topVer) {
-        return nodes(p, topVer, OWNING, MOVING);
+        return nodes(p, topVer, OWNING, MOVING_STATES);
     }
 
     /** {@inheritDoc} */
@@ -564,9 +568,12 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionsExchangeFuture exchFut,
-        GridDhtPartitionFullMap partMap, Map<Integer, T2<Long, Long>> cntrMap, Set<Integer> partsToReload) {
-
+    @Nullable @Override public GridDhtPartitionMap2 update(
+        @Nullable GridDhtPartitionsExchangeFuture exchFut,
+        GridDhtPartitionFullMap partMap,
+        Map<Integer, T2<Long, Long>> cntrMap,
+        Set<Integer> partsToReload
+    ) {
         GridDhtPartitionExchangeId exchId = exchFut != null ? exchFut.exchangeId() : null;
 
         if (log.isDebugEnabled())
@@ -624,25 +631,26 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                 }
             }
 
-            node2part = partMap;
-
-            Map<Integer, Set<UUID>> p2n = new HashMap<>();
+            part2node.clear();
 
             for (Map.Entry<UUID, GridDhtPartitionMap2> e : partMap.entrySet()) {
-                for (Integer p : e.getValue().keySet()) {
-                    Set<UUID> ids = p2n.get(p);
+                for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
+                    if (e0.getValue() != MOVING && e0.getValue() != OWNING)
+                        continue;
+
+                    int p = e0.getKey();
+
+                    Set<UUID> ids = part2node.get(p);
 
                     if (ids == null)
                         // Initialize HashSet to size 3 in anticipation that there won't be
                         // more than 3 nodes per partitions.
-                        p2n.put(p, ids = U.newHashSet(3));
+                        part2node.put(p, ids = U.newHashSet(3));
 
                     ids.add(e.getKey());
                 }
             }
 
-            part2node = p2n;
-
             if (cntrMap != null)
                 this.cntrMap = new HashMap<>(cntrMap);
 
@@ -660,9 +668,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+    @Nullable @Override public GridDhtPartitionMap2 update(
+        @Nullable GridDhtPartitionExchangeId exchId,
         GridDhtPartitionMap2 parts,
-        Map<Integer, T2<Long, Long>> cntrMap) {
+        Map<Integer, T2<Long, Long>> cntrMap
+    ) {
         if (log.isDebugEnabled())
             log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
 
@@ -716,18 +726,24 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
             node2part.put(parts.nodeId(), parts);
 
-            part2node = new HashMap<>(part2node);
-
             // Add new mappings.
-            for (Integer p : parts.keySet()) {
+            for (Map.Entry<Integer,GridDhtPartitionState> e : parts.entrySet()) {
+                int p = e.getKey();
+
                 Set<UUID> ids = part2node.get(p);
 
-                if (ids == null)
-                    // Initialize HashSet to size 3 in anticipation that there won't be
-                    // more than 3 nodes per partition.
-                    part2node.put(p, ids = U.newHashSet(3));
+                if (e.getValue() == MOVING || e.getValue() == OWNING) {
+                    if (ids == null)
+                        // Initialize HashSet to size 3 in anticipation that there won't be
+                        // more than 3 nodes per partition.
+                        part2node.put(p, ids = U.newHashSet(3));
 
-                changed |= ids.add(parts.nodeId());
+                    changed |= ids.add(parts.nodeId());
+                }
+                else {
+                    if (ids != null)
+                        changed |= ids.remove(parts.nodeId());
+                }
             }
 
             // Remove obsolete mappings.
@@ -862,8 +878,6 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             else
                 node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence());
 
-            part2node = new HashMap<>(part2node);
-
             GridDhtPartitionMap2 parts = node2part.remove(nodeId);
 
             if (parts != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 37c3af9..2bef267 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
@@ -191,6 +192,14 @@ public interface GridDhtPartitionTopology {
 
     /**
      * @param p Partition ID.
+     * @param affAssignment Assignments.
+     * @param affNodes Node assigned for given partition by affinity.
+     * @return Collection of all nodes responsible for this partition with primary node being first.
+     */
+    @Nullable public List<ClusterNode> nodes(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes);
+
+    /**
+     * @param p Partition ID.
      * @return Collection of all nodes who {@code own} this partition.
      */
     public List<ClusterNode> owners(int p);

http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 36c1ae5..fb09b38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -69,6 +69,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
  * Partition topology.
  */
 @GridToStringExclude class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
+    /** */
+    private static final GridDhtPartitionState[] MOVING_STATES = new GridDhtPartitionState[] {MOVING};
+
     /** If true, then check consistency. */
     private static final boolean CONSISTENCY_CHECK = false;
 
@@ -91,7 +94,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     private GridDhtPartitionFullMap node2part;
 
     /** Partition to node map. */
-    private Map<Integer, Set<UUID>> part2node = new HashMap<>();
+    private final Map<Integer, Set<UUID>> part2node;
 
     /** */
     private GridDhtPartitionExchangeId lastExchangeId;
@@ -106,7 +109,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     private volatile boolean stopping;
 
     /** A future that will be completed when topology with version topVer will be ready to use. */
-    private GridDhtTopologyFuture topReadyFut;
+    private volatile GridDhtTopologyFuture topReadyFut;
 
     /** */
     private final GridAtomicLong updateSeq = new GridAtomicLong(1);
@@ -139,6 +142,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         log = cctx.logger(getClass());
 
         locParts = new AtomicReferenceArray<>(cctx.config().getAffinity().partitions());
+
+        part2node = new HashMap<>(cctx.config().getAffinity().partitions(), 1.0f);
     }
 
     /** {@inheritDoc} */
@@ -155,7 +160,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         try {
             node2part = null;
 
-            part2node = new HashMap<>();
+            part2node.clear();
 
             lastExchangeId = null;
 
@@ -245,16 +250,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
     /** {@inheritDoc} */
     @Override public GridDhtTopologyFuture topologyVersionFuture() {
-        lock.readLock().lock();
-
-        try {
-            assert topReadyFut != null;
+        assert topReadyFut != null;
 
-            return topReadyFut;
-        }
-        finally {
-            lock.readLock().unlock();
-        }
+        return topReadyFut;
     }
 
     /** {@inheritDoc} */
@@ -753,6 +751,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
             if (part != null && part.state().active())
                 list.add(part);
         }
+
         return list;
     }
 
@@ -827,11 +826,32 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public List<ClusterNode> nodes(int p,
+        AffinityAssignment affAssignment,
+        List<ClusterNode> affNodes) {
+        return nodes0(p, affAssignment, affNodes);
+    }
+
+    /** {@inheritDoc} */
     @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
         AffinityAssignment affAssignment = cctx.affinity().assignment(topVer);
 
         List<ClusterNode> affNodes = affAssignment.get(p);
 
+        List<ClusterNode> nodes = nodes0(p, affAssignment, affNodes);
+
+        return nodes != null ? nodes : affNodes;
+    }
+
+    /**
+     * @param p Partition.
+     * @param affAssignment Assignments.
+     * @param affNodes Node assigned for given partition by affinity.
+     * @return Nodes responsible for given partition (primary is first).
+     */
+    @Nullable private List<ClusterNode> nodes0(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes) {
+        AffinityTopologyVersion topVer = affAssignment.topologyVersion();
+
         lock.readLock().lock();
 
         try {
@@ -849,7 +869,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                 for (UUID nodeId : nodeIds) {
                     HashSet<UUID> affIds = affAssignment.getIds(p);
 
-                    if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING, RENTING)) {
+                    if (!affIds.contains(nodeId) && hasState(p, nodeId, OWNING, MOVING)) {
                         ClusterNode n = cctx.discovery().node(nodeId);
 
                         if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) {
@@ -865,7 +885,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                 }
             }
 
-            return nodes != null ? nodes : affNodes;
+            return nodes;
         }
         finally {
             lock.readLock().unlock();
@@ -927,7 +947,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         if (!cctx.rebalanceEnabled())
             return ownersAndMoving(p, topVer);
 
-        return nodes(p, topVer, OWNING);
+        return nodes(p, topVer, OWNING, null);
     }
 
     /** {@inheritDoc} */
@@ -940,7 +960,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
         if (!cctx.rebalanceEnabled())
             return ownersAndMoving(p, AffinityTopologyVersion.NONE);
 
-        return nodes(p, AffinityTopologyVersion.NONE, MOVING);
+        return nodes(p, AffinityTopologyVersion.NONE, MOVING, null);
     }
 
     /**
@@ -949,7 +969,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
      * @return List of nodes in state OWNING or MOVING.
      */
     private List<ClusterNode> ownersAndMoving(int p, AffinityTopologyVersion topVer) {
-        return nodes(p, topVer, OWNING, MOVING);
+        return nodes(p, topVer, OWNING, MOVING_STATES);
     }
 
     /** {@inheritDoc} */
@@ -980,7 +1000,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-    @Nullable @Override public GridDhtPartitionMap2 update(
+    @Override public GridDhtPartitionMap2 update(
         @Nullable GridDhtPartitionsExchangeFuture exchFut,
         GridDhtPartitionFullMap partMap,
         @Nullable Map<Integer, T2<Long, Long>> cntrMap,
@@ -1078,23 +1098,26 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             node2part = partMap;
 
-            Map<Integer, Set<UUID>> p2n = new HashMap<>(cctx.affinity().partitions(), 1.0f);
+            part2node.clear();
 
             for (Map.Entry<UUID, GridDhtPartitionMap2> e : partMap.entrySet()) {
-                for (Integer p : e.getValue().keySet()) {
-                    Set<UUID> ids = p2n.get(p);
+                for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
+                    if (e0.getValue() != MOVING && e0.getValue() != OWNING)
+                        continue;
+
+                    int p = e0.getKey();
+
+                    Set<UUID> ids = part2node.get(p);
 
                     if (ids == null)
                         // Initialize HashSet to size 3 in anticipation that there won't be
                         // more than 3 nodes per partitions.
-                        p2n.put(p, ids = U.newHashSet(3));
+                        part2node.put(p, ids = U.newHashSet(3));
 
                     ids.add(e.getKey());
                 }
             }
 
-            part2node = p2n;
-
             boolean changed = false;
 
             AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
@@ -1273,18 +1296,24 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
             node2part.put(parts.nodeId(), parts);
 
-            part2node = new HashMap<>(part2node);
-
             // Add new mappings.
-            for (Integer p : parts.keySet()) {
+            for (Map.Entry<Integer,GridDhtPartitionState> e : parts.entrySet()) {
+                int p = e.getKey();
+
                 Set<UUID> ids = part2node.get(p);
 
-                if (ids == null)
-                    // Initialize HashSet to size 3 in anticipation that there won't be
-                    // more than 3 nodes per partition.
-                    part2node.put(p, ids = U.newHashSet(3));
+                if (e.getValue() == MOVING || e.getValue() == OWNING) {
+                    if (ids == null)
+                        // Initialize HashSet to size 3 in anticipation that there won't be
+                        // more than 3 nodes per partition.
+                        part2node.put(p, ids = U.newHashSet(3));
 
-                changed |= ids.add(parts.nodeId());
+                    changed |= ids.add(parts.nodeId());
+                }
+                else {
+                    if (ids != null)
+                        changed |= ids.remove(parts.nodeId());
+                }
             }
 
             // Remove obsolete mappings.
@@ -1607,7 +1636,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                 List<ClusterNode> affNodes = aff.get(p);
 
                 if (!affNodes.contains(cctx.localNode())) {
-                    List<ClusterNode> nodes = nodes(p, topVer, OWNING);
+                    List<ClusterNode> nodes = nodes(p, topVer, OWNING, null);
                     Collection<UUID> nodeIds = F.nodeIds(nodes);
 
                     // If all affinity nodes are owners, then evict partition from local node.
@@ -1740,12 +1769,12 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
         ClusterNode oldest = discoCache.oldestAliveServerNode();
 
-        assert oldest != null;
+        assert oldest != null || cctx.kernalContext().clientNode();
 
         ClusterNode loc = cctx.localNode();
 
         if (node2part != null) {
-            if (oldest.equals(loc) && !node2part.nodeId().equals(loc.id())) {
+            if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id())) {
                 updateSeq.setIfGreater(node2part.updateSequence());
 
                 node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq.incrementAndGet(),
@@ -1910,7 +1939,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
     /** {@inheritDoc} */
     @Override public void printMemoryStats(int threshold) {
-        X.println(">>>  Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']');
+        X.println(">>>  Cache partition topology stats [igniteInstanceName=" + cctx.gridName() +
+            ", cache=" + cctx.name() + ']');
 
         lock.readLock().lock();
 
@@ -1921,7 +1951,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                 if (part == null)
                     continue;
 
-                int size = part.size();
+                int size = part.dataStore().size();
 
                 if (size >= threshold)
                     X.println(">>>   Local partition [part=" + part.id() + ", size=" + size + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
index 9837f69..534abb9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
@@ -21,13 +21,12 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.util.GridPartitionStateMap;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteProductVersion;
@@ -54,7 +53,7 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
     protected AffinityTopologyVersion top;
 
     /** */
-    protected Map<Integer, GridDhtPartitionState> map;
+    protected GridPartitionStateMap map;
 
     /** */
     private volatile int moving;
@@ -85,7 +84,8 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
         this.updateSeq = updateSeq;
         this.top = top;
 
-        map = U.newHashMap(m.size());
+        map = new GridPartitionStateMap(m.size());
+        //map = U.newHashMap(m.size());
 
         for (Map.Entry<Integer, GridDhtPartitionState> e : m.entrySet()) {
             GridDhtPartitionState state = e.getValue();
@@ -105,7 +105,7 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
     private GridDhtPartitionMap2(UUID nodeId,
         long updateSeq,
         AffinityTopologyVersion top,
-        Map<Integer, GridDhtPartitionState> map,
+        GridPartitionStateMap map,
         int moving) {
         this.nodeId = nodeId;
         this.updateSeq = updateSeq;
@@ -121,7 +121,7 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
         return new GridDhtPartitionMap2(nodeId,
             updateSeq,
             top,
-            U.<Integer, GridDhtPartitionState>newHashMap(0),
+            new GridPartitionStateMap(0),
             0);
     }
 
@@ -243,25 +243,7 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
 
         out.writeLong(updateSeq);
 
-        int size = map.size();
-
-        out.writeInt(size);
-
-        int i = 0;
-
-        for (Map.Entry<Integer, GridDhtPartitionState> entry : map.entrySet()) {
-            int ordinal = entry.getValue().ordinal();
-
-            assert ordinal == (ordinal & 0xFF);
-            assert entry.getKey() >= 0 && entry.getKey() <= CacheConfiguration.MAX_PARTITIONS_COUNT : entry.getKey();
-
-            out.write(ordinal);
-            out.writeShort((short)(int)entry.getKey());
-
-            i++;
-        }
-
-        assert i == size;
+        out.writeObject(map);
 
         if (top != null) {
             out.writeLong(topologyVersion().topologyVersion());
@@ -279,16 +261,13 @@ public class GridDhtPartitionMap2 implements Comparable<GridDhtPartitionMap2>, E
 
         updateSeq = in.readLong();
 
-        int size = in.readInt();
-
-        map = U.newHashMap(size);
-
-        for (int i = 0; i < size; i++) {
-            int ordinal = in.readByte() & 0xFF;
+        map = (GridPartitionStateMap)in.readObject();
 
-            int part = in.readShort() & 0xFFFF;
+        Set<Map.Entry<Integer, GridDhtPartitionState>> entries = map.entrySet();
 
-            put(part, GridDhtPartitionState.fromOrdinal(ordinal));
+        for (Map.Entry<Integer, GridDhtPartitionState> entry : entries) {
+            if (entry.getValue() == MOVING)
+                moving++;
         }
 
         long ver = in.readLong();

http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index b0d776f..078e67b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
@@ -73,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
+import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -110,6 +112,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
         IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10);
 
+    public static final String EXCHANGE_LOG = "org.apache.ignite.internal.exchange.time";
+
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -189,6 +193,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     /** Logger. */
     private final IgniteLogger log;
 
+    /** */
+    private final IgniteLogger exchLog;
+
     /** Dynamic cache change requests. */
     private Collection<DynamicCacheChangeRequest> reqs;
 
@@ -258,6 +265,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         this.cctx = cctx;
 
         log = cctx.logger(getClass());
+        exchLog = cctx.logger(EXCHANGE_LOG);
 
         onDone(exchId.topologyVersion());
     }
@@ -282,6 +290,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         this.forcedRebFut = forcedRebFut;
 
         log = cctx.logger(getClass());
+        exchLog = cctx.logger(EXCHANGE_LOG);
 
         reassign = true;
 
@@ -317,6 +326,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         this.affChangeMsg = affChangeMsg;
 
         log = cctx.logger(getClass());
+        exchLog = cctx.logger(EXCHANGE_LOG);
 
         initFut = new GridFutureAdapter<>();
 
@@ -535,6 +545,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             skipPreload = cctx.kernalContext().clientNode();
 
+            exchLog.info("Start exchange init [topVer=" + topVer +
+                ", crd=" + crdNode +
+                ", evt=" + discoEvt.type() +
+                ", customEvt=" + (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT ? ((DiscoveryCustomEvent)discoEvt).customMessage() : null) +
+                ']');
+
             ExchangeType exchange;
 
             if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
@@ -622,6 +638,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 default:
                     assert false;
             }
+
+            exchLog.info("Finish exchange init [topVer=" + topVer + ", crd=" + crdNode + ']');
         }
         catch (IgniteInterruptedCheckedException e) {
             onDone(e);
@@ -694,6 +712,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @throws IgniteCheckedException If failed.
      */
     private void updateTopologies(boolean crd) throws IgniteCheckedException {
+        exchLog.info("updateTopologies start [topVer=" + topologyVersion() + ", crd=" + crd + ']');
+
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (cacheCtx.isLocal())
                 continue;
@@ -717,6 +737,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
             top.updateTopologyVersion(exchId, this, -1, stopping(top.cacheId()));
+
+        exchLog.info("updateTopologies end [topVer=" + topologyVersion() + ", crd=" + crd + ']');
     }
 
     /**
@@ -1126,6 +1148,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @throws IgniteCheckedException If failed.
      */
     private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException {
+        exchLog.info("sendLocalPartitions start [topVer=" + topologyVersion() + ']');
+
         assert node != null;
 
         // Reset lost partition before send local partition to coordinator.
@@ -1162,6 +1186,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             if (log.isDebugEnabled())
                 log.debug("Node left during partition exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']');
         }
+
+        exchLog.info("sendLocalPartitions end [topVer=" + topologyVersion() + ']');
     }
 
     /**
@@ -1190,6 +1216,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @throws IgniteCheckedException If failed.
      */
     private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException {
+        exchLog.info("sendAllPartitions start [topVer=" + topologyVersion() + ']');
+
         GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes, true);
 
         assert !nodes.contains(cctx.localNode());
@@ -1199,6 +1227,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 ", exchId=" + exchId + ", msg=" + m + ']');
 
         cctx.io().safeSend(nodes, m, SYSTEM_POOL, null);
+
+        exchLog.info("sendAllPartitions end [topVer=" + topologyVersion() + ']');
     }
 
     /**
@@ -1313,6 +1343,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         cctx.database().releaseHistoryForExchange();
 
         if (super.onDone(res, err) && realExchange) {
+            exchLog.info("exchange finished [topVer=" + topologyVersion() +
+                ", time1=" + duration() +
+                ", time2=" + (U.currentTimeMillis() - initTs) + ']');
+
             if (log.isDebugEnabled())
                 log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this +
                     "duration=" + duration() + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
@@ -1526,6 +1560,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         }
     }
 
+    @GridToStringExclude
+    private GridAtomicLong maxTime = new GridAtomicLong();
+
     /**
      * @param node Sender node.
      * @param msg Message.
@@ -1534,6 +1571,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         boolean allReceived = false;
         boolean updateSingleMap = false;
 
+        long start = U.currentTimeMillis();
+
+        exchLog.info("processSingleMessage start [topVer=" + topologyVersion() +
+            ", fromId=" + node.id() +
+            ", fromOrder=" + node.order() +
+            ']');
+
         synchronized (mux) {
             assert crd != null;
 
@@ -1554,6 +1598,16 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         if (updateSingleMap)
             updatePartitionSingleMap(node, msg);
 
+        long time = U.currentTimeMillis() - start;
+
+        maxTime.setIfGreater(time);
+
+        exchLog.info("processSingleMessage end [topVer=" + topologyVersion() +
+            ", fromId=" + node.id() +
+            ", fromOrder=" + node.order() +
+            ", time=" + time +
+            ", maxTime=" + maxTime.get() + ']');
+
         if (allReceived)
             onAllReceived();
     }
@@ -1917,6 +1971,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         assert msg.exchangeId().equals(exchId) : msg;
         assert msg.lastVersion() != null : msg;
 
+        exchLog.info("processFullMessage start [topVer=" + topologyVersion() + ']');
+
         synchronized (mux) {
             if (crd == null)
                 return;
@@ -1938,6 +1994,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         if (exchangeOnChangeGlobalState && !F.isEmpty(msg.getExceptionsMap()))
             cctx.kernalContext().state().onFullResponseMessage(msg.getExceptionsMap());
 
+        exchLog.info("processFullMessage end [topVer=" + topologyVersion() + ']');
+
         onDone(exchId.topologyVersion());
     }
 
@@ -1978,7 +2036,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @param msg Partitions single message.
      */
     private void updatePartitionSingleMap(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
-        msgs.put(node.id(), msg);
+        if (cctx.database().persistenceEnabled())
+            msgs.put(node.id(), msg);
 
         for (Map.Entry<Integer, GridDhtPartitionMap2> entry : msg.partitions().entrySet()) {
             Integer cacheId = entry.getKey();

http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java
new file mode 100644
index 0000000..72f8469
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+
+/**
+ * Grid partition state map. States are encoded using bits.
+ * <p>
+ * Each partition occupies a fixed group of consecutive bits in a {@link BitSet}. An all-zero group means
+ * "no mapping", otherwise the group holds {@code state.ordinal() + 1}.
+ * <p>
+ * Null values are prohibited. Keys are partition numbers and must be non-negative.
+ * <p>
+ * This class performs no synchronization.
+ */
+public class GridPartitionStateMap extends AbstractMap<Integer, GridDhtPartitionState> implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** All states, cached because {@link GridDhtPartitionState#values()} copies the array on every call. */
+    private static final GridDhtPartitionState[] STATES = GridDhtPartitionState.values();
+
+    /**
+     * Bits per partition: enough for codes {@code 0} (no mapping) up to {@code STATES.length}.
+     * Changing the formula changes the layout of the serialized {@link #states}.
+     */
+    private static final int BITS = Integer.SIZE - Integer.numberOfLeadingZeros(STATES.length + 1);
+
+    /** Encoded states, {@link #BITS} bits per partition. */
+    private final BitSet states;
+
+    /** Number of partitions that have a state. */
+    private int size;
+
+    /**
+     * Default constructor.
+     */
+    public GridPartitionStateMap() {
+        states = new BitSet();
+    }
+
+    /**
+     * @param parts Partitions to hold (capacity hint).
+     */
+    public GridPartitionStateMap(int parts) {
+        states = new BitSet(parts * BITS);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Set<Entry<Integer, GridDhtPartitionState>> entrySet() {
+        return new AbstractSet<Entry<Integer, GridDhtPartitionState>>() {
+            @Override public Iterator<Entry<Integer, GridDhtPartitionState>> iterator() {
+                return new EntryIterator();
+            }
+
+            @Override public int size() {
+                return GridPartitionStateMap.this.size();
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtPartitionState put(Integer key, GridDhtPartitionState val) {
+        assert val != null;
+
+        return setState(key, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtPartitionState get(Object key) {
+        return isPartition(key) ? state((Integer)key) : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtPartitionState remove(Object key) {
+        return isPartition(key) ? setState((Integer)key, null) : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean containsKey(Object key) {
+        return get(key) != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return size;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clear() {
+        states.clear();
+
+        size = 0;
+    }
+
+    /**
+     * @param key Key.
+     * @return {@code True} if the key is a non-negative {@link Integer}, i.e. may denote a partition.
+     */
+    private static boolean isPartition(Object key) {
+        return key instanceof Integer && (Integer)key >= 0;
+    }
+
+    /**
+     * Sets or clears the state of a partition.
+     *
+     * @param part Partition.
+     * @param st New state or {@code null} to remove the mapping.
+     * @return Previous state or {@code null} if there was none.
+     */
+    private GridDhtPartitionState setState(int part, GridDhtPartitionState st) {
+        GridDhtPartitionState old = state(part);
+
+        if (old == st)
+            return old;
+
+        int off = part * BITS;
+
+        int ist = st == null ? 0 : st.ordinal() + 1; // Reserve all zero bits for empty value
+
+        for (int i = 0; i < BITS; i++) {
+            states.set(off + i, (ist & 1) == 1);
+
+            ist >>>= 1;
+        }
+
+        size += (st == null ? -1 : old == null ? 1 : 0);
+
+        return old;
+    }
+
+    /**
+     * @param part Partition.
+     * @return State of the partition or {@code null} if there is none.
+     */
+    private GridDhtPartitionState state(int part) {
+        int off = part * BITS;
+
+        int st = 0;
+
+        for (int i = 0; i < BITS; i++)
+            st |= ((states.get(off + i) ? 1 : 0) << i);
+
+        return st == 0 ? null : STATES[st - 1];
+    }
+
+    /**
+     * Iterates over partitions that have a state, in ascending partition order.
+     */
+    private class EntryIterator implements Iterator<Entry<Integer, GridDhtPartitionState>> {
+        /** Number of partition slots covered by the bit set when the iterator was created. */
+        private final int slots = states.length() == 0 ? 0 : (states.length() - 1) / BITS + 1;
+
+        /** Candidate partition for the next {@link #next()} call. */
+        private int next;
+
+        /** Partition returned by the last {@link #next()} call, or {@code -1} if none or already removed. */
+        private int last = -1;
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            while (next < slots && state(next) == null)
+                next++;
+
+            return next < slots;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Entry<Integer, GridDhtPartitionState> next() {
+            // Skip empty slots even if the caller did not invoke hasNext() first.
+            if (!hasNext())
+                throw new NoSuchElementException();
+
+            last = next++;
+
+            return new PartitionEntry(last);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void remove() {
+            // Without this check remove() before next() would silently drop partition 0.
+            if (last < 0)
+                throw new IllegalStateException();
+
+            setState(last, null);
+
+            last = -1;
+        }
+    }
+
+    /**
+     * Entry view of a single partition; reads and writes go straight to the map. Implements {@code equals} and
+     * {@code hashCode} per the {@link Entry} contract, which {@link AbstractMap#equals} and hashing rely on.
+     */
+    private class PartitionEntry implements Entry<Integer, GridDhtPartitionState> {
+        /** Partition. */
+        private final int p;
+
+        /**
+         * @param p Partition.
+         */
+        PartitionEntry(int p) {
+            this.p = p;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer getKey() {
+            return p;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridDhtPartitionState getValue() {
+            return state(p);
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridDhtPartitionState setValue(GridDhtPartitionState val) {
+            assert val != null;
+
+            return setState(p, val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (!(o instanceof Entry))
+                return false;
+
+            Entry<?, ?> e = (Entry<?, ?>)o;
+
+            GridDhtPartitionState val = getValue();
+
+            return getKey().equals(e.getKey()) && (val == null ? e.getValue() == null : val.equals(e.getValue()));
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            GridDhtPartitionState val = getValue();
+
+            // Map.Entry contract: key.hashCode() ^ value.hashCode(); Integer.hashCode() is the int value itself.
+            return p ^ (val == null ? 0 : val.hashCode());
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return p + "=" + getValue();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/52556f4b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index d812c21..47498dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2784,10 +2784,23 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         // Try to connect first on bound addresses.
         if (isRmtAddrsExist) {
-            List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));
-
             boolean sameHost = U.sameMacs(getSpiContext().localNode(), node);
 
+            List<InetSocketAddress> addrs0;
+
+            Collection<InetSocketAddress> socketAddrs = U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort);
+
+            if (sameHost)
+                addrs0 = new ArrayList<>(socketAddrs);
+            else {
+                addrs0 = new ArrayList<>(socketAddrs.size());
+                // Drop loopback addresses of a remote host. Unresolved addresses have no InetAddress, keep them.
+                for (InetSocketAddress addr0 : socketAddrs) {
+                    if (addr0.isUnresolved() || !addr0.getAddress().isLoopbackAddress())
+                        addrs0.add(addr0);
+                }
+            }
+
             Collections.sort(addrs0, U.inetAddressesComparator(sameHost));
 
             addrs = new LinkedHashSet<>(addrs0);


Mime
View raw message