ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [17/50] [abbrv] ignite git commit: ignite-324 Partition exchange: node should be assigned as primary only after preloading is finished Implemented 'late affinity assignment', also fixes: - fixed BinaryObject/BinaryReaderExImpl to properly handle case whe
Date Mon, 11 Apr 2016 14:25:44 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 2221d3b..01ca9f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -85,6 +85,9 @@ public class GridCacheSharedContext<K, V> {
     /** Deployment manager. */
     private GridCacheDeploymentManager<K, V> depMgr;
 
+    /** Affinity manager. */
+    private CacheAffinitySharedManager affMgr;
+
     /** Cache contexts map. */
     private ConcurrentMap<Integer, GridCacheContext<K, V>> ctxMap;
 
@@ -101,6 +104,7 @@ public class GridCacheSharedContext<K, V> {
      * @param mvccMgr MVCC manager.
      * @param depMgr Deployment manager.
      * @param exchMgr Exchange manager.
+     * @param affMgr Affinity manager.
      * @param ioMgr IO manager.
      * @param jtaMgr JTA manager.
      * @param storeSesLsnrs Store session listeners.
@@ -112,13 +116,14 @@ public class GridCacheSharedContext<K, V> {
         GridCacheMvccManager mvccMgr,
         GridCacheDeploymentManager<K, V> depMgr,
         GridCachePartitionExchangeManager<K, V> exchMgr,
+        CacheAffinitySharedManager<K, V> affMgr,
         GridCacheIoManager ioMgr,
         CacheJtaManagerAdapter jtaMgr,
         Collection<CacheStoreSessionListener> storeSesLsnrs
     ) {
         this.kernalCtx = kernalCtx;
 
-        setManagers(mgrs, txMgr, jtaMgr, verMgr, mvccMgr, depMgr, exchMgr, ioMgr);
+        setManagers(mgrs, txMgr, jtaMgr, verMgr, mvccMgr, depMgr, exchMgr, affMgr, ioMgr);
 
         this.storeSesLsnrs = storeSesLsnrs;
 
@@ -162,6 +167,7 @@ public class GridCacheSharedContext<K, V> {
             mvccMgr,
             new GridCacheDeploymentManager<K, V>(),
             new GridCachePartitionExchangeManager<K, V>(),
+            affMgr,
             ioMgr);
 
         this.mgrs = mgrs;
@@ -190,6 +196,7 @@ public class GridCacheSharedContext<K, V> {
      * @param mvccMgr MVCC manager.
      * @param depMgr Deployment manager.
      * @param exchMgr Exchange manager.
+     * @param affMgr Affinity manager.
      * @param ioMgr IO manager.
      * @param jtaMgr JTA manager.
      */
@@ -200,6 +207,7 @@ public class GridCacheSharedContext<K, V> {
         GridCacheMvccManager mvccMgr,
         GridCacheDeploymentManager<K, V> depMgr,
         GridCachePartitionExchangeManager<K, V> exchMgr,
+        CacheAffinitySharedManager affMgr,
         GridCacheIoManager ioMgr) {
         this.mvccMgr = add(mgrs, mvccMgr);
         this.verMgr = add(mgrs, verMgr);
@@ -207,6 +215,7 @@ public class GridCacheSharedContext<K, V> {
         this.jtaMgr = add(mgrs, jtaMgr);
         this.depMgr = add(mgrs, depMgr);
         this.exchMgr = add(mgrs, exchMgr);
+        this.affMgr = add(mgrs, affMgr);
         this.ioMgr = add(mgrs, ioMgr);
     }
 
@@ -366,6 +375,13 @@ public class GridCacheSharedContext<K, V> {
     }
 
     /**
+     * @return Affinity manager.
+     */
+    public CacheAffinitySharedManager<K, V> affinity() {
+        return affMgr;
+    }
+
+    /**
      * @return Lock order manager.
      */
     public GridCacheVersionManager versions() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 4744580..98b1b59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -17,10 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
-import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -28,14 +26,12 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.cache.Cache;
 import javax.cache.CacheException;
@@ -72,7 +68,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import org.apache.ignite.internal.processors.igfs.IgfsUtils;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.util.lang.IgniteInClosureX;
@@ -101,7 +96,6 @@ import org.apache.ignite.transactions.TransactionRollbackException;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -170,14 +164,6 @@ public class GridCacheUtils {
             }
         };
 
-    /** */
-    private static final IgniteClosure<Integer, GridCacheVersion[]> VER_ARR_FACTORY =
-        new C1<Integer, GridCacheVersion[]>() {
-            @Override public GridCacheVersion[] apply(Integer size) {
-                return new GridCacheVersion[size];
-            }
-        };
-
     /** Empty predicate array. */
     private static final IgnitePredicate[] EMPTY_FILTER = new IgnitePredicate[0];
 
@@ -302,46 +288,6 @@ public class GridCacheUtils {
     }
 
     /**
-     * Writes {@link GridCacheVersion} to output stream. This method is meant to be used by
-     * implementations of {@link Externalizable} interface.
-     *
-     * @param out Output stream.
-     * @param ver Version to write.
-     * @throws IOException If write failed.
-     */
-    public static void writeVersion(ObjectOutput out, GridCacheVersion ver) throws IOException {
-        // Write null flag.
-        out.writeBoolean(ver == null);
-
-        if (ver != null) {
-            out.writeBoolean(ver instanceof GridCacheVersionEx);
-
-            ver.writeExternal(out);
-        }
-    }
-
-    /**
-     * Reads {@link GridCacheVersion} from input stream. This method is meant to be used by
-     * implementations of {@link Externalizable} interface.
-     *
-     * @param in Input stream.
-     * @return Read version.
-     * @throws IOException If read failed.
-     */
-    @Nullable public static GridCacheVersion readVersion(ObjectInput in) throws IOException {
-        // If UUID is not null.
-        if (!in.readBoolean()) {
-            GridCacheVersion ver = in.readBoolean() ? new GridCacheVersionEx() : new GridCacheVersion();
-
-            ver.readExternal(in);
-
-            return ver;
-        }
-
-        return null;
-    }
-
-    /**
      * @param ctx Cache context.
      * @param meta Meta name.
      * @return Filter for entries with meta.
@@ -837,21 +783,6 @@ public class GridCacheUtils {
 
     /**
      * @param nodes Nodes.
-     * @param locId Local node ID.
-     * @return Local node if it is in the list of nodes, or primary node.
-     */
-    public static ClusterNode localOrPrimary(Iterable<ClusterNode> nodes, UUID locId) {
-        assert !F.isEmpty(nodes);
-
-        for (ClusterNode n : nodes)
-            if (n.id().equals(locId))
-                return n;
-
-        return F.first(nodes);
-    }
-
-    /**
-     * @param nodes Nodes.
      * @return Backup nodes.
      */
     public static Collection<ClusterNode> backups(Collection<ClusterNode> nodes) {
@@ -862,38 +793,6 @@ public class GridCacheUtils {
     }
 
     /**
-     * @param mappings Mappings.
-     * @param k map key.
-     * @return Either current list value or newly created one.
-     */
-    public static <K, V> Collection<V> getOrSet(Map<K, List<V>> mappings, K k) {
-        List<V> vals = mappings.get(k);
-
-        if (vals == null)
-            mappings.put(k, vals = new LinkedList<>());
-
-        return vals;
-    }
-
-    /**
-     * @param mappings Mappings.
-     * @param k map key.
-     * @return Either current list value or newly created one.
-     */
-    public static <K, V> Collection<V> getOrSet(ConcurrentMap<K, Collection<V>> mappings, K k) {
-        Collection<V> vals = mappings.get(k);
-
-        if (vals == null) {
-            Collection<V> old = mappings.putIfAbsent(k, vals = new ConcurrentLinkedDeque8<>());
-
-            if (old != null)
-                vals = old;
-        }
-
-        return vals;
-    }
-
-    /**
      * @param log Logger.
      * @param excl Excludes.
      * @return Future listener that logs errors.
@@ -923,20 +822,6 @@ public class GridCacheUtils {
 
     /**
      * @param t Exception to check.
-     * @return {@code true} if caused by lock timeout.
-     */
-    public static boolean isLockTimeout(Throwable t) {
-        if (t == null)
-            return false;
-
-        while (t instanceof IgniteCheckedException || t instanceof IgniteException)
-            t = t.getCause();
-
-        return t instanceof GridCacheLockTimeoutException;
-    }
-
-    /**
-     * @param t Exception to check.
      * @return {@code true} if caused by lock timeout or cancellation.
      */
     public static boolean isLockTimeoutOrCancelled(Throwable t) {
@@ -1129,13 +1014,6 @@ public class GridCacheUtils {
     }
 
     /**
-     * @return Version array factory.
-     */
-    public static IgniteClosure<Integer, GridCacheVersion[]> versionArrayFactory() {
-        return VER_ARR_FACTORY;
-    }
-
-    /**
      * Converts cache version to byte array.
      *
      * @param ver Version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index a9f4538..240fe7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -1773,6 +1773,11 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
     public IgniteInternalCache<K, V> withExpiryPolicy(ExpiryPolicy plc);
 
     /**
+     * @return Cache with no-retries behavior enabled.
+     */
+    public IgniteInternalCache<K, V> withNoRetries();
+
+    /**
      * @param key Key.
      * @param entryProcessor Entry processor.
      * @param args Arguments.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
index 6567141..9e85bad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
@@ -207,7 +207,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> {
             ClusterNode primary = cctx.affinity().primary(key, topVer);
 
             if (primary == null)
-                throw new IgniteException("Failed to get primare node [topVer=" + topVer + ", key=" + key + ']');
+                throw new IgniteException("Failed to get primary node [topVer=" + topVer + ", key=" + key + ']');
 
             Collection<K> mapped = res.get(primary);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index b42e5e7..c018f71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -230,7 +230,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
             if (create) {
                 hdr = new GridCacheQueueHeader(IgniteUuid.randomUuid(), cap, colloc, 0, 0, null);
 
-                GridCacheQueueHeader old = queueHdrView.getAndPutIfAbsent(key, hdr);
+                GridCacheQueueHeader old = queueHdrView.withNoRetries().getAndPutIfAbsent(key, hdr);
 
                 if (old != null) {
                     if (old.capacity() != cap || old.collocated() != colloc)
@@ -385,7 +385,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
 
             GridCacheSetHeader hdr;
 
-            GridCacheAdapter cache = cctx.cache();
+            IgniteInternalCache cache = cctx.cache().withNoRetries();
 
             if (create) {
                 hdr = new GridCacheSetHeader(IgniteUuid.randomUuid(), collocated);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
index 46d113b..875ada0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
  * Committed transaction information. Contains recovery writes that will be used to set commit values
  * in case if originating node crashes.
  */
+@Deprecated
 public class GridCacheCommittedTxInfo implements Externalizable {
     /** */
     private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 1709b0f..6e97ec5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -299,7 +299,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
         /** {@inheritDoc} */
         @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
             @Nullable Object arg) throws IgniteException {
-            Map<ComputeJob, ClusterNode> jobs = new HashMap();
+            Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
 
             for (ClusterNode node : subgrid)
                 jobs.put(new GlobalRemoveAllJob(cacheName, topVer, skipStore, keepBinary), node);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
index a3eaba4..84a4094 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
@@ -156,14 +156,6 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage {
     }
 
     /**
-     * @param committedVers Committed versions relative to lock version.
-     * @param rolledbackVers Rolled back versions relative to lock version.
-     */
-    public void setCandidates(Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) {
-        completedVersions(committedVers, rolledbackVers);
-    }
-
-    /**
      * @param val Value.
      */
     public void addValue(CacheObject val) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 262d959..2cf7276 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -315,7 +314,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                     log.debug("Replacing obsolete entry in remote transaction [entry=" + entry + ", tx=" + this + ']');
 
                 // Replace the entry.
-                txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()));
+                txEntry.cached(txEntry.context().cache().entryEx(txEntry.key(), topologyVersion()));
             }
         }
     }
@@ -439,7 +438,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                         if (log.isDebugEnabled())
                             log.debug("Got removed entry while committing (will retry): " + txEntry);
 
-                        txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()));
+                        txEntry.cached(txEntry.context().cache().entryEx(txEntry.key(), topologyVersion()));
                     }
                 }
             }
