ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [24/31] incubator-ignite git commit: ignite-471-2: huge merge from sprint-6
Date Wed, 10 Jun 2015 16:27:47 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 78966d0..1d57ef7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -80,7 +80,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
     private IgniteUuid futId = IgniteUuid.randomUuid();
 
     /** Preloader. */
-    private GridDhtPreloader<K, V> preloader;
+    private GridDhtPreloader preloader;
 
     /** Trackable flag. */
     private boolean trackable;
@@ -95,7 +95,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
         GridCacheContext<K, V> cctx,
         AffinityTopologyVersion topVer,
         Collection<KeyCacheObject> keys,
-        GridDhtPreloader<K, V> preloader
+        GridDhtPreloader preloader
     ) {
         assert topVer.topologyVersion() != 0 : topVer;
         assert !F.isEmpty(keys) : keys;
@@ -208,21 +208,21 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
      * @return {@code True} if some mapping was added.
      */
     private boolean map(Iterable<KeyCacheObject> keys, Collection<ClusterNode> exc) {
-        Map<ClusterNode, Set<KeyCacheObject>> mappings = new HashMap<>();
-
-        ClusterNode loc = cctx.localNode();
-
-        int curTopVer = topCntr.get();
+        Map<ClusterNode, Set<KeyCacheObject>> mappings = null;
 
         for (KeyCacheObject key : keys)
-            map(key, mappings, exc);
+            mappings = map(key, mappings, exc);
 
         if (isDone())
             return false;
 
         boolean ret = false;
 
-        if (!mappings.isEmpty()) {
+        if (mappings != null) {
+            ClusterNode loc = cctx.localNode();
+
+            int curTopVer = topCntr.get();
+
             preloader.addFuture(this);
 
             trackable = true;
@@ -275,22 +275,27 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
      * @param key Key.
      * @param exc Exclude nodes.
      * @param mappings Mappings.
+     * @return Mappings.
      */
-    private void map(KeyCacheObject key, Map<ClusterNode, Set<KeyCacheObject>> mappings, Collection<ClusterNode> exc) {
+    private Map<ClusterNode, Set<KeyCacheObject>> map(KeyCacheObject key,
+        @Nullable Map<ClusterNode, Set<KeyCacheObject>> mappings,
+        Collection<ClusterNode> exc)
+    {
         ClusterNode loc = cctx.localNode();
 
-        int part = cctx.affinity().partition(key);
-
         GridCacheEntryEx e = cctx.dht().peekEx(key);
 
         try {
             if (e != null && !e.isNewLocked()) {
-                if (log.isDebugEnabled())
+                if (log.isDebugEnabled()) {
+                    int part = cctx.affinity().partition(key);
+
                     log.debug("Will not rebalance key (entry is not new) [cacheName=" + cctx.name() +
                         ", key=" + key + ", part=" + part + ", locId=" + cctx.nodeId() + ']');
+                }
 
                 // Key has been rebalanced or retrieved already.
-                return;
+                return mappings;
             }
         }
         catch (GridCacheEntryRemovedException ignore) {
@@ -299,6 +304,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                     ", locId=" + cctx.nodeId() + ']');
         }
 
+        int part = cctx.affinity().partition(key);
+
         List<ClusterNode> owners = F.isEmpty(exc) ? top.owners(part, topVer) :
             new ArrayList<>(F.view(top.owners(part, topVer), F.notIn(exc)));
 
@@ -308,7 +315,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                     "topVer=" + topVer + ", locId=" + cctx.nodeId() + ']');
 
             // Key is already rebalanced.
-            return;
+            return mappings;
         }
 
         // Create partition.
@@ -337,9 +344,12 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                     log.debug("Will not rebalance key (no nodes to request from with rebalancing disabled) [key=" +
                         key + ", part=" + part + ", locId=" + cctx.nodeId() + ']');
 
-                return;
+                return mappings;
             }
 
+            if (mappings == null)
+                mappings = U.newHashMap(keys.size());
+
             Collection<KeyCacheObject> mappedKeys = F.addIfAbsent(mappings, pick, F.<KeyCacheObject>newSet());
 
             assert mappedKeys != null;
@@ -357,6 +367,8 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                 log.debug("Will not rebalance key (local partition is not MOVING) [cacheName=" + cctx.name() +
                     ", key=" + key + ", part=" + locPart + ", locId=" + cctx.nodeId() + ']');
         }
+
+        return mappings;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 633f237..a6e6c4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -53,12 +53,12 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
  * and populating local cache.
  */
 @SuppressWarnings("NonConstantFieldWithUpperCaseName")
-public class GridDhtPartitionDemandPool<K, V> {
+public class GridDhtPartitionDemandPool {
     /** Dummy message to wake up a blocking queue if a node leaves. */
     private final SupplyMessage DUMMY_TOP = new SupplyMessage();
 
     /** */
-    private final GridCacheContext<K, V> cctx;
+    private final GridCacheContext<?, ?> cctx;
 
     /** */
     private final IgniteLogger log;
@@ -99,7 +99,7 @@ public class GridDhtPartitionDemandPool<K, V> {
      * @param cctx Cache context.
      * @param busyLock Shutdown lock.
      */
-    public GridDhtPartitionDemandPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) {
+    public GridDhtPartitionDemandPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
         assert cctx != null;
         assert busyLock != null;
 
@@ -108,9 +108,11 @@ public class GridDhtPartitionDemandPool<K, V> {
 
         log = cctx.logger(getClass());
 
-        poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
+        boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
 
-        if (poolSize > 0) {
+        poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0;
+
+        if (enabled) {
             barrier = new CyclicBarrier(poolSize);
 
             dmdWorkers = new ArrayList<>(poolSize);
@@ -327,7 +329,7 @@ public class GridDhtPartitionDemandPool<K, V> {
      * @param assigns Assignments.
      * @param force {@code True} if dummy reassign.
      */
-    void addAssignments(final GridDhtPreloaderAssignments<K, V> assigns, boolean force) {
+    void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) {
         if (log.isDebugEnabled())
             log.debug("Adding partition assignments: " + assigns);
 
@@ -399,7 +401,7 @@ public class GridDhtPartitionDemandPool<K, V> {
         private int id;
 
         /** Partition-to-node assignments. */
-        private final LinkedBlockingDeque<GridDhtPreloaderAssignments<K, V>> assignQ = new LinkedBlockingDeque<>();
+        private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
 
         /** Message queue. */
         private final LinkedBlockingDeque<SupplyMessage> msgQ =
@@ -425,7 +427,7 @@ public class GridDhtPartitionDemandPool<K, V> {
         /**
          * @param assigns Assignments.
          */
-        void addAssignments(GridDhtPreloaderAssignments<K, V> assigns) {
+        void addAssignments(GridDhtPreloaderAssignments assigns) {
             assert assigns != null;
 
             assignQ.offer(assigns);
@@ -885,7 +887,7 @@ public class GridDhtPartitionDemandPool<K, V> {
                     }
 
                     // Sync up all demand threads at this step.
-                    GridDhtPreloaderAssignments<K, V> assigns = null;
+                    GridDhtPreloaderAssignments assigns = null;
 
                     while (assigns == null)
                         assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this);
@@ -995,12 +997,12 @@ public class GridDhtPartitionDemandPool<K, V> {
      * @param exchFut Exchange future.
      * @return Assignments of partitions to nodes.
      */
-    GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) {
+    GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
         // No assignments for disabled preloader.
         GridDhtPartitionTopology top = cctx.dht().topology();
 
         if (!cctx.rebalanceEnabled())
-            return new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion());
+            return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
 
         int partCnt = cctx.affinity().partitions();
 
@@ -1009,7 +1011,7 @@ public class GridDhtPartitionDemandPool<K, V> {
             "Topology version mismatch [exchId=" + exchFut.exchangeId() +
                 ", topVer=" + top.topologyVersion() + ']';
 
-        GridDhtPreloaderAssignments<K, V> assigns = new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion());
+        GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
 
         AffinityTopologyVersion topVer = assigns.topologyVersion();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
index facf7e3..faa6cf6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
@@ -237,7 +237,7 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext
      * @return Full string representation.
      */
     public String toFullString() {
-        return S.toString(GridDhtPartitionMap.class, this, "size", size(), "map", super.toString());
+        return S.toString(GridDhtPartitionMap.class, this, "size", size(), "map", map.toString());
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
index 5d9677d..13cfef3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
@@ -43,9 +43,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 /**
  * Thread pool for supplying partitions to demanding nodes.
  */
-class GridDhtPartitionSupplyPool<K, V> {
+class GridDhtPartitionSupplyPool {
     /** */
-    private final GridCacheContext<K, V> cctx;
+    private final GridCacheContext<?, ?> cctx;
 
     /** */
     private final IgniteLogger log;
@@ -72,7 +72,7 @@ class GridDhtPartitionSupplyPool<K, V> {
      * @param cctx Cache context.
      * @param busyLock Shutdown lock.
      */
-    GridDhtPartitionSupplyPool(GridCacheContext<K, V> cctx, ReadWriteLock busyLock) {
+    GridDhtPartitionSupplyPool(GridCacheContext<?, ?> cctx, ReadWriteLock busyLock) {
         assert cctx != null;
         assert busyLock != null;
 
@@ -83,16 +83,18 @@ class GridDhtPartitionSupplyPool<K, V> {
 
         top = cctx.dht().topology();
 
-        int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
+        if (!cctx.kernalContext().clientNode()) {
+            int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0;
 
-        for (int i = 0; i < poolSize; i++)
-            workers.add(new SupplyWorker());
+            for (int i = 0; i < poolSize; i++)
+                workers.add(new SupplyWorker());
 
-        cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
-            @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
-                processDemandMessage(id, m);
-            }
-        });
+            cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2<UUID, GridDhtPartitionDemandMessage>() {
+                @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) {
+                    processDemandMessage(id, m);
+                }
+            });
+        }
 
         depEnabled = cctx.gridDeploy().enabled();
     }
@@ -248,11 +250,6 @@ class GridDhtPartitionSupplyPool<K, V> {
             boolean ack = false;
 
             try {
-                // Partition map exchange is finished which means that all near transactions with given
-                // topology version are committed. We can wait for local locks here as it will not take
-                // much time.
-                cctx.mvcc().finishLocks(d.topologyVersion()).get();
-
                 for (int part : d.partitions()) {
                     GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 4b8db00..9f18c98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -44,6 +44,8 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 import java.util.concurrent.locks.*;
 
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 
 /**
@@ -117,8 +119,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     private GridFutureAdapter<Boolean> initFut;
 
     /** Topology snapshot. */
-    private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot =
-        new AtomicReference<>();
+    private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>();
 
     /** Last committed cache version before next topology version use. */
     private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
@@ -146,8 +147,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     /** Dynamic cache change requests. */
     private Collection<DynamicCacheChangeRequest> reqs;
 
+    /** Cache validation results. */
     private volatile Map<Integer, Boolean> cacheValidRes;
 
+    /** Skip preload flag. */
+    private boolean skipPreload;
+
     /**
      * Dummy future created to trigger reassignments if partition
      * topology changed while preloading.
@@ -200,6 +205,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @param cctx Cache context.
      * @param busyLock Busy lock.
      * @param exchId Exchange ID.
+     * @param reqs Cache change requests.
      */
     public GridDhtPartitionsExchangeFuture(
         GridCacheSharedContext cctx,
@@ -221,16 +227,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         log = cctx.logger(getClass());
 
-        // Grab all nodes with order of equal or less than last joined node.
-        oldestNode.set(CU.oldest(cctx, exchId.topologyVersion()));
-
-        assert oldestNode.get() != null;
-
         initFut = new GridFutureAdapter<>();
 
         if (log.isDebugEnabled())
-            log.debug("Creating exchange future [localNode=" + cctx.localNodeId() +
-                ", fut=" + this + ']');
+            log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']');
+    }
+
+    /**
+     * @param reqs Cache change requests.
+     */
+    public void cacheChangeRequests(Collection<DynamicCacheChangeRequest> reqs) {
+        this.reqs = reqs;
     }
 
     /** {@inheritDoc} */
@@ -250,6 +257,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
+     * @return Skip preload flag.
+     */
+    public boolean skipPreload() {
+        return skipPreload;
+    }
+
+    /**
      * @return Dummy flag.
      */
     public boolean dummy() {
@@ -279,9 +293,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
     /**
      * @param cacheId Cache ID to check.
+     * @param topVer Topology version.
      * @return {@code True} if cache was added during this exchange.
      */
-    public boolean isCacheAdded(int cacheId) {
+    public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) {
         if (!F.isEmpty(reqs)) {
             for (DynamicCacheChangeRequest req : reqs) {
                 if (req.start() && !req.clientStartOnly()) {
@@ -291,7 +306,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             }
         }
 
-        return false;
+        GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+        return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer);
     }
 
     /**
@@ -312,7 +329,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * Rechecks topology.
+     * @param cacheCtx Cache context.
+     * @throws IgniteCheckedException If failed.
      */
     private void initTopology(GridCacheContext cacheCtx) throws IgniteCheckedException {
         if (stopping(cacheCtx.cacheId()))
@@ -330,8 +348,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     exchId + ']');
 
             // Fetch affinity assignment from remote node.
-            GridDhtAssignmentFetchFuture fetchFut =
-                new GridDhtAssignmentFetchFuture(cacheCtx, exchId.topologyVersion(), CU.affinityNodes(cacheCtx));
+            GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cacheCtx,
+                exchId.topologyVersion(),
+                CU.affinityNodes(cacheCtx, exchId.topologyVersion()));
 
             fetchFut.init();
 
@@ -341,11 +360,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 log.debug("Fetched affinity from remote node, initializing affinity assignment [locNodeId=" +
                     cctx.localNodeId() + ", topVer=" + exchId.topologyVersion() + ']');
 
+            if (affAssignment == null) {
+                affAssignment = new ArrayList<>(cacheCtx.affinity().partitions());
+
+                List<ClusterNode> empty = Collections.emptyList();
+
+                for (int i = 0; i < cacheCtx.affinity().partitions(); i++)
+                    affAssignment.add(empty);
+            }
+
             cacheCtx.affinity().initializeAffinity(exchId.topologyVersion(), affAssignment);
         }
     }
 
     /**
+     * @param cacheCtx Cache context.
      * @return {@code True} if local node can calculate affinity on it's own for this partition map exchange.
      */
     private boolean canCalculateAffinity(GridCacheContext cacheCtx) {
@@ -391,20 +420,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * @return Exchange id.
-     */
-    GridDhtPartitionExchangeId key() {
-        return exchId;
-    }
-
-    /**
-     * @return Oldest node.
-     */
-    ClusterNode oldestNode() {
-        return oldestNode.get();
-    }
-
-    /**
      * @return Exchange ID.
      */
     public GridDhtPartitionExchangeId exchangeId() {
@@ -412,13 +427,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * @return Init future.
-     */
-    IgniteInternalFuture<?> initFuture() {
-        return initFut;
-    }
-
-    /**
      * @return {@code true} if entered to busy state.
      */
     private boolean enterBusy() {
@@ -444,7 +452,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
     public void init() throws IgniteInterruptedCheckedException {
-        assert oldestNode.get() != null;
+        if (isDone())
+            return;
 
         if (init.compareAndSet(false, true)) {
             if (isDone())
@@ -455,10 +464,118 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 // will return corresponding nodes.
                 U.await(evtLatch);
 
+                assert discoEvt != null : this;
+                assert !dummy && !forcePreload : this;
+
+                ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
+
+                oldestNode.set(oldest);
+
                 startCaches();
 
+                // True if client node joined or failed.
+                boolean clientNodeEvt;
+
+                if (F.isEmpty(reqs)) {
+                    int type = discoEvt.type();
+
+                    assert type == EVT_NODE_JOINED || type == EVT_NODE_LEFT || type == EVT_NODE_FAILED : discoEvt;
+
+                    clientNodeEvt = CU.clientNode(discoEvt.eventNode());
+                }
+                else {
+                    assert discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt;
+
+                    boolean clientOnlyStart = true;
+
+                    for (DynamicCacheChangeRequest req : reqs) {
+                        if (!req.clientStartOnly()) {
+                            clientOnlyStart = false;
+
+                            break;
+                        }
+                    }
+
+                    clientNodeEvt = clientOnlyStart;
+                }
+
+                if (clientNodeEvt) {
+                    ClusterNode node = discoEvt.eventNode();
+
+                    // Client need to initialize affinity for local join event or for stated client caches.
+                    if (!node.isLocal()) {
+                        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                            if (cacheCtx.isLocal())
+                                continue;
+
+                            GridDhtPartitionTopology top = cacheCtx.topology();
+
+                            top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
+
+                            if (cacheCtx.affinity().affinityTopologyVersion() == AffinityTopologyVersion.NONE) {
+                                initTopology(cacheCtx);
+
+                                top.beforeExchange(this);
+                            }
+                            else
+                                cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion());
+                        }
+
+                        if (exchId.isLeft())
+                            cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
+
+                        onDone(exchId.topologyVersion());
+
+                        skipPreload = cctx.kernalContext().clientNode();
+
+                        return;
+                    }
+                }
+
+                if (cctx.kernalContext().clientNode()) {
+                    skipPreload = true;
+
+                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                        if (cacheCtx.isLocal())
+                            continue;
+
+                        GridDhtPartitionTopology top = cacheCtx.topology();
+
+                        top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
+                    }
+
+                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                        if (cacheCtx.isLocal())
+                            continue;
+
+                        initTopology(cacheCtx);
+                    }
+
+                    if (oldestNode.get() != null) {
+                        rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx,
+                            exchId.topologyVersion()));
+
+                        rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes)));
+
+                        ready.set(true);
+
+                        initFut.onDone(true);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Initialized future: " + this);
+
+                        sendPartitions();
+                    }
+                    else
+                        onDone(exchId.topologyVersion());
+
+                    return;
+                }
+
+                assert oldestNode.get() != null;
+
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    if (isCacheAdded(cacheCtx.cacheId())) {
+                    if (isCacheAdded(cacheCtx.cacheId(), exchId.topologyVersion())) {
                         if (cacheCtx.discovery().cacheAffinityNodes(cacheCtx.name(), topologyVersion()).isEmpty())
                             U.quietAndWarn(log, "No server nodes found for cache client: " + cacheCtx.namex());
                     }
@@ -468,8 +585,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                 List<String> cachesWithoutNodes = null;
 
-                for (String name : cctx.cache().cacheNames()) {
-                    if (exchId.isLeft()) {
+                if (exchId.isLeft()) {
+                    for (String name : cctx.cache().cacheNames()) {
                         if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) {
                             if (cachesWithoutNodes == null)
                                 cachesWithoutNodes = new ArrayList<>();
@@ -505,7 +622,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 }
 
                 if (cachesWithoutNodes != null) {
-                    StringBuilder sb = new StringBuilder("All server nodes for the following caches have left the cluster: ");
+                    StringBuilder sb =
+                        new StringBuilder("All server nodes for the following caches have left the cluster: ");
 
                     for (int i = 0; i < cachesWithoutNodes.size(); i++) {
                         String cache = cachesWithoutNodes.get(i);
@@ -537,7 +655,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 }
 
                 // Grab all alive remote nodes with order of equal or less than last joined node.
-                rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx,
+                rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx,
                     exchId.topologyVersion()));
 
                 rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes)));
@@ -591,6 +709,28 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 if (exchId.isLeft())
                     cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
 
+                IgniteInternalFuture<?> locksFut = cctx.mvcc().finishLocks(exchId.topologyVersion());
+
+                while (true) {
+                    try {
+                        locksFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
+
+                        break;
+                    }
+                    catch (IgniteFutureTimeoutCheckedException ignored) {
+                        U.warn(log, "Failed to wait for locks release future. " +
+                            "Dumping pending objects that might be the cause: " + cctx.localNodeId());
+
+                        U.warn(log, "Locked entries:");
+
+                        Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> locks =
+                            cctx.mvcc().unfinishedLocks(exchId.topologyVersion());
+
+                        for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet())
+                            U.warn(log, "Locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']');
+                    }
+                }
+
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                     if (cacheCtx.isLocal())
                         continue;
@@ -650,36 +790,25 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             if (log.isDebugEnabled())
                 log.debug("Initialized future: " + this);
 
-            if (canSkipExchange())
-                onDone(exchId.topologyVersion());
+            // If this node is not oldest.
+            if (!oldestNode.get().id().equals(cctx.localNodeId()))
+                sendPartitions();
             else {
-                // If this node is not oldest.
-                if (!oldestNode.get().id().equals(cctx.localNodeId()))
-                    sendPartitions();
-                else {
-                    boolean allReceived = allReceived();
+                boolean allReceived = allReceived();
 
-                    if (allReceived && replied.compareAndSet(false, true)) {
-                        if (spreadPartitions())
-                            onDone(exchId.topologyVersion());
-                    }
+                if (allReceived && replied.compareAndSet(false, true)) {
+                    if (spreadPartitions())
+                        onDone(exchId.topologyVersion());
                 }
-
-                scheduleRecheck();
             }
+
+            scheduleRecheck();
         }
         else
             assert false : "Skipped init future: " + this;
     }
 
     /**
-     * @return {@code True} if no distributed exchange is needed.
-     */
-    private boolean canSkipExchange() {
-        return false; // TODO ignite-23;
-    }
-
-    /**
      *
      */
     private void dumpPendingObjects() {
@@ -755,7 +884,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @throws IgniteCheckedException If failed.
      */
     private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException {
-        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.versions().last());
+        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
+            cctx.kernalContext().clientNode(),
+            cctx.versions().last());
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal())
@@ -780,8 +911,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             id.topologyVersion());
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal())
-                m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+            if (!cacheCtx.isLocal()) {
+                AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
+
+                boolean ready = startTopVer == null || startTopVer.compareTo(id.topologyVersion()) <= 0;
+
+                if (ready)
+                    m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+            }
         }
 
         // It is important that client topologies be added after contexts.
@@ -839,14 +976,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
     /** {@inheritDoc} */
     @Override public boolean onDone(AffinityTopologyVersion res, Throwable err) {
-        Map<Integer, Boolean> m = new HashMap<>();
+        Map<Integer, Boolean> m = null;
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name()))
+            if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name())) {
+                if (m == null)
+                    m = new HashMap<>();
+
                 m.put(cacheCtx.cacheId(), cacheCtx.config().getTopologyValidator().validate(discoEvt.topologyNodes()));
+            }
         }
 