@@ -469,7 +468,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                     GridCacheEntryEx cached = txEntry.cached();
 
                                     if (cached == null)
-                                        txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key()));
+                                        txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
 
                                     if (near() && cacheCtx.dr().receiveEnabled()) {
                                         cached.markObsolete(xidVer);
@@ -662,7 +661,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                         log.debug("Attempting to commit a removed entry (will retry): " + txEntry);
 
                                     // Renew cached entry.
-                                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()));
+                                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
                                 }
                             }
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 28c94dd..4381dfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -30,10 +29,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index ad4943e..3761d77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -219,7 +219,13 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
+    @Override public void initPartitions(GridDhtPartitionsExchangeFuture exchFut) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean initParts)
+        throws IgniteCheckedException {
         ClusterNode loc = cctx.localNode();
 
         U.writeLock(lock);
@@ -777,7 +783,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             node2part.put(nodeId, map = new GridDhtPartitionMap2(nodeId, updateSeq, topVer,
                 Collections.<Integer, GridDhtPartitionState>emptyMap(), false));
 
-        map.updateSequence(updateSeq);
+        map.updateSequence(updateSeq, topVer);
 
         map.put(p, state);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index 82450ad..e883614 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -18,10 +18,14 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
+
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -51,6 +55,20 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
     /** Affinity assignment bytes. */
     private byte[] affAssignmentBytes;
 
+    /** */
+    @GridDirectTransient
+    private List<List<UUID>> affAssignmentIds;
+
+    /** */
+    private byte[] affAssignmentIdsBytes;
+
+    /** */
+    @GridDirectTransient
+    private List<List<UUID>> idealAffAssignment;
+
+    /** Ideal affinity assignment bytes. */
+    private byte[] idealAffAssignmentBytes;
+
     /**
      * Empty constructor.
      */
@@ -62,12 +80,19 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
      * @param cacheId Cache ID.
      * @param topVer Topology version.
      * @param affAssignment Affinity assignment.
+     * @param sndNodeIds If {@code true} sends only node IDs instead of nodes.
      */
-    public GridDhtAffinityAssignmentResponse(int cacheId, @NotNull AffinityTopologyVersion topVer,
-        List<List<ClusterNode>> affAssignment) {
+    public GridDhtAffinityAssignmentResponse(int cacheId,
+        @NotNull AffinityTopologyVersion topVer,
+        List<List<ClusterNode>> affAssignment,
+        boolean sndNodeIds) {
         this.cacheId = cacheId;
         this.topVer = topVer;
-        this.affAssignment = affAssignment;
+
+        if (!sndNodeIds)
+            this.affAssignment = affAssignment;
+        else
+            affAssignmentIds = ids(affAssignment);
     }
 
     /** {@inheritDoc} */
@@ -83,12 +108,86 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
     }
 
     /**
+     * @param disco Discovery manager.
      * @return Affinity assignment.
      */
-    public List<List<ClusterNode>> affinityAssignment() {
+    public List<List<ClusterNode>> affinityAssignment(GridDiscoveryManager disco) {
+        if (affAssignment != null)
+            return affAssignment;
+
+        if (affAssignmentIds != null)
+            affAssignment = nodes(disco, affAssignmentIds);
+
         return affAssignment;
     }
 
+    /**
+     * @return Ideal affinity assignment.
+     */
+    public List<List<ClusterNode>> idealAffinityAssignment(GridDiscoveryManager disco) {
+        return nodes(disco, idealAffAssignment);
+    }
+
+    /**
+     * @param disco Discovery manager.
+     * @param assignmentIds Assignment node IDs.
+     * @return Assignment nodes.
+     */
+    private List<List<ClusterNode>> nodes(GridDiscoveryManager disco, List<List<UUID>> assignmentIds) {
+        if (assignmentIds != null) {
+            List<List<ClusterNode>> assignment = new ArrayList<>(assignmentIds.size());
+
+            for (int i = 0; i < assignmentIds.size(); i++) {
+                List<UUID> ids = assignmentIds.get(i);
+                List<ClusterNode> nodes = new ArrayList<>(ids.size());
+
+                for (int j = 0; j < ids.size(); j++) {
+                    ClusterNode node = disco.node(topVer, ids.get(j));
+
+                    assert node != null;
+
+                    nodes.add(node);
+                }
+
+                assignment.add(nodes);
+            }
+
+            return assignment;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param idealAffAssignment Ideal affinity assignment.
+     */
+    public void idealAffinityAssignment(List<List<ClusterNode>> idealAffAssignment) {
+        this.idealAffAssignment = ids(idealAffAssignment);
+    }
+
+    /**
+     * @param assignments Assignment.
+     */
+    private List<List<UUID>> ids(List<List<ClusterNode>> assignments) {
+        if (assignments != null) {
+            List<List<UUID>> assignment = new ArrayList<>(assignments.size());
+
+            for (int i = 0; i < assignments.size(); i++) {
+                List<ClusterNode> nodes = assignments.get(i);
+                List<UUID> ids = new ArrayList<>(nodes.size());
+
+                for (int j = 0; j < nodes.size(); j++)
+                    ids.add(nodes.get(j).id());
+
+                assignment.add(ids);
+            }
+
+            return assignment;
+        }
+
+        return null;
+    }
+
     /** {@inheritDoc} */
     @Override public byte directType() {
         return 29;
@@ -96,7 +195,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 5;
+        return 7;
     }
 
     /**
@@ -105,34 +204,69 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
+        assert affAssignment != null ^ affAssignmentIds != null;
+
         if (affAssignment != null && affAssignmentBytes == null)
             affAssignmentBytes = ctx.marshaller().marshal(affAssignment);
+
+        if (affAssignmentIds != null && affAssignmentIdsBytes == null)
+            affAssignmentIdsBytes = ctx.marshaller().marshal(affAssignmentIds);
+
+        if (idealAffAssignment != null && idealAffAssignmentBytes == null)
+            idealAffAssignmentBytes = ctx.marshaller().marshal(idealAffAssignment);
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (affAssignmentBytes != null && affAssignment == null) {
-            affAssignment = ctx.marshaller().unmarshal(affAssignmentBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        assert affAssignmentBytes != null ^ affAssignmentIdsBytes != null;
 
-            // TODO IGNITE-2110: setting 'local' for nodes not needed when IGNITE-2110 is implemented.
-            int assignments = affAssignment.size();
+        ldr = U.resolveClassLoader(ldr, ctx.gridConfig());
 
-            for (int n = 0; n < assignments; n++) {
-                List<ClusterNode> nodes = affAssignment.get(n);
+        if (affAssignmentBytes != null && affAssignment == null)
+            affAssignment = unmarshalNodes(affAssignmentBytes, ctx, ldr);
 
-                int size = nodes.size();
+        if (affAssignmentIdsBytes != null && affAssignmentIds == null)
+            affAssignmentIds = ctx.marshaller().unmarshal(affAssignmentIdsBytes, ldr);
 
-                for (int i = 0; i < size; i++) {
-                    ClusterNode node = nodes.get(i);
+        if (idealAffAssignmentBytes != null && idealAffAssignment == null)
+            idealAffAssignment = ctx.marshaller().unmarshal(idealAffAssignmentBytes, ldr);
+    }
 
-                    if (node instanceof TcpDiscoveryNode)
-                        ((TcpDiscoveryNode)node).local(node.id().equals(ctx.localNodeId()));
-                }
+    /**
+     * @param bytes Assignment bytes.
+     * @param ctx Context.
+     * @param ldr Class loader.
+     * @return Assignment.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private List<List<ClusterNode>> unmarshalNodes(byte[] bytes,
+        GridCacheSharedContext ctx,
+        ClassLoader ldr)
+        throws IgniteCheckedException
+    {
+        List<List<ClusterNode>> affAssignment = ctx.marshaller().unmarshal(bytes,
+            U.resolveClassLoader(ldr, ctx.gridConfig()));
+
+        // TODO IGNITE-2110: setting 'local' for nodes not needed when IGNITE-2110 is implemented.
+        int assignments = affAssignment.size();
+
+        for (int n = 0; n < assignments; n++) {
+            List<ClusterNode> nodes = affAssignment.get(n);
+
+            int size = nodes.size();
+
+            for (int i = 0; i < size; i++) {
+                ClusterNode node = nodes.get(i);
+
+                if (node instanceof TcpDiscoveryNode)
+                    ((TcpDiscoveryNode)node).local(node.id().equals(ctx.localNodeId()));
             }
         }
+
+        return affAssignment;
     }
 
     /** {@inheritDoc} */
@@ -162,6 +296,18 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
                 writer.incrementState();
 
             case 4:
+                if (!writer.writeByteArray("affAssignmentIdsBytes", affAssignmentIdsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeByteArray("idealAffAssignmentBytes", idealAffAssignmentBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -192,6 +338,22 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
                 reader.incrementState();
 
             case 4:
+                affAssignmentIdsBytes = reader.readByteArray("affAssignmentIdsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                idealAffAssignmentBytes = reader.readByteArray("idealAffAssignmentBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index bb9f4ab..ab8e863 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -20,9 +20,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -30,9 +30,12 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridNodeOrderComparator;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
@@ -41,7 +44,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFF
 /**
  * Future that fetches affinity assignment from remote cache nodes.
  */
-public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<List<List<ClusterNode>>> {
+public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffinityAssignmentResponse> {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -51,34 +54,39 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<List<List<Cl
     /** Logger. */
     private static IgniteLogger log;
 
-    /** Cache context. */
-    private final GridCacheContext ctx;
+    /** */
+    private final GridCacheSharedContext ctx;
 
     /** List of available nodes this future can fetch data from. */
+    @GridToStringInclude
     private Queue<ClusterNode> availableNodes;
 
-    /** Topology version. */
-    private final AffinityTopologyVersion topVer;
-
     /** Pending node from which response is being awaited. */
     private ClusterNode pendingNode;
 
+    /** */
+    @GridToStringInclude
+    private final T2<Integer, AffinityTopologyVersion> key;
+
     /**
-     * @param ctx Cache context.
-     * @param availableNodes Available nodes.
+     * @param ctx Context.
+     * @param cacheName Cache name.
+     * @param topVer Topology version.
      */
     public GridDhtAssignmentFetchFuture(
-        GridCacheContext ctx,
-        AffinityTopologyVersion topVer,
-        Collection<ClusterNode> availableNodes
+        GridCacheSharedContext ctx,
+        String cacheName,
+        AffinityTopologyVersion topVer
     ) {
         this.ctx = ctx;
-        this.topVer = topVer;
+        this.key = new T2<>(CU.cacheId(cacheName), topVer);
+
+        Collection<ClusterNode> availableNodes = ctx.discovery().cacheAffinityNodes(cacheName, topVer);
 
         LinkedList<ClusterNode> tmp = new LinkedList<>();
 
         for (ClusterNode node : availableNodes) {
-            if (!node.isLocal())
+            if (!node.isLocal() && ctx.discovery().alive(node))
                 tmp.add(node);
         }
 
@@ -94,33 +102,40 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<List<List<Cl
      * Initializes fetch future.
      */
     public void init() {
-        ((GridDhtPreloader)ctx.preloader()).addDhtAssignmentFetchFuture(topVer, this);
+        ctx.affinity().addDhtAssignmentFetchFuture(this);
 
         requestFromNextNode();
     }
 
     /**
-     * @param node Node.
+     * @return Future key.
+     */
+    public T2<Integer, AffinityTopologyVersion> key() {
+        return key;
+    }
+
+    /**
+     * @param nodeId Node ID.
      * @param res Response.
      */
-    public void onResponse(ClusterNode node, GridDhtAffinityAssignmentResponse res) {
-        if (!res.topologyVersion().equals(topVer)) {
+    public void onResponse(UUID nodeId, GridDhtAffinityAssignmentResponse res) {
+        if (!res.topologyVersion().equals(key.get2())) {
             if (log.isDebugEnabled())
                 log.debug("Received affinity assignment for wrong topology version (will ignore) " +
-                    "[node=" + node + ", res=" + res + ", topVer=" + topVer + ']');
+                    "[node=" + nodeId + ", res=" + res + ", topVer=" + key.get2() + ']');
 
             return;
         }
 
-        List<List<ClusterNode>> assignment = null;
+        GridDhtAffinityAssignmentResponse res0 = null;
 
         synchronized (this) {
-            if (pendingNode != null && pendingNode.equals(node))
-                assignment = res.affinityAssignment();
+            if (pendingNode != null && pendingNode.id().equals(nodeId))
+                res0 = res;
         }
 
-        if (assignment != null)
-            onDone(assignment);
+        if (res0 != null)
+            onDone(res);
     }
 
     /**
@@ -139,9 +154,9 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<List<List<Cl
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onDone(@Nullable List<List<ClusterNode>> res, @Nullable Throwable err) {
+    @Override public boolean onDone(@Nullable GridDhtAffinityAssignmentResponse res, @Nullable Throwable err) {
         if (super.onDone(res, err)) {
-            ((GridDhtPreloader)ctx.preloader()).removeDhtAssignmentFetchFuture(topVer, this);
+            ctx.affinity().removeDhtAssignmentFetchFuture(this);
 
             return true;
         }
@@ -167,7 +182,7 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<List<List<Cl
                         log0.debug("Sending affinity fetch request to remote node [locNodeId=" + ctx.localNodeId() +
                             ", node=" + node + ']');
 
-                    ctx.io().send(node, new GridDhtAffinityAssignmentRequest(ctx.cacheId(), topVer),
+                    ctx.io().send(node, new GridDhtAffinityAssignmentRequest(key.get1(), key.get2()),
                         AFFINITY_POOL);
 
                     // Close window for listener notification.
@@ -198,6 +213,11 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<List<List<Cl
         // No more nodes left, complete future with null outside of synchronization.
         // Affinity should be calculated from scratch.
         if (complete)
-            onDone((List<List<ClusterNode>>)null);
+            onDone((GridDhtAffinityAssignmentResponse)null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtAssignmentFetchFuture.class, this);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index ee9525a..faa980e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -35,6 +35,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
@@ -427,31 +428,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     }
 
     /**
-     * Gets or creates entry for given key. If key belongs to local node, dht entry will be returned, otherwise
-     * if {@code allowDetached} is {@code true}, detached entry will be returned, otherwise exception will be
-     * thrown.
-     *
-     * @param key Key for which entry should be returned.
-     * @param allowDetached Whether to allow detached entries.
-     * @param touch {@code True} if entry should be passed to eviction policy.
-     * @return Cache entry.
-     * @throws GridDhtInvalidPartitionException if entry does not belong to this node and
-     *      {@code allowDetached} is {@code false}.
-     */
-    public GridCacheEntryEx entryExx(KeyCacheObject key, AffinityTopologyVersion topVer, boolean allowDetached, boolean touch) {
-        try {
-            return allowDetached && !ctx.affinity().localNode(key, topVer) ?
-                createEntry(key) : entryEx(key, touch);
-        }
-        catch (GridDhtInvalidPartitionException e) {
-            if (!allowDetached)
-                throw e;
-
-            return createEntry(key);
-        }
-    }
-
-    /**
      * @param key Key for which entry should be returned.
      * @return Cache entry.
      */
@@ -934,7 +910,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
                     Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new HashMap<>();
 
-                    AffinityTopologyVersion topVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion());
+                    AffinityTopologyVersion topVer = ctx.shared().exchange().readyAffinityVersion();
 
                     for (Map.Entry<KeyCacheObject, GridCacheVersion> e : entries.entrySet()) {
                         List<ClusterNode> nodes = ctx.affinity().nodes(e.getKey(), topVer);
@@ -984,7 +960,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                             ctx.io().send(req.getKey(), req.getValue(), ctx.ioPolicy());
                         }
                         catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to send TTL update request.", e);
+                            if (e instanceof ClusterTopologyCheckedException) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to send TTL update request, node left: " + req.getKey());
+                            }
+                            else
+                                U.error(log, "Failed to send TTL update request.", e);
                         }
                     }
                 }
@@ -1050,6 +1031,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                             if (log.isDebugEnabled())
                                 log.debug("Got removed entry: " + entry);
                         }
+                        catch (GridDhtInvalidPartitionException e) {
+                            if (log.isDebugEnabled())
+                                log.debug("Got GridDhtInvalidPartitionException: " + e);
+
+                            break;
+                        }
                     }
                 }
                 finally {
@@ -1153,7 +1140,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         /** {@inheritDoc} */
         @Override public int size() {
             GridDhtLocalPartition part = ctx.topology().localPartition(partId,
-                new AffinityTopologyVersion(ctx.discovery().topologyVersion()), false);
+                ctx.discovery().topologyVersionEx(), false);
 
             return part != null ? part.publicSize() : 0;
         }
@@ -1201,7 +1188,18 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer);
         Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer);
 
-        return !cacheNodes0.equals(cacheNodes1);
+        if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0)
+            return true;
+
+        try {
+            List<List<ClusterNode>> aff1 = ctx.affinity().assignments(expVer);
+            List<List<ClusterNode>> aff2 = ctx.affinity().assignments(curVer);
+
+            return !aff1.equals(aff2);
+        }
+        catch (IllegalStateException e) {
+            return true;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index fa753b0..fbfca82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -406,6 +406,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
             }
             else {
                 fut = tx.getAllAsync(cctx,
+                    null,
                     keys.keySet(),
                     /*deserialize binary*/false,
                     skipVals,
@@ -437,6 +438,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
                         }
                         else {
                             return tx.getAllAsync(cctx,
+                                null,
                                 keys.keySet(),
                                 /*deserialize binary*/false,
                                 skipVals,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index d9851c7..2de92b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -356,6 +356,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
             }
             else {
                 fut = tx.getAllAsync(cctx,
+                    null,
                     Collections.singleton(key),
                     /*deserialize binary*/false,
                     skipVals,
@@ -390,6 +391,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
                         }
                         else {
                             fut0 = tx.getAllAsync(cctx,
+                                null,
                                 Collections.singleton(key),
                                 /*deserialize binary*/false,
                                 skipVals,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index a33f01f..a3e94a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -303,13 +303,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
         return futId;
     }
 
-    /**
-     * @return Near lock version.
-     */
-    public GridCacheVersion nearLockVersion() {
-        return nearLockVer;
-    }
-
     /** {@inheritDoc} */
     @Nullable @Override public GridCacheVersion mappedVersion() {
         return tx == null ? nearLockVer : null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 84889f8..7fba45d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -87,9 +87,17 @@ public interface GridDhtPartitionTopology {
      * Pre-initializes this topology.
      *
      * @param exchFut Exchange future.
+     * @param affReady Affinity ready flag.
      * @throws IgniteCheckedException If failed.
      */
-    public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException;
+    public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean affReady)
+        throws IgniteCheckedException;
+
+    /**
+     * @param exchFut Exchange future.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    public void initPartitions(GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException;
 
     /**
      * Post-initializes this topology.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index b3786cd..f0ce6d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -267,13 +267,144 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
-        waitForRent();
+    @Override public void initPartitions(GridDhtPartitionsExchangeFuture exchFut) throws IgniteInterruptedCheckedException {
+        U.writeLock(lock);
+
+        try {
+            if (stopping)
+                return;
+
+            long updateSeq = this.updateSeq.incrementAndGet();
+            // The freshly incremented sequence is handed to initPartitions0() for all updates it makes.
+            initPartitions0(exchFut, updateSeq);
+
+            consistencyCheck();
+        }
+        finally {
+            lock.writeLock().unlock();
+        }
+    }
 
+    /**
+     * @param exchFut Exchange future.
+     * @param updateSeq Update sequence.
+     */
+    private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
         ClusterNode loc = cctx.localNode();
 
+        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+        assert oldest != null || cctx.kernalContext().clientNode();
+
+        GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
+
+        assert topVer.equals(exchFut.topologyVersion()) :
+            "Invalid topology [topVer=" + topVer +
+            ", cache=" + cctx.name() +
+            ", futVer=" + exchFut.topologyVersion() + ']';
+        assert cctx.affinity().affinityTopologyVersion().equals(exchFut.topologyVersion()) :
+            "Invalid affinity [topVer=" + cctx.affinity().affinityTopologyVersion() +
+            ", cache=" + cctx.name()+
+            ", futVer=" + exchFut.topologyVersion() + ']';
+
+        List<List<ClusterNode>> aff = cctx.affinity().assignments(exchFut.topologyVersion());
+
         int num = cctx.affinity().partitions();
 
+        if (cctx.rebalanceEnabled()) {
+            boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion());
+
+            boolean first = (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()) || added;
+
+            if (first) {
+                assert exchId.isJoined() || added;
+
+                for (int p = 0; p < num; p++) {
+                    if (localNode(p, aff)) {
+                        GridDhtLocalPartition locPart = createPartition(p);
+
+                        boolean owned = locPart.own();
+
+                        assert owned : "Failed to own partition for oldest node [cacheName=" + cctx.name() +
+                            ", part=" + locPart + ']';
+
+                        if (log.isDebugEnabled())
+                            log.debug("Owned partition for oldest node: " + locPart);
+
+                        updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                    }
+                }
+            }
+            else
+                createPartitions(aff, updateSeq);
+        }
+        else {
+            // If preloader is disabled, then we simply clear out
+            // the partitions this node is not responsible for.
+            for (int p = 0; p < num; p++) {
+                GridDhtLocalPartition locPart = localPartition(p, topVer, false, false);
+
+                boolean belongs = localNode(p, aff);
+
+                if (locPart != null) {
+                    if (!belongs) {
+                        GridDhtPartitionState state = locPart.state();
+
+                        if (state.active()) {
+                            locPart.rent(false);
+
+                            updateLocal(p, loc.id(), locPart.state(), updateSeq);
+
+                            if (log.isDebugEnabled())
+                                log.debug("Evicting partition with rebalancing disabled " +
+                                    "(it does not belong to affinity): " + locPart);
+                        }
+                    }
+                }
+                else if (belongs)
+                    createPartition(p);
+            }
+        }
+
+        if (node2part != null && node2part.valid())
+            checkEvictions(updateSeq, aff);
+
+        updateRebalanceVersion(aff);
+    }
+
+    /**
+     * Creates local partitions for which {@code localNode(p, aff)} holds.
+     *
+     * @param aff Affinity assignments.
+     * @param updateSeq Update sequence, used when the created partition state is recorded via {@code updateLocal}.
+     */
+    private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) {
+        ClusterNode locNode = cctx.localNode();
+
+        int parts = cctx.affinity().partitions();
+
+        for (int p = 0; p < parts; p++) {
+            boolean mapValid = node2part != null && node2part.valid();
+
+            if (!localNode(p, aff))
+                continue;
+
+            // Non-existing partitions will be created in MOVING state. If this node's map is empty,
+            // they are only pre-created, so local map will be sent correctly during exchange.
+            GridDhtLocalPartition locPart = createPartition(p);
+
+            if (mapValid)
+                updateLocal(p, locNode.id(), locPart.state(), updateSeq);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, boolean affReady)
+        throws IgniteCheckedException {
+        waitForRent();
+
+        ClusterNode loc = cctx.localNode();
+
         U.writeLock(lock);
 
         try {
@@ -291,7 +422,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             // In case if node joins, get topology at the time of joining node.
             ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
 
-            assert oldest != null;
+            assert oldest != null || cctx.kernalContext().clientNode();
 
             if (log.isDebugEnabled())
                 log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
@@ -301,7 +432,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             cntrMap.clear();
 
             // If this is the oldest node.
-            if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion())) {
+            if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) {
                 if (node2part == null) {
                     node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 
@@ -325,110 +456,14 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
-            if (cctx.rebalanceEnabled()) {
-                for (int p = 0; p < num; p++) {
-                    // If this is the first node in grid.
-                    boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion());
-
-                    if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId()) && exchId.isJoined()) || added) {
-                        assert exchId.isJoined() || added;
-
-                        try {
-                            GridDhtLocalPartition locPart = localPartition(p, topVer, true, false);
-
-                            assert locPart != null;
-
-                            boolean owned = locPart.own();
-
-                            assert owned : "Failed to own partition for oldest node [cacheName" + cctx.name() +
-                                ", part=" + locPart + ']';
-
-                            if (log.isDebugEnabled())
-                                log.debug("Owned partition for oldest node: " + locPart);
-
-                            updateLocal(p, loc.id(), locPart.state(), updateSeq);
-                        }
-                        catch (GridDhtInvalidPartitionException e) {
-                            if (log.isDebugEnabled())
-                                log.debug("Ignoring invalid partition on oldest node (no need to create a partition " +
-                                    "if it no longer belongs to local node: " + e.partition());
-                        }
-                    }
-                    // If this is not the first node in grid.
-                    else {
-                        if (node2part != null && node2part.valid()) {
-                            if (cctx.affinity().localNode(p, topVer)) {
-                                try {
-                                    // This will make sure that all non-existing partitions
-                                    // will be created in MOVING state.
-                                    GridDhtLocalPartition locPart = localPartition(p, topVer, true, false);
-
-                                    updateLocal(p, loc.id(), locPart.state(), updateSeq);
-                                }
-                                catch (GridDhtInvalidPartitionException e) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Ignoring invalid partition (no need to create a partition if it " +
-                                            "no longer belongs to local node: " + e.partition());
-                                }
-                            }
-                        }
-                        // If this node's map is empty, we pre-create local partitions,
-                        // so local map will be sent correctly during exchange.
-                        else if (cctx.affinity().localNode(p, topVer)) {
-                            try {
-                                localPartition(p, topVer, true, false);
-                            }
-                            catch (GridDhtInvalidPartitionException e) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Ignoring invalid partition (no need to pre-create a partition if it " +
-                                        "no longer belongs to local node: " + e.partition());
-                            }
-                        }
-                    }
-                }
-            }
+            if (affReady)
+                initPartitions0(exchFut, updateSeq);
             else {
-                // If preloader is disabled, then we simply clear out
-                // the partitions this node is not responsible for.
-                for (int p = 0; p < num; p++) {
-                    GridDhtLocalPartition locPart = localPartition(p, topVer, false, false);
-
-                    boolean belongs = cctx.affinity().localNode(p, topVer);
+                List<List<ClusterNode>> aff = cctx.affinity().idealAssignment();
 
-                    if (locPart != null) {
-                        if (!belongs) {
-                            GridDhtPartitionState state = locPart.state();
-
-                            if (state.active()) {
-                                locPart.rent(false);
-
-                                updateLocal(p, loc.id(), locPart.state(), updateSeq);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Evicting partition with rebalancing disabled " +
-                                        "(it does not belong to affinity): " + locPart);
-                            }
-                        }
-                    }
-                    else if (belongs) {
-                        try {
-                            // Pre-create partitions.
-                            localPartition(p, topVer, true, false);
-                        }
-                        catch (GridDhtInvalidPartitionException e) {
-                            if (log.isDebugEnabled())
-                                log.debug("Ignoring invalid partition with disabled rebalancer (no need to " +
-                                    "pre-create a partition if it no longer belongs to local node: " + e.partition());
-                        }
-                    }
-                }
+                createPartitions(aff, updateSeq);
             }
 