-        cacheValidRes = m;
+        cacheValidRes = m != null ? m : Collections.<Integer, Boolean>emptyMap();
 
         cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err);
 
@@ -864,8 +1005,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             if (timeoutObj != null)
                 cctx.kernalContext().timeout().removeTimeoutObject(timeoutObj);
 
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (exchId.event() == EventType.EVT_NODE_FAILED || exchId.event() == EventType.EVT_NODE_LEFT)
+            if (exchId.isLeft()) {
+                for (GridCacheContext cacheCtx : cctx.cacheContexts())
                     cacheCtx.config().getAffinity().removeNode(exchId.nodeId());
             }
 
@@ -1018,39 +1159,39 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             return;
         }
 
-        ClusterNode curOldest = oldestNode.get();
+        if (log.isDebugEnabled())
+            log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']');
 
-        if (!nodeId.equals(curOldest.id())) {
-            if (log.isDebugEnabled())
-                log.debug("Received full partition map from unexpected node [oldest=" + curOldest.id() +
-                    ", unexpectedNodeId=" + nodeId + ']');
+        assert exchId.topologyVersion().equals(msg.topologyVersion());
 
-            ClusterNode sender = cctx.discovery().node(nodeId);
+        initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+            @Override public void apply(IgniteInternalFuture<Boolean> t) {
+                ClusterNode curOldest = oldestNode.get();
 
-            if (sender == null) {
-                if (log.isDebugEnabled())
-                    log.debug("Sender node left grid, will ignore message from unexpected node [nodeId=" + nodeId +
-                        ", exchId=" + msg.exchangeId() + ']');
+                if (!nodeId.equals(curOldest.id())) {
+                    if (log.isDebugEnabled())
+                        log.debug("Received full partition map from unexpected node [oldest=" + curOldest.id() +
+                            ", unexpectedNodeId=" + nodeId + ']');
 
-                return;
-            }
+                    ClusterNode snd = cctx.discovery().node(nodeId);
 
-            // Will process message later if sender node becomes oldest node.
-            if (sender.order() > curOldest.order())
-                fullMsgs.put(nodeId, msg);
+                    if (snd == null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Sender node left grid, will ignore message from unexpected node [nodeId=" + nodeId +
+                                ", exchId=" + msg.exchangeId() + ']');
 
-            return;
-        }
+                        return;
+                    }
 
-        assert msg.exchangeId().equals(exchId);
+                    // Will process message later if sender node becomes oldest node.
+                    if (snd.order() > curOldest.order())
+                        fullMsgs.put(nodeId, msg);
 
-        if (log.isDebugEnabled())
-            log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']');
+                    return;
+                }
 