-            if (node2part != null && node2part.valid())
-                checkEvictions(updateSeq);
-
-            updateRebalanceVersion();
-
             consistencyCheck();
 
             if (log.isDebugEnabled())
@@ -453,6 +488,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
         AffinityTopologyVersion topVer = exchFut.topologyVersion();
 
+        assert cctx.affinity().affinityTopologyVersion().equals(topVer) : "Affinity is not initialized " +
+            "[topVer=" + topVer +
+            ", affVer=" + cctx.affinity().affinityTopologyVersion() +
+            ", fut=" + exchFut + ']';
+
         lock.writeLock().lock();
 
         try {
@@ -535,7 +575,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
-            updateRebalanceVersion();
+            updateRebalanceVersion(cctx.affinity().assignments(topVer));
 
             consistencyCheck();
         }
@@ -554,6 +594,30 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /**
      * @param p Partition number.
+     * @return Partition.
+     */
+    private GridDhtLocalPartition createPartition(int p) {
+        GridDhtLocalPartition loc = locParts.get(p);
+
+        if (loc != null && loc.state() == EVICTED) {
+            boolean rmv = locParts.remove(p, loc);
+
+            assert rmv;
+
+            loc = null;
+        }
+
+        if (loc == null) {
+            GridDhtLocalPartition old = locParts.putIfAbsent(p, loc = new GridDhtLocalPartition(cctx, p));
+
+            assert old == null : old;
+        }
+
+        return loc;
+    }
+
+    /**
+     * @param p Partition number.
      * @param topVer Topology version.
      * @param create Create flag.
      * @param updateSeq Update sequence.
@@ -714,6 +778,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         try {
             assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer1=" + topVer +
                 ", topVer2=" + this.topVer +
+                ", node=" + cctx.gridName() +
                 ", cache=" + cctx.name() +
                 ", node2part=" + node2part + ']';
 
@@ -959,9 +1024,17 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             part2node = p2n;
 
-            boolean changed = checkEvictions(updateSeq);
+            boolean changed = false;
+
+            AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
 
-            updateRebalanceVersion();
+            if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
+                List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+
+                changed = checkEvictions(updateSeq, aff);
+
+                updateRebalanceVersion(aff);
+            }
 
             consistencyCheck();
 
@@ -978,7 +1051,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap2 parts, @Nullable Map<Integer, Long> cntrMap) {
+        GridDhtPartitionMap2 parts,
+        @Nullable Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
             log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
 
@@ -1072,9 +1146,15 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
-            changed |= checkEvictions(updateSeq);
+            AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+
+            if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
+                List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
 
-            updateRebalanceVersion();
+                changed |= checkEvictions(updateSeq, aff);
+
+                updateRebalanceVersion(aff);
+            }
 
             consistencyCheck();
 
@@ -1090,9 +1170,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
     /**
      * @param updateSeq Update sequence.
+     * @param aff Affinity assignments.
      * @return Checks if any of the local partitions need to be evicted.
      */