-        assert exchId.topologyVersion().equals(msg.topologyVersion());
+                assert msg.exchangeId().equals(exchId);
 
-        initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-            @Override public void apply(IgniteInternalFuture<Boolean> t) {
                 assert msg.lastVersion() != null;
 
                 cctx.versions().onReceived(nodeId, msg.lastVersion());
@@ -1075,8 +1216,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             if (cacheCtx != null)
                 cacheCtx.topology().update(exchId, entry.getValue());
-            else if (CU.oldest(cctx).isLocal())
-                cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue());
+            else {
+                ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+
+                if (oldest != null && oldest.isLocal())
+                    cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue());
+            }
         }
     }
 
@@ -1135,40 +1280,42 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                             boolean set = false;
 
-                            ClusterNode newOldest = CU.oldest(cctx, exchId.topologyVersion());
-
-                            // If local node is now oldest.
-                            if (newOldest.id().equals(cctx.localNodeId())) {
-                                synchronized (mux) {
-                                    if (oldestNode.compareAndSet(oldest, newOldest)) {
-                                        // If local node is just joining.
-                                        if (exchId.nodeId().equals(cctx.localNodeId())) {
-                                            try {
-                                                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                                                    if (!cacheCtx.isLocal())
-                                                        cacheCtx.topology().beforeExchange(
-                                                            GridDhtPartitionsExchangeFuture.this);
+                            ClusterNode newOldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
+
+                            if (newOldest != null) {
+                                // If local node is now oldest.
+                                if (newOldest.id().equals(cctx.localNodeId())) {
+                                    synchronized (mux) {
+                                        if (oldestNode.compareAndSet(oldest, newOldest)) {
+                                            // If local node is just joining.
+                                            if (exchId.nodeId().equals(cctx.localNodeId())) {
+                                                try {
+                                                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                                                        if (!cacheCtx.isLocal())
+                                                            cacheCtx.topology().beforeExchange(
+                                                                GridDhtPartitionsExchangeFuture.this);
+                                                    }
                                                 }
-                                            }
-                                            catch (IgniteCheckedException e) {
-                                                onDone(e);
+                                                catch (IgniteCheckedException e) {
+                                                    onDone(e);
 
-                                                return;
+                                                    return;
+                                                }
                                             }
-                                        }
 
-                                        set = true;
+                                            set = true;
+                                        }
                                     }
                                 }
-                            }
-                            else {
-                                synchronized (mux) {
-                                    set = oldestNode.compareAndSet(oldest, newOldest);
-                                }
+                                else {
+                                    synchronized (mux) {
+                                        set = oldestNode.compareAndSet(oldest, newOldest);
+                                    }
 
-                                if (set && log.isDebugEnabled())
-                                    log.debug("Reassigned oldest node [this=" + cctx.localNodeId() +
-                                        ", old=" + oldest.id() + ", new=" + newOldest.id() + ']');
+                                    if (set && log.isDebugEnabled())
+                                        log.debug("Reassigned oldest node [this=" + cctx.localNodeId() +
+                                            ", old=" + oldest.id() + ", new=" + newOldest.id() + ']');
+                                }
                             }
 
                             if (set) {
@@ -1190,9 +1337,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                             assert rmtNodes != null;
 
-                            for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); )
+                            for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); ) {
                                 if (it.next().id().equals(nodeId))
                                     it.remove();
+                            }
 
                             if (allReceived() && ready.get() && replied.compareAndSet(false, true))
                                 if (spreadPartitions())