-    private boolean checkEvictions(long updateSeq) {
+    private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) {
         boolean changed = false;
 
         UUID locId = cctx.nodeId();
@@ -1103,7 +1184,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (state.active()) {
                 int p = part.id();
 
-                List<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
+                List<ClusterNode> affNodes = aff.get(p);
 
                 if (!affNodes.contains(cctx.localNode())) {
                     Collection<UUID> nodeIds = F.nodeIds(nodes(p, topVer, OWNING));
@@ -1172,10 +1253,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         // In case if node joins, get topology at the time of joining node.
         ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
 
-        assert oldest != null;
+        assert oldest != null || cctx.kernalContext().clientNode();
 
         // If this node became the oldest node.
-        if (oldest.id().equals(cctx.nodeId())) {
+        if (cctx.localNode().equals(oldest)) {
             long seq = node2part.updateSequence();
 
             if (seq != updateSeq) {
@@ -1203,7 +1284,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             node2part.put(nodeId, map = new GridDhtPartitionMap2(nodeId, updateSeq, topVer,
                 Collections.<Integer, GridDhtPartitionState>emptyMap(), false));
 
-        map.updateSequence(updateSeq);
+        map.updateSequence(updateSeq, topVer);
 
         map.put(p, state);
 
@@ -1221,7 +1302,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     private void removeNode(UUID nodeId) {
         assert nodeId != null;
 
-        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+        ClusterNode oldest = CU.oldest(cctx.discovery().serverNodes(topVer));
 
         assert oldest != null;
 
@@ -1357,15 +1438,24 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /**
-     *
+     * @param part Partition.
+     * @param aff Affinity assignments.
+     * @return {@code True} if given partition belongs to local node.
+     */
+    private boolean localNode(int part, List<List<ClusterNode>> aff) {
+        return aff.get(part).contains(cctx.localNode());
+    }
+
+    /**
+     * @param aff Affinity assignments.
      */
-    private void updateRebalanceVersion() {
+    private void updateRebalanceVersion(List<List<ClusterNode>> aff) {
         if (!rebalancedTopVer.equals(topVer)) {
             if (node2part == null || !node2part.valid())
                 return;
 
             for (int i = 0; i < cctx.affinity().partitions(); i++) {
-                List<ClusterNode> affNodes = cctx.affinity().nodes(i, topVer);
+                List<ClusterNode> affNodes = aff.get(i);
 
                 // Topology doesn't contain server nodes (just clients).
                 if (affNodes.isEmpty())

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index ae24ed1..b6639f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -867,6 +867,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
 
                         tx = new GridDhtTxLocal(
                             ctx.shared(),
+                            req.topologyVersion(),
                             nearNode.id(),
                             req.version(),
                             req.futureId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 8c295ce..b9afbed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -126,7 +126,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 MiniFuture f = (MiniFuture)fut;
 
                 if (f.node().id().equals(nodeId)) {
-                    f.onResult(new ClusterTopologyCheckedException("Remote node left grid (will retry): " + nodeId));
+                    f.onNodeLeft(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId), true);
 
                     return true;
                 }
@@ -327,7 +327,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
             catch (IgniteCheckedException e) {
                 // Fail the whole thing.
                 if (e instanceof ClusterTopologyCheckedException)
-                    fut.onResult((ClusterTopologyCheckedException)e);
+                    fut.onNodeLeft((ClusterTopologyCheckedException)e);
                 else
                     fut.onResult(e);
             }
@@ -413,7 +413,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
             catch (IgniteCheckedException e) {
                 // Fail the whole thing.
                 if (e instanceof ClusterTopologyCheckedException)
-                    fut.onResult((ClusterTopologyCheckedException)e);
+                    fut.onNodeLeft((ClusterTopologyCheckedException)e);
                 else
                     fut.onResult(e);
             }
@@ -467,7 +467,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 catch (IgniteCheckedException e) {
                     // Fail the whole thing.
                     if (e instanceof ClusterTopologyCheckedException)
-                        fut.onResult((ClusterTopologyCheckedException)e);
+                        fut.onNodeLeft((ClusterTopologyCheckedException)e);
                     else
                         fut.onResult(e);
                 }
@@ -563,7 +563,15 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
         /**
          * @param e Node failure.
          */
-        void onResult(ClusterTopologyCheckedException e) {
+        void onNodeLeft(ClusterTopologyCheckedException e) {
+            onNodeLeft(e, false);
+        }
+
+        /**
+         * @param e Node failure.
+         * @param discoThread {@code True} if executed from discovery thread.
+         */
+        void onNodeLeft(ClusterTopologyCheckedException e, boolean discoThread) {
             if (log.isDebugEnabled())
                 log.debug("Remote node left grid while sending or waiting for reply (will ignore): " + this);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index ebf1002..acd5017 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -118,6 +118,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
      */
     public GridDhtTxLocal(
         GridCacheSharedContext cctx,
+        AffinityTopologyVersion topVer,
         UUID nearNodeId,
         GridCacheVersion nearXidVer,
         IgniteUuid nearFutId,
@@ -157,7 +158,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             subjId,
             taskNameHash);
 
-        assert cctx != null;
         assert nearNodeId != null;
         assert nearFutId != null;
         assert nearMiniId != null;
@@ -174,6 +174,10 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
         assert !F.eq(xidVer, nearXidVer);
 
         initResult();
+
+        assert topVer != null && topVer.topologyVersion() > 0 : topVer;
+
+        topologyVersion(topVer);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 534a560..34ba87b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -283,7 +283,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                     continue;
 
                 if (e.cached().obsolete()) {
-                    GridCacheEntryEx cached = cacheCtx.cache().entryEx(e.key());
+                    GridCacheEntryEx cached = cacheCtx.cache().entryEx(e.key(), topologyVersion());
 
                     e.cached(cached);
                 }
@@ -312,7 +312,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                         break;
                     }
                     catch (GridCacheEntryRemovedException ignore) {
-                        GridCacheEntryEx cached = cacheCtx.cache().entryEx(e.key());
+                        GridCacheEntryEx cached = cacheCtx.cache().entryEx(e.key(), topologyVersion());
 
                         e.cached(cached);
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 445c70a..0541c8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -549,7 +549,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
             GridDistributedCacheEntry entry = (GridDistributedCacheEntry)txEntry.cached();
 
             if (entry == null) {
-                entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key());
+                entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key(), tx.topologyVersion());
 
                 txEntry.cached(entry);
             }
@@ -576,7 +576,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                     if (log.isDebugEnabled())
                         log.debug("Got removed entry in future onAllReplies method (will retry): " + txEntry);
 
-                    entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key());
+                    entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key(), tx.topologyVersion());
 
                     txEntry.cached(entry);
                 }
@@ -817,7 +817,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                     }
                     catch (GridCacheEntryRemovedException ignored) {
                         // Retry.
-                        txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()));
+                        txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), tx.topologyVersion()));
                     }
                 }
             }
@@ -847,7 +847,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 }
                 catch (GridCacheEntryRemovedException ignored) {
                     // Retry.
-                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()));
+                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), tx.topologyVersion()));
                 }
             }
         }
@@ -1317,7 +1317,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 break;
             }
             catch (GridCacheEntryRemovedException ignore) {
-                cached = dht.entryExx(entry.key());
+                cached = dht.entryExx(entry.key(), tx.topologyVersion());
 
                 entry.cached(cached);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 343515d..f509e27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -140,6 +140,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
             Collections.<IgniteTxKey, IgniteTxEntry>emptyMap(),
             new ConcurrentLinkedHashMap<IgniteTxKey, IgniteTxEntry>(U.capacity(txSize), 0.75f, 1));
 
+        assert topVer != null && topVer.topologyVersion() > 0 : topVer;
+
         topologyVersion(topVer);
     }
 
@@ -207,6 +209,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
             Collections.<IgniteTxKey, IgniteTxEntry>emptyMap(),
             new ConcurrentLinkedHashMap<IgniteTxKey, IgniteTxEntry>(U.capacity(txSize), 0.75f, 1));
 
+        assert topVer != null && topVer.topologyVersion() > 0 : topVer;
+
         topologyVersion(topVer);
     }
 


Mime
View raw message