@@ -1254,30 +1402,34 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             GridTimeoutObject timeoutObj = new GridTimeoutObjectAdapter(
                 cctx.gridConfig().getNetworkTimeout() * Math.max(1, cctx.gridConfig().getCacheConfiguration().length)) {
                 @Override public void onTimeout() {
-                    if (isDone())
-                        return;
-
-                    if (!enterBusy())
-                        return;
+                    cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                        @Override public void run() {
+                            if (isDone())
+                                return;
+
+                            if (!enterBusy())
+                                return;
+
+                            try {
+                                U.warn(log,
+                                    "Retrying preload partition exchange due to timeout [done=" + isDone() +
+                                        ", dummy=" + dummy + ", exchId=" + exchId + ", rcvdIds=" + F.id8s(rcvdIds) +
+                                        ", rmtIds=" + F.id8s(rmtIds) + ", remaining=" + F.id8s(remaining()) +
+                                        ", init=" + init + ", initFut=" + initFut.isDone() +
+                                        ", ready=" + ready + ", replied=" + replied + ", added=" + added +
+                                        ", oldest=" + U.id8(oldestNode.get().id()) + ", oldestOrder=" +
+                                        oldestNode.get().order() + ", evtLatch=" + evtLatch.getCount() +
+                                        ", locNodeOrder=" + cctx.localNode().order() +
+                                        ", locNodeId=" + cctx.localNode().id() + ']',
+                                    "Retrying preload partition exchange due to timeout.");
 
-                    try {
-                        U.warn(log,
-                            "Retrying preload partition exchange due to timeout [done=" + isDone() +
-                                ", dummy=" + dummy + ", exchId=" + exchId + ", rcvdIds=" + F.id8s(rcvdIds) +
-                                ", rmtIds=" + F.id8s(rmtIds) + ", remaining=" + F.id8s(remaining()) +
-                                ", init=" + init + ", initFut=" + initFut.isDone() +
-                                ", ready=" + ready + ", replied=" + replied + ", added=" + added +
-                                ", oldest=" + U.id8(oldestNode.get().id()) + ", oldestOrder=" +
-                                oldestNode.get().order() + ", evtLatch=" + evtLatch.getCount() +
-                                ", locNodeOrder=" + cctx.localNode().order() +
-                                ", locNodeId=" + cctx.localNode().id() + ']',
-                            "Retrying preload partition exchange due to timeout.");
-
-                        recheck();
-                    }
-                    finally {
-                        leaveBusy();
-                    }
+                                recheck();
+                            }
+                            finally {
+                                leaveBusy();
+                            }
+                        }
+                    });
                 }
             };
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 8256274..73794ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -59,8 +59,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     /**
      * @param id Exchange ID.
      * @param lastVer Last version.
+     * @param topVer Topology version.
      */
-    public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, @Nullable GridCacheVersion lastVer,
+    public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id,
+        @Nullable GridCacheVersion lastVer,
         @NotNull AffinityTopologyVersion topVer) {
         super(id, lastVer);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 66140cd..713a80b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -45,6 +45,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     /** Serialized partitions. */
     private byte[] partsBytes;
 
+    /** */
+    private boolean client;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -54,10 +57,22 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /**
      * @param exchId Exchange ID.
+     * @param client Client message flag.
      * @param lastVer Last version.
      */
-    public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer) {
+    public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId,
+        boolean client,
+        @Nullable GridCacheVersion lastVer) {
         super(exchId, lastVer);
+
+        this.client = client;
+    }
+
+    /**
+     * @return {@code True} if sent from client node.
+     */
+    public boolean client() {
+        return client;
     }
 
     /**
@@ -110,6 +125,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
         switch (writer.state()) {
             case 5:
+                if (!writer.writeBoolean("client", client))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
@@ -132,6 +153,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
         switch (reader.state()) {
             case 5:
+                client = reader.readBoolean("client");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -151,7 +180,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 6;
+        return 7;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index d6373f0..51010ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
@@ -46,7 +47,7 @@ import static org.apache.ignite.internal.util.GridConcurrentFactory.*;
 /**
  * DHT cache preloader.
  */
-public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
+public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** Default preload resend timeout. */
     public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500;
 
@@ -57,13 +58,13 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
     private final GridAtomicLong topVer = new GridAtomicLong();
 
     /** Force key futures. */
-    private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<K, V>> forceKeyFuts = newMap();
+    private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap();
 
     /** Partition suppliers. */
-    private GridDhtPartitionSupplyPool<K, V> supplyPool;
+    private GridDhtPartitionSupplyPool supplyPool;
 
     /** Partition demanders. */
-    private GridDhtPartitionDemandPool<K, V> demandPool;
+    private GridDhtPartitionDemandPool demandPool;
 
     /** Start future. */
     private final GridFutureAdapter<Object> startFut;
@@ -92,7 +93,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
 
                 assert !loc.id().equals(n.id());
 
-                for (GridDhtForceKeysFuture<K, V> f : forceKeyFuts.values())
+                for (GridDhtForceKeysFuture<?, ?> f : forceKeyFuts.values())
                     f.onDiscoveryEvent(e);
 
                 assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " +
@@ -117,7 +118,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
     /**
      * @param cctx Cache context.
      */
-    public GridDhtPreloader(GridCacheContext<K, V> cctx) {
+    public GridDhtPreloader(GridCacheContext<?, ?> cctx) {
         super(cctx);
 
         top = cctx.dht().topology();
@@ -158,8 +159,8 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
                 }
             });
 
-        supplyPool = new GridDhtPartitionSupplyPool<>(cctx, busyLock);
-        demandPool = new GridDhtPartitionDemandPool<>(cctx, busyLock);
+        supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock);
+        demandPool = new GridDhtPartitionDemandPool(cctx, busyLock);
 
         cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
@@ -227,12 +228,14 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
 
             final long start = U.currentTimeMillis();
 
-            if (cctx.config().getRebalanceDelay() >= 0) {
-                U.log(log, "Starting rebalancing in " + cctx.config().getRebalanceMode() + " mode: " + cctx.name());
+            final CacheConfiguration cfg = cctx.config();
+
+            if (cfg.getRebalanceDelay() >= 0) {
+                U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
 
                 demandPool.syncFuture().listen(new CI1<Object>() {
                     @Override public void apply(Object t) {
-                        U.log(log, "Completed rebalancing in " + cctx.config().getRebalanceMode() + " mode " +
+                        U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " +
                             "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
                     }
                 });
@@ -253,12 +256,12 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) {
+    @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
         return demandPool.assign(exchFut);
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload) {
+    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
         demandPool.addAssignments(assignments, forcePreload);
     }
 
@@ -271,7 +274,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> syncFuture() {
-        return demandPool.syncFuture();
+        return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture();
     }
 
     /**
@@ -406,7 +409,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
             return;
 
         try {
-            GridDhtForceKeysFuture<K, V> f = forceKeyFuts.get(msg.futureId());
+            GridDhtForceKeysFuture<?, ?> f = forceKeyFuts.get(msg.futureId());
 
             if (f != null)
                 f.onResult(node.id(), msg);
@@ -491,7 +494,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
      */
     @SuppressWarnings( {"unchecked", "RedundantCast"})
     @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
-        final GridDhtForceKeysFuture<K, V> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
+        final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
 
         IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer);
 
@@ -543,7 +546,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
      *
      * @param fut Future to add.
      */
-    void addFuture(GridDhtForceKeysFuture<K, V> fut) {
+    void addFuture(GridDhtForceKeysFuture<?, ?> fut) {
         forceKeyFuts.put(fut.futureId(), fut);
     }
 
@@ -552,7 +555,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
      *
      * @param fut Future to remove.
      */
-    void remoteFuture(GridDhtForceKeysFuture<K, V> fut) {
+    void remoteFuture(GridDhtForceKeysFuture<?, ?> fut) {
         forceKeyFuts.remove(fut.futureId(), fut);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
index 369fc68..2f6ef6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
@@ -27,8 +27,7 @@ import java.util.concurrent.*;
 /**
  * Partition to node assignments.
  */
-public class GridDhtPreloaderAssignments<K, V> extends
-    ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage> {
+public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage> {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index ba3357d..041f83a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -433,6 +433,11 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException {
+        return dht.tryPutIfAbsent(key, val);
+    }
+
+    /** {@inheritDoc} */
     @Override public V getAndReplace(K key, V val) throws IgniteCheckedException {
         return dht.getAndReplace(key, val);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 8258b14..351d6cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -95,7 +95,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
     }
 
     /** {@inheritDoc} */
-    @Override public GridCachePreloader<K, V> preloader() {
+    @Override public GridCachePreloader preloader() {
         return dht().preloader();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index fc178e3..74438bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -274,7 +274,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
         if (affNodes.isEmpty()) {
             assert !cctx.affinityNode();
 
-            onDone(new ClusterTopologyCheckedException("Failed to map keys for near-only cache (all partition " +
+            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for near-only cache (all partition " +
                 "nodes left the grid)."));
 
             return;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 0ffb4e5..3d28018 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -45,7 +45,7 @@ import static org.apache.ignite.events.EventType.*;
 /**
  * Cache lock future.
  */
-public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean>
     implements GridCacheMvccFuture<Boolean> {
     /** */
     private static final long serialVersionUID = 0L;
@@ -58,7 +58,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
 
     /** Cache registry. */
     @GridToStringExclude
-    private GridCacheContext<K, V> cctx;
+    private GridCacheContext<?, ?> cctx;
 
     /** Lock owner thread. */
     @GridToStringInclude
@@ -135,7 +135,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
      * @param skipStore skipStore
      */
     public GridNearLockFuture(
-        GridCacheContext<K, V> cctx,
+        GridCacheContext<?, ?> cctx,
         Collection<KeyCacheObject> keys,
         @Nullable GridNearTxLocal tx,
         boolean read,
@@ -184,15 +184,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
      * @return Participating nodes.
      */
     @Override public Collection<? extends ClusterNode> nodes() {
-        return
-            F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
-                @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
-                    if (isMini(f))
-                        return ((MiniFuture)f).node();
+        return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
+            @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
+                if (isMini(f))
+                    return ((MiniFuture)f).node();
 
-                    return cctx.discovery().localNode();
-                }
-            });
+                return cctx.discovery().localNode();
+            }
+        });
     }
 
     /** {@inheritDoc} */
@@ -350,13 +349,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
      * Undoes all locks.
      *
      * @param dist If {@code true}, then remove locks from remote nodes as well.
+     * @param rollback {@code True} if should rollback tx.
      */
-    private void undoLocks(boolean dist) {
+    private void undoLocks(boolean dist, boolean rollback) {
         // Transactions will undo during rollback.
         if (dist && tx == null)
             cctx.nearTx().removeLocks(lockVer, keys);
         else {
-            if (tx != null) {
+            if (rollback && tx != null) {
                 if (tx.setRollbackOnly()) {
                     if (log.isDebugEnabled())
                         log.debug("Marked transaction as rollback only because locks could not be acquired: " + tx);
@@ -397,7 +397,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
      * @param dist {@code True} if need to distribute lock release.
      */
     private void onFailed(boolean dist) {
-        undoLocks(dist);
+        undoLocks(dist, true);
 
         complete(false);
     }
@@ -607,7 +607,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                 ", fut=" + this + ']');
 
         if (!success)
-            undoLocks(distribute);
+            undoLocks(distribute, true);
 
         if (tx != null)
             cctx.tm().txContext(tx);
@@ -682,7 +682,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
             // Continue mapping on the same topology version as it was before.
             this.topVer.compareAndSet(null, topVer);
 
-            map(keys);
+            map(keys, false);
 
             markInitialized();
 
@@ -690,14 +690,16 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
         }
 
         // Must get topology snapshot and map on that version.
-        mapOnTopology();
+        mapOnTopology(false);
     }
 
     /**
      * Acquires topology future and checks it completeness under the read lock. If it is not complete,
      * will asynchronously wait for it's completeness and then try again.
+     *
+     * @param remap Remap flag.
      */
-    void mapOnTopology() {
+    void mapOnTopology(final boolean remap) {
         // We must acquire topology snapshot from the topology version future.
         cctx.topology().readLock();
 
@@ -721,19 +723,27 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
 
                 AffinityTopologyVersion topVer = fut.topologyVersion();
 
-                if (tx != null)
-                    tx.topologyVersion(topVer);
+                if (remap) {
+                    if (tx != null)
+                        tx.onRemap(topVer);
 
-                this.topVer.compareAndSet(null, topVer);
+                    this.topVer.set(topVer);
+                }
+                else {
+                    if (tx != null)
+                        tx.topologyVersion(topVer);
+
+                    this.topVer.compareAndSet(null, topVer);
+                }
 
-                map(keys);
+                map(keys, remap);
 
                 markInitialized();
             }
             else {
                 fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                     @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                        mapOnTopology();
+                        mapOnTopology(remap);
                     }
                 });
             }
@@ -749,14 +759,15 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
      * groups belonging to one primary node and locks for these groups are acquired sequentially.
      *
      * @param keys Keys.
+     * @param remap Remap flag.
      */
-    private void map(Iterable<KeyCacheObject> keys) {
+    private void map(Iterable<KeyCacheObject> keys, boolean remap) {
         try {
             AffinityTopologyVersion topVer = this.topVer.get();
 
             assert topVer != null;
 
-            assert topVer.topologyVersion() > 0;
+            assert topVer.topologyVersion() > 0 : topVer;
 
             if (CU.affinityNodes(cctx, topVer).isEmpty()) {
                 onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for near-only cache (all " +
@@ -765,8 +776,11 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                 return;
             }
 
-            ConcurrentLinkedDeque8<GridNearLockMapping> mappings =
-                new ConcurrentLinkedDeque8<>();
+            boolean clientNode = cctx.kernalContext().clientNode();
+
+            assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
+
+            ConcurrentLinkedDeque8<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
 
             // Assign keys to primary nodes.
             GridNearLockMapping map = null;
@@ -795,6 +809,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
             if (log.isDebugEnabled())
                 log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
 
+            boolean first = true;
+
             // Create mini futures.
             for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
                 GridNearLockMapping mapping = iter.next();
@@ -872,6 +888,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
 
                                 if (!cand.reentry()) {
                                     if (req == null) {
+                                        boolean clientFirst = false;
+
+                                        if (first) {
+                                            clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks());
+
+                                            first = false;
+                                        }
+
                                         req = new GridNearLockRequest(
                                             cctx.cacheId(),
                                             topVer,
@@ -893,7 +917,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                                             inTx() ? tx.subjectId() : null,
                                             inTx() ? tx.taskNameHash() : 0,
                                             read ? accessTtl : -1L,
-                                            skipStore);
+                                            skipStore,
+                                            clientFirst);
 
                                         mapping.request(req);
                                     }
@@ -1197,7 +1222,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
     /**
      * @return DHT cache.
      */
-    private GridDhtTransactionalCacheAdapter<K, V> dht() {
+    private GridDhtTransactionalCacheAdapter<?, ?> dht() {
         return cctx.nearTx().dht();
     }
 
@@ -1356,110 +1381,146 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                     return;
                 }
 
-                int i = 0;
+                if (res.clientRemapVersion() != null) {
+                    assert cctx.kernalContext().clientNode();
 
-                AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
+                    IgniteInternalFuture<?> affFut =
+                        cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
 
-                for (KeyCacheObject k : keys) {
-                    while (true) {
-                        GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
+                    if (affFut != null && !affFut.isDone()) {
+                        affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                            @Override public void apply(IgniteInternalFuture<?> fut) {
+                                remap();
+                            }
+                        });
+                    }
+                    else
+                        remap();
+                }
+                else {
+                    int i = 0;
 
-                        try {
-                            if (res.dhtVersion(i) == null) {
-                                onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
-                                    "(will fail the lock): " + res));
+                    AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
 
-                                return;
-                            }
+                    for (KeyCacheObject k : keys) {
+                        while (true) {
+                            GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
 
-                            IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
+                            try {
+                                if (res.dhtVersion(i) == null) {
+                                    onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+                                        "(will fail the lock): " + res));
 
-                            CacheObject oldVal = entry.rawGet();
-                            boolean hasOldVal = false;
-                            CacheObject newVal = res.value(i);
+                                    return;
+                                }
 
-                            boolean readRecordable = false;
+                                IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
 
-                            if (retval) {
-                                readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+                                CacheObject oldVal = entry.rawGet();
+                                boolean hasOldVal = false;
+                                CacheObject newVal = res.value(i);
 
-                                if (readRecordable)
-                                    hasOldVal = entry.hasValue();
-                            }
+                                boolean readRecordable = false;
 
-                            GridCacheVersion dhtVer = res.dhtVersion(i);
-                            GridCacheVersion mappedVer = res.mappedVersion(i);
+                                if (retval) {
+                                    readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+
+                                    if (readRecordable)
+                                        hasOldVal = entry.hasValue();
+                                }
 
-                            if (newVal == null) {
-                                if (oldValTup != null) {
-                                    if (oldValTup.get1().equals(dhtVer))
-                                        newVal = oldValTup.get2();
+                                GridCacheVersion dhtVer = res.dhtVersion(i);
+                                GridCacheVersion mappedVer = res.mappedVersion(i);
 
-                                    oldVal = oldValTup.get2();
+                                if (newVal == null) {
+                                    if (oldValTup != null) {
+                                        if (oldValTup.get1().equals(dhtVer))
+                                            newVal = oldValTup.get2();
+
+                                        oldVal = oldValTup.get2();
+                                    }
                                 }
-                            }
 
-                            // Lock is held at this point, so we can set the
-                            // returned value if any.
-                            entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
+                                // Lock is held at this point, so we can set the
+                                // returned value if any.
+                                entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
 
-                            if (inTx() && implicitTx() && tx.onePhaseCommit()) {
-                                boolean pass = res.filterResult(i);
+                                if (inTx()) {
+                                    tx.hasRemoteLocks(true);
 
-                                tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
-                            }
+                                    if (implicitTx() && tx.onePhaseCommit()) {
+                                        boolean pass = res.filterResult(i);
 
-                            entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(),
-                                res.pending());
-
-                            if (retval) {
-                                if (readRecordable)
-                                    cctx.events().addEvent(
-                                        entry.partition(),
-                                        entry.key(),
-                                        tx,
-                                        null,
-                                        EVT_CACHE_OBJECT_READ,
-                                        newVal,
-                                        newVal != null,
-                                        oldVal,
-                                        hasOldVal,
-                                        CU.subjectId(tx, cctx.shared()),
-                                        null,
-                                        inTx() ? tx.resolveTaskName() : null);
-
-                                if (cctx.cache().configuration().isStatisticsEnabled())
-                                    cctx.cache().metrics0().onRead(false);
-                            }
+                                        tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
+                                    }
+                                }
 
-                            if (log.isDebugEnabled())
-                                log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+                                entry.readyNearLock(lockVer,
+                                    mappedVer,
+                                    res.committedVersions(),
+                                    res.rolledbackVersions(),
+                                    res.pending());
+
+                                if (retval) {
+                                    if (readRecordable)
+                                        cctx.events().addEvent(
+                                            entry.partition(),
+                                            entry.key(),
+                                            tx,
+                                            null,
+                                            EVT_CACHE_OBJECT_READ,
+                                            newVal,
+                                            newVal != null,
+                                            oldVal,
+                                            hasOldVal,
+                                            CU.subjectId(tx, cctx.shared()),
+                                            null,
+                                            inTx() ? tx.resolveTaskName() : null);
+
+                                    if (cctx.cache().configuration().isStatisticsEnabled())
+                                        cctx.cache().metrics0().onRead(false);
+                                }
 
-                            break; // Inner while loop.
-                        }
-                        catch (GridCacheEntryRemovedException ignored) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to add candidates because entry was removed (will renew).");
+                                if (log.isDebugEnabled())
+                                    log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
 
-                            // Replace old entry with new one.
-                            entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+                                break; // Inner while loop.
+                            }
+                            catch (GridCacheEntryRemovedException ignored) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to add candidates because entry was removed (will renew).");
+
+                                // Replace old entry with new one.
+                                entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+                            }
                         }
+
+                        i++;
                     }
 
-                    i++;
-                }
+                    try {
+                        proceedMapping(mappings);
+                    }
+                    catch (IgniteCheckedException e) {
+                        onDone(e);
+                    }
 
-                try {
-                    proceedMapping(mappings);
-                }
-                catch (IgniteCheckedException e) {
-                    onDone(e);
+                    onDone(true);
                 }
-
-                onDone(true);
             }
         }
 
+        /**
+         *
+         */
+        private void remap() {
+            undoLocks(false, false);
+
+            mapOnTopology(true);
+
+            onDone(true);
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString());


Mime
View raw message