ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [22/28] incubator-ignite git commit: ignite-545: merge from sprint-6
Date Wed, 10 Jun 2015 14:11:43 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 4b8db00..9f18c98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -44,6 +44,8 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 import java.util.concurrent.locks.*;
 
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 
 /**
@@ -117,8 +119,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     private GridFutureAdapter<Boolean> initFut;
 
     /** Topology snapshot. */
-    private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot =
-        new AtomicReference<>();
+    private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>();
 
     /** Last committed cache version before next topology version use. */
     private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
@@ -146,8 +147,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     /** Dynamic cache change requests. */
     private Collection<DynamicCacheChangeRequest> reqs;
 
+    /** Cache validation results. */
     private volatile Map<Integer, Boolean> cacheValidRes;
 
+    /** Skip preload flag. */
+    private boolean skipPreload;
+
     /**
      * Dummy future created to trigger reassignments if partition
      * topology changed while preloading.
@@ -200,6 +205,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @param cctx Cache context.
      * @param busyLock Busy lock.
      * @param exchId Exchange ID.
+     * @param reqs Cache change requests.
      */
     public GridDhtPartitionsExchangeFuture(
         GridCacheSharedContext cctx,
@@ -221,16 +227,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         log = cctx.logger(getClass());
 
-        // Grab all nodes with order of equal or less than last joined node.
-        oldestNode.set(CU.oldest(cctx, exchId.topologyVersion()));
-
-        assert oldestNode.get() != null;
-
         initFut = new GridFutureAdapter<>();
 
         if (log.isDebugEnabled())
-            log.debug("Creating exchange future [localNode=" + cctx.localNodeId() +
-                ", fut=" + this + ']');
+            log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']');
+    }
+
+    /**
+     * @param reqs Cache change requests.
+     */
+    public void cacheChangeRequests(Collection<DynamicCacheChangeRequest> reqs) {
+        this.reqs = reqs;
     }
 
     /** {@inheritDoc} */
@@ -250,6 +257,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
+     * @return Skip preload flag.
+     */
+    public boolean skipPreload() {
+        return skipPreload;
+    }
+
+    /**
      * @return Dummy flag.
      */
     public boolean dummy() {
@@ -279,9 +293,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
     /**
      * @param cacheId Cache ID to check.
+     * @param topVer Topology version.
      * @return {@code True} if cache was added during this exchange.
      */
-    public boolean isCacheAdded(int cacheId) {
+    public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) {
         if (!F.isEmpty(reqs)) {
             for (DynamicCacheChangeRequest req : reqs) {
                 if (req.start() && !req.clientStartOnly()) {
@@ -291,7 +306,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             }
         }
 
-        return false;
+        GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+        return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer);
     }
 
     /**
@@ -312,7 +329,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * Rechecks topology.
+     * @param cacheCtx Cache context.
+     * @throws IgniteCheckedException If failed.
      */
     private void initTopology(GridCacheContext cacheCtx) throws IgniteCheckedException {
         if (stopping(cacheCtx.cacheId()))
@@ -330,8 +348,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     exchId + ']');
 
             // Fetch affinity assignment from remote node.
-            GridDhtAssignmentFetchFuture fetchFut =
-                new GridDhtAssignmentFetchFuture(cacheCtx, exchId.topologyVersion(), CU.affinityNodes(cacheCtx));
+            GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cacheCtx,
+                exchId.topologyVersion(),
+                CU.affinityNodes(cacheCtx, exchId.topologyVersion()));
 
             fetchFut.init();
 
@@ -341,11 +360,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 log.debug("Fetched affinity from remote node, initializing affinity assignment [locNodeId=" +
                     cctx.localNodeId() + ", topVer=" + exchId.topologyVersion() + ']');
 
+            if (affAssignment == null) {
+                affAssignment = new ArrayList<>(cacheCtx.affinity().partitions());
+
+                List<ClusterNode> empty = Collections.emptyList();
+
+                for (int i = 0; i < cacheCtx.affinity().partitions(); i++)
+                    affAssignment.add(empty);
+            }
+
             cacheCtx.affinity().initializeAffinity(exchId.topologyVersion(), affAssignment);
         }
     }
 
     /**
+     * @param cacheCtx Cache context.
      * @return {@code True} if local node can calculate affinity on it's own for this partition map exchange.
      */
     private boolean canCalculateAffinity(GridCacheContext cacheCtx) {
@@ -391,20 +420,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * @return Exchange id.
-     */
-    GridDhtPartitionExchangeId key() {
-        return exchId;
-    }
-
-    /**
-     * @return Oldest node.
-     */
-    ClusterNode oldestNode() {
-        return oldestNode.get();
-    }
-
-    /**
      * @return Exchange ID.
      */
     public GridDhtPartitionExchangeId exchangeId() {
@@ -412,13 +427,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * @return Init future.
-     */
-    IgniteInternalFuture<?> initFuture() {
-        return initFut;
-    }
-
-    /**
      * @return {@code true} if entered to busy state.
      */
     private boolean enterBusy() {
@@ -444,7 +452,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
     public void init() throws IgniteInterruptedCheckedException {
-        assert oldestNode.get() != null;
+        if (isDone())
+            return;
 
         if (init.compareAndSet(false, true)) {
             if (isDone())
@@ -455,10 +464,118 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 // will return corresponding nodes.
                 U.await(evtLatch);
 
+                assert discoEvt != null : this;
+                assert !dummy && !forcePreload : this;
+
+                ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
+
+                oldestNode.set(oldest);
+
                 startCaches();
 
+                // True if client node joined or failed.
+                boolean clientNodeEvt;
+
+                if (F.isEmpty(reqs)) {
+                    int type = discoEvt.type();
+
+                    assert type == EVT_NODE_JOINED || type == EVT_NODE_LEFT || type == EVT_NODE_FAILED : discoEvt;
+
+                    clientNodeEvt = CU.clientNode(discoEvt.eventNode());
+                }
+                else {
+                    assert discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt;
+
+                    boolean clientOnlyStart = true;
+
+                    for (DynamicCacheChangeRequest req : reqs) {
+                        if (!req.clientStartOnly()) {
+                            clientOnlyStart = false;
+
+                            break;
+                        }
+                    }
+
+                    clientNodeEvt = clientOnlyStart;
+                }
+
+                if (clientNodeEvt) {
+                    ClusterNode node = discoEvt.eventNode();
+
+                    // Client need to initialize affinity for local join event or for stated client caches.
+                    if (!node.isLocal()) {
+                        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                            if (cacheCtx.isLocal())
+                                continue;
+
+                            GridDhtPartitionTopology top = cacheCtx.topology();
+
+                            top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
+
+                            if (cacheCtx.affinity().affinityTopologyVersion() == AffinityTopologyVersion.NONE) {
+                                initTopology(cacheCtx);
+
+                                top.beforeExchange(this);
+                            }
+                            else
+                                cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion());
+                        }
+
+                        if (exchId.isLeft())
+                            cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
+
+                        onDone(exchId.topologyVersion());
+
+                        skipPreload = cctx.kernalContext().clientNode();
+
+                        return;
+                    }
+                }
+
+                if (cctx.kernalContext().clientNode()) {
+                    skipPreload = true;
+
+                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                        if (cacheCtx.isLocal())
+                            continue;
+
+                        GridDhtPartitionTopology top = cacheCtx.topology();
+
+                        top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId()));
+                    }
+
+                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                        if (cacheCtx.isLocal())
+                            continue;
+
+                        initTopology(cacheCtx);
+                    }
+
+                    if (oldestNode.get() != null) {
+                        rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx,
+                            exchId.topologyVersion()));
+
+                        rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes)));
+
+                        ready.set(true);
+
+                        initFut.onDone(true);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Initialized future: " + this);
+
+                        sendPartitions();
+                    }
+                    else
+                        onDone(exchId.topologyVersion());
+
+                    return;
+                }
+
+                assert oldestNode.get() != null;
+
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    if (isCacheAdded(cacheCtx.cacheId())) {
+                    if (isCacheAdded(cacheCtx.cacheId(), exchId.topologyVersion())) {
                         if (cacheCtx.discovery().cacheAffinityNodes(cacheCtx.name(), topologyVersion()).isEmpty())
                             U.quietAndWarn(log, "No server nodes found for cache client: " + cacheCtx.namex());
                     }
@@ -468,8 +585,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                 List<String> cachesWithoutNodes = null;
 
-                for (String name : cctx.cache().cacheNames()) {
-                    if (exchId.isLeft()) {
+                if (exchId.isLeft()) {
+                    for (String name : cctx.cache().cacheNames()) {
                         if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) {
                             if (cachesWithoutNodes == null)
                                 cachesWithoutNodes = new ArrayList<>();
@@ -505,7 +622,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 }
 
                 if (cachesWithoutNodes != null) {
-                    StringBuilder sb = new StringBuilder("All server nodes for the following caches have left the cluster: ");
+                    StringBuilder sb =
+                        new StringBuilder("All server nodes for the following caches have left the cluster: ");
 
                     for (int i = 0; i < cachesWithoutNodes.size(); i++) {
                         String cache = cachesWithoutNodes.get(i);
@@ -537,7 +655,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 }
 
                 // Grab all alive remote nodes with order of equal or less than last joined node.
-                rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteCacheNodes(cctx,
+                rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx,
                     exchId.topologyVersion()));
 
                 rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes)));
@@ -591,6 +709,28 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 if (exchId.isLeft())
                     cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
 
+                IgniteInternalFuture<?> locksFut = cctx.mvcc().finishLocks(exchId.topologyVersion());
+
+                while (true) {
+                    try {
+                        locksFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
+
+                        break;
+                    }
+                    catch (IgniteFutureTimeoutCheckedException ignored) {
+                        U.warn(log, "Failed to wait for locks release future. " +
+                            "Dumping pending objects that might be the cause: " + cctx.localNodeId());
+
+                        U.warn(log, "Locked entries:");
+
+                        Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> locks =
+                            cctx.mvcc().unfinishedLocks(exchId.topologyVersion());
+
+                        for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet())
+                            U.warn(log, "Locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']');
+                    }
+                }
+
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                     if (cacheCtx.isLocal())
                         continue;
@@ -650,36 +790,25 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             if (log.isDebugEnabled())
                 log.debug("Initialized future: " + this);
 
-            if (canSkipExchange())
-                onDone(exchId.topologyVersion());
+            // If this node is not oldest.
+            if (!oldestNode.get().id().equals(cctx.localNodeId()))
+                sendPartitions();
             else {
-                // If this node is not oldest.
-                if (!oldestNode.get().id().equals(cctx.localNodeId()))
-                    sendPartitions();
-                else {
-                    boolean allReceived = allReceived();
+                boolean allReceived = allReceived();
 
-                    if (allReceived && replied.compareAndSet(false, true)) {
-                        if (spreadPartitions())
-                            onDone(exchId.topologyVersion());
-                    }
+                if (allReceived && replied.compareAndSet(false, true)) {
+                    if (spreadPartitions())
+                        onDone(exchId.topologyVersion());
                 }
-
-                scheduleRecheck();
             }
+
+            scheduleRecheck();
         }
         else
             assert false : "Skipped init future: " + this;
     }
 
     /**
-     * @return {@code True} if no distributed exchange is needed.
-     */
-    private boolean canSkipExchange() {
-        return false; // TODO ignite-23;
-    }
-
-    /**
      *
      */
     private void dumpPendingObjects() {
@@ -755,7 +884,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @throws IgniteCheckedException If failed.
      */
     private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException {
-        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.versions().last());
+        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
+            cctx.kernalContext().clientNode(),
+            cctx.versions().last());
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal())
@@ -780,8 +911,14 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             id.topologyVersion());
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal())
-                m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+            if (!cacheCtx.isLocal()) {
+                AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion();
+
+                boolean ready = startTopVer == null || startTopVer.compareTo(id.topologyVersion()) <= 0;
+
+                if (ready)
+                    m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+            }
         }
 
         // It is important that client topologies be added after contexts.
@@ -839,14 +976,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
     /** {@inheritDoc} */
     @Override public boolean onDone(AffinityTopologyVersion res, Throwable err) {
-        Map<Integer, Boolean> m = new HashMap<>();
+        Map<Integer, Boolean> m = null;
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name()))
+            if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name())) {
+                if (m == null)
+                    m = new HashMap<>();
+
                 m.put(cacheCtx.cacheId(), cacheCtx.config().getTopologyValidator().validate(discoEvt.topologyNodes()));
+            }
         }
 
-        cacheValidRes = m;
+        cacheValidRes = m != null ? m : Collections.<Integer, Boolean>emptyMap();
 
         cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err);
 
@@ -864,8 +1005,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             if (timeoutObj != null)
                 cctx.kernalContext().timeout().removeTimeoutObject(timeoutObj);
 
-            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                if (exchId.event() == EventType.EVT_NODE_FAILED || exchId.event() == EventType.EVT_NODE_LEFT)
+            if (exchId.isLeft()) {
+                for (GridCacheContext cacheCtx : cctx.cacheContexts())
                     cacheCtx.config().getAffinity().removeNode(exchId.nodeId());
             }
 
@@ -1018,39 +1159,39 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             return;
         }
 
-        ClusterNode curOldest = oldestNode.get();
+        if (log.isDebugEnabled())
+            log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']');
 
-        if (!nodeId.equals(curOldest.id())) {
-            if (log.isDebugEnabled())
-                log.debug("Received full partition map from unexpected node [oldest=" + curOldest.id() +
-                    ", unexpectedNodeId=" + nodeId + ']');
+        assert exchId.topologyVersion().equals(msg.topologyVersion());
 
-            ClusterNode sender = cctx.discovery().node(nodeId);
+        initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+            @Override public void apply(IgniteInternalFuture<Boolean> t) {
+                ClusterNode curOldest = oldestNode.get();
 
-            if (sender == null) {
-                if (log.isDebugEnabled())
-                    log.debug("Sender node left grid, will ignore message from unexpected node [nodeId=" + nodeId +
-                        ", exchId=" + msg.exchangeId() + ']');
+                if (!nodeId.equals(curOldest.id())) {
+                    if (log.isDebugEnabled())
+                        log.debug("Received full partition map from unexpected node [oldest=" + curOldest.id() +
+                            ", unexpectedNodeId=" + nodeId + ']');
 
-                return;
-            }
+                    ClusterNode snd = cctx.discovery().node(nodeId);
 
-            // Will process message later if sender node becomes oldest node.
-            if (sender.order() > curOldest.order())
-                fullMsgs.put(nodeId, msg);
+                    if (snd == null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Sender node left grid, will ignore message from unexpected node [nodeId=" + nodeId +
+                                ", exchId=" + msg.exchangeId() + ']');
 
-            return;
-        }
+                        return;
+                    }
 
-        assert msg.exchangeId().equals(exchId);
+                    // Will process message later if sender node becomes oldest node.
+                    if (snd.order() > curOldest.order())
+                        fullMsgs.put(nodeId, msg);
 
-        if (log.isDebugEnabled())
-            log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']');
+                    return;
+                }
 
-        assert exchId.topologyVersion().equals(msg.topologyVersion());
+                assert msg.exchangeId().equals(exchId);
 
-        initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
-            @Override public void apply(IgniteInternalFuture<Boolean> t) {
                 assert msg.lastVersion() != null;
 
                 cctx.versions().onReceived(nodeId, msg.lastVersion());
@@ -1075,8 +1216,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
             if (cacheCtx != null)
                 cacheCtx.topology().update(exchId, entry.getValue());
-            else if (CU.oldest(cctx).isLocal())
-                cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue());
+            else {
+                ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+
+                if (oldest != null && oldest.isLocal())
+                    cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue());
+            }
         }
     }
 
@@ -1135,40 +1280,42 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                             boolean set = false;
 
-                            ClusterNode newOldest = CU.oldest(cctx, exchId.topologyVersion());
-
-                            // If local node is now oldest.
-                            if (newOldest.id().equals(cctx.localNodeId())) {
-                                synchronized (mux) {
-                                    if (oldestNode.compareAndSet(oldest, newOldest)) {
-                                        // If local node is just joining.
-                                        if (exchId.nodeId().equals(cctx.localNodeId())) {
-                                            try {
-                                                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                                                    if (!cacheCtx.isLocal())
-                                                        cacheCtx.topology().beforeExchange(
-                                                            GridDhtPartitionsExchangeFuture.this);
+                            ClusterNode newOldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion());
+
+                            if (newOldest != null) {
+                                // If local node is now oldest.
+                                if (newOldest.id().equals(cctx.localNodeId())) {
+                                    synchronized (mux) {
+                                        if (oldestNode.compareAndSet(oldest, newOldest)) {
+                                            // If local node is just joining.
+                                            if (exchId.nodeId().equals(cctx.localNodeId())) {
+                                                try {
+                                                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                                                        if (!cacheCtx.isLocal())
+                                                            cacheCtx.topology().beforeExchange(
+                                                                GridDhtPartitionsExchangeFuture.this);
+                                                    }
                                                 }
-                                            }
-                                            catch (IgniteCheckedException e) {
-                                                onDone(e);
+                                                catch (IgniteCheckedException e) {
+                                                    onDone(e);
 
-                                                return;
+                                                    return;
+                                                }
                                             }
-                                        }
 
-                                        set = true;
+                                            set = true;
+                                        }
                                     }
                                 }
-                            }
-                            else {
-                                synchronized (mux) {
-                                    set = oldestNode.compareAndSet(oldest, newOldest);
-                                }
+                                else {
+                                    synchronized (mux) {
+                                        set = oldestNode.compareAndSet(oldest, newOldest);
+                                    }
 
-                                if (set && log.isDebugEnabled())
-                                    log.debug("Reassigned oldest node [this=" + cctx.localNodeId() +
-                                        ", old=" + oldest.id() + ", new=" + newOldest.id() + ']');
+                                    if (set && log.isDebugEnabled())
+                                        log.debug("Reassigned oldest node [this=" + cctx.localNodeId() +
+                                            ", old=" + oldest.id() + ", new=" + newOldest.id() + ']');
+                                }
                             }
 
                             if (set) {
@@ -1190,9 +1337,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                             assert rmtNodes != null;
 
-                            for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); )
+                            for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); ) {
                                 if (it.next().id().equals(nodeId))
                                     it.remove();
+                            }
 
                             if (allReceived() && ready.get() && replied.compareAndSet(false, true))
                                 if (spreadPartitions())
@@ -1254,30 +1402,34 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             GridTimeoutObject timeoutObj = new GridTimeoutObjectAdapter(
                 cctx.gridConfig().getNetworkTimeout() * Math.max(1, cctx.gridConfig().getCacheConfiguration().length)) {
                 @Override public void onTimeout() {
-                    if (isDone())
-                        return;
-
-                    if (!enterBusy())
-                        return;
+                    cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                        @Override public void run() {
+                            if (isDone())
+                                return;
+
+                            if (!enterBusy())
+                                return;
+
+                            try {
+                                U.warn(log,
+                                    "Retrying preload partition exchange due to timeout [done=" + isDone() +
+                                        ", dummy=" + dummy + ", exchId=" + exchId + ", rcvdIds=" + F.id8s(rcvdIds) +
+                                        ", rmtIds=" + F.id8s(rmtIds) + ", remaining=" + F.id8s(remaining()) +
+                                        ", init=" + init + ", initFut=" + initFut.isDone() +
+                                        ", ready=" + ready + ", replied=" + replied + ", added=" + added +
+                                        ", oldest=" + U.id8(oldestNode.get().id()) + ", oldestOrder=" +
+                                        oldestNode.get().order() + ", evtLatch=" + evtLatch.getCount() +
+                                        ", locNodeOrder=" + cctx.localNode().order() +
+                                        ", locNodeId=" + cctx.localNode().id() + ']',
+                                    "Retrying preload partition exchange due to timeout.");
 
-                    try {
-                        U.warn(log,
-                            "Retrying preload partition exchange due to timeout [done=" + isDone() +
-                                ", dummy=" + dummy + ", exchId=" + exchId + ", rcvdIds=" + F.id8s(rcvdIds) +
-                                ", rmtIds=" + F.id8s(rmtIds) + ", remaining=" + F.id8s(remaining()) +
-                                ", init=" + init + ", initFut=" + initFut.isDone() +
-                                ", ready=" + ready + ", replied=" + replied + ", added=" + added +
-                                ", oldest=" + U.id8(oldestNode.get().id()) + ", oldestOrder=" +
-                                oldestNode.get().order() + ", evtLatch=" + evtLatch.getCount() +
-                                ", locNodeOrder=" + cctx.localNode().order() +
-                                ", locNodeId=" + cctx.localNode().id() + ']',
-                            "Retrying preload partition exchange due to timeout.");
-
-                        recheck();
-                    }
-                    finally {
-                        leaveBusy();
-                    }
+                                recheck();
+                            }
+                            finally {
+                                leaveBusy();
+                            }
+                        }
+                    });
                 }
             };
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 8256274..73794ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -59,8 +59,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     /**
      * @param id Exchange ID.
      * @param lastVer Last version.
+     * @param topVer Topology version.
      */
-    public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id, @Nullable GridCacheVersion lastVer,
+    public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id,
+        @Nullable GridCacheVersion lastVer,
         @NotNull AffinityTopologyVersion topVer) {
         super(id, lastVer);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 66140cd..713a80b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -45,6 +45,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     /** Serialized partitions. */
     private byte[] partsBytes;
 
+    /** */
+    private boolean client;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -54,10 +57,22 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /**
      * @param exchId Exchange ID.
+     * @param client Client message flag.
      * @param lastVer Last version.
      */
-    public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId, @Nullable GridCacheVersion lastVer) {
+    public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId,
+        boolean client,
+        @Nullable GridCacheVersion lastVer) {
         super(exchId, lastVer);
+
+        this.client = client;
+    }
+
+    /**
+     * @return {@code True} if sent from client node.
+     */
+    public boolean client() {
+        return client;
     }
 
     /**
@@ -110,6 +125,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
         switch (writer.state()) {
             case 5:
+                if (!writer.writeBoolean("client", client))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
@@ -132,6 +153,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
         switch (reader.state()) {
             case 5:
+                client = reader.readBoolean("client");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -151,7 +180,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 6;
+        return 7;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index d6373f0..51010ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
@@ -46,7 +47,7 @@ import static org.apache.ignite.internal.util.GridConcurrentFactory.*;
 /**
  * DHT cache preloader.
  */
-public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
+public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** Default preload resend timeout. */
     public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500;
 
@@ -57,13 +58,13 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
     private final GridAtomicLong topVer = new GridAtomicLong();
 
     /** Force key futures. */
-    private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<K, V>> forceKeyFuts = newMap();
+    private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap();
 
     /** Partition suppliers. */
-    private GridDhtPartitionSupplyPool<K, V> supplyPool;
+    private GridDhtPartitionSupplyPool supplyPool;
 
     /** Partition demanders. */
-    private GridDhtPartitionDemandPool<K, V> demandPool;
+    private GridDhtPartitionDemandPool demandPool;
 
     /** Start future. */
     private final GridFutureAdapter<Object> startFut;
@@ -92,7 +93,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
 
                 assert !loc.id().equals(n.id());
 
-                for (GridDhtForceKeysFuture<K, V> f : forceKeyFuts.values())
+                for (GridDhtForceKeysFuture<?, ?> f : forceKeyFuts.values())
                     f.onDiscoveryEvent(e);
 
                 assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " +
@@ -117,7 +118,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
     /**
      * @param cctx Cache context.
      */
-    public GridDhtPreloader(GridCacheContext<K, V> cctx) {
+    public GridDhtPreloader(GridCacheContext<?, ?> cctx) {
         super(cctx);
 
         top = cctx.dht().topology();
@@ -158,8 +159,8 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
                 }
             });
 
-        supplyPool = new GridDhtPartitionSupplyPool<>(cctx, busyLock);
-        demandPool = new GridDhtPartitionDemandPool<>(cctx, busyLock);
+        supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock);
+        demandPool = new GridDhtPartitionDemandPool(cctx, busyLock);
 
         cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
@@ -227,12 +228,14 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
 
             final long start = U.currentTimeMillis();
 
-            if (cctx.config().getRebalanceDelay() >= 0) {
-                U.log(log, "Starting rebalancing in " + cctx.config().getRebalanceMode() + " mode: " + cctx.name());
+            final CacheConfiguration cfg = cctx.config();
+
+            if (cfg.getRebalanceDelay() >= 0) {
+                U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name());
 
                 demandPool.syncFuture().listen(new CI1<Object>() {
                     @Override public void apply(Object t) {
-                        U.log(log, "Completed rebalancing in " + cctx.config().getRebalanceMode() + " mode " +
+                        U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " +
                             "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]");
                     }
                 });
@@ -253,12 +256,12 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) {
+    @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
         return demandPool.assign(exchFut);
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload) {
+    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
         demandPool.addAssignments(assignments, forcePreload);
     }
 
@@ -271,7 +274,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> syncFuture() {
-        return demandPool.syncFuture();
+        return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture();
     }
 
     /**
@@ -406,7 +409,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
             return;
 
         try {
-            GridDhtForceKeysFuture<K, V> f = forceKeyFuts.get(msg.futureId());
+            GridDhtForceKeysFuture<?, ?> f = forceKeyFuts.get(msg.futureId());
 
             if (f != null)
                 f.onResult(node.id(), msg);
@@ -491,7 +494,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
      */
     @SuppressWarnings( {"unchecked", "RedundantCast"})
     @Override public GridDhtFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
-        final GridDhtForceKeysFuture<K, V> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
+        final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
 
         IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer);
 
@@ -543,7 +546,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
      *
      * @param fut Future to add.
      */
-    void addFuture(GridDhtForceKeysFuture<K, V> fut) {
+    void addFuture(GridDhtForceKeysFuture<?, ?> fut) {
         forceKeyFuts.put(fut.futureId(), fut);
     }
 
@@ -552,7 +555,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
      *
      * @param fut Future to remove.
      */
-    void remoteFuture(GridDhtForceKeysFuture<K, V> fut) {
+    void remoteFuture(GridDhtForceKeysFuture<?, ?> fut) {
         forceKeyFuts.remove(fut.futureId(), fut);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
index 369fc68..2f6ef6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
@@ -27,8 +27,7 @@ import java.util.concurrent.*;
 /**
  * Partition to node assignments.
  */
-public class GridDhtPreloaderAssignments<K, V> extends
-    ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage> {
+public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode, GridDhtPartitionDemandMessage> {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index ba3357d..041f83a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -433,6 +433,11 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException {
+        return dht.tryPutIfAbsent(key, val);
+    }
+
+    /** {@inheritDoc} */
     @Override public V getAndReplace(K key, V val) throws IgniteCheckedException {
         return dht.getAndReplace(key, val);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 8258b14..351d6cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -95,7 +95,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
     }
 
     /** {@inheritDoc} */
-    @Override public GridCachePreloader<K, V> preloader() {
+    @Override public GridCachePreloader preloader() {
         return dht().preloader();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index fc178e3..74438bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -274,7 +274,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
         if (affNodes.isEmpty()) {
             assert !cctx.affinityNode();
 
-            onDone(new ClusterTopologyCheckedException("Failed to map keys for near-only cache (all partition " +
+            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for near-only cache (all partition " +
                 "nodes left the grid)."));
 
             return;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 0ffb4e5..3d28018 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -45,7 +45,7 @@ import static org.apache.ignite.events.EventType.*;
 /**
  * Cache lock future.
  */
-public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean>
     implements GridCacheMvccFuture<Boolean> {
     /** */
     private static final long serialVersionUID = 0L;
@@ -58,7 +58,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
 
     /** Cache registry. */
     @GridToStringExclude
-    private GridCacheContext<K, V> cctx;
+    private GridCacheContext<?, ?> cctx;
 
     /** Lock owner thread. */
     @GridToStringInclude
@@ -135,7 +135,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
      * @param skipStore skipStore
      */
     public GridNearLockFuture(
-        GridCacheContext<K, V> cctx,
+        GridCacheContext<?, ?> cctx,
         Collection<KeyCacheObject> keys,
         @Nullable GridNearTxLocal tx,
         boolean read,
@@ -184,15 +184,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
      * @return Participating nodes.
      */
     @Override public Collection<? extends ClusterNode> nodes() {
-        return
-            F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
-                @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
-                    if (isMini(f))
-                        return ((MiniFuture)f).node();
+        return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
+            @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
+                if (isMini(f))
+                    return ((MiniFuture)f).node();
 
-                    return cctx.discovery().localNode();
-                }
-            });
+                return cctx.discovery().localNode();
+            }
+        });
     }
 
     /** {@inheritDoc} */
@@ -350,13 +349,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
      * Undoes all locks.
      *
      * @param dist If {@code true}, then remove locks from remote nodes as well.
+     * @param rollback {@code True} if should rollback tx.
      */
-    private void undoLocks(boolean dist) {
+    private void undoLocks(boolean dist, boolean rollback) {
         // Transactions will undo during rollback.
         if (dist && tx == null)
             cctx.nearTx().removeLocks(lockVer, keys);
         else {
-            if (tx != null) {
+            if (rollback && tx != null) {
                 if (tx.setRollbackOnly()) {
                     if (log.isDebugEnabled())
                         log.debug("Marked transaction as rollback only because locks could not be acquired: " + tx);
@@ -397,7 +397,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
      * @param dist {@code True} if need to distribute lock release.
      */
     private void onFailed(boolean dist) {
-        undoLocks(dist);
+        undoLocks(dist, true);
 
         complete(false);
     }
@@ -607,7 +607,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                 ", fut=" + this + ']');
 
         if (!success)
-            undoLocks(distribute);
+            undoLocks(distribute, true);
 
         if (tx != null)
             cctx.tm().txContext(tx);
@@ -682,7 +682,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
             // Continue mapping on the same topology version as it was before.
             this.topVer.compareAndSet(null, topVer);
 
-            map(keys);
+            map(keys, false);
 
             markInitialized();
 
@@ -690,14 +690,16 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
         }
 
         // Must get topology snapshot and map on that version.
-        mapOnTopology();
+        mapOnTopology(false);
     }
 
     /**
      * Acquires topology future and checks it completeness under the read lock. If it is not complete,
      * will asynchronously wait for it's completeness and then try again.
+     *
+     * @param remap Remap flag.
      */
-    void mapOnTopology() {
+    void mapOnTopology(final boolean remap) {
         // We must acquire topology snapshot from the topology version future.
         cctx.topology().readLock();
 
@@ -721,19 +723,27 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
 
                 AffinityTopologyVersion topVer = fut.topologyVersion();
 
-                if (tx != null)
-                    tx.topologyVersion(topVer);
+                if (remap) {
+                    if (tx != null)
+                        tx.onRemap(topVer);
 
-                this.topVer.compareAndSet(null, topVer);
+                    this.topVer.set(topVer);
+                }
+                else {
+                    if (tx != null)
+                        tx.topologyVersion(topVer);
+
+                    this.topVer.compareAndSet(null, topVer);
+                }
 
-                map(keys);
+                map(keys, remap);
 
                 markInitialized();
             }
             else {
                 fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                     @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                        mapOnTopology();
+                        mapOnTopology(remap);
                     }
                 });
             }
@@ -749,14 +759,15 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
      * groups belonging to one primary node and locks for these groups are acquired sequentially.
      *
      * @param keys Keys.
+     * @param remap Remap flag.
      */
-    private void map(Iterable<KeyCacheObject> keys) {
+    private void map(Iterable<KeyCacheObject> keys, boolean remap) {
         try {
             AffinityTopologyVersion topVer = this.topVer.get();
 
             assert topVer != null;
 
-            assert topVer.topologyVersion() > 0;
+            assert topVer.topologyVersion() > 0 : topVer;
 
             if (CU.affinityNodes(cctx, topVer).isEmpty()) {
                 onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for near-only cache (all " +
@@ -765,8 +776,11 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                 return;
             }
 
-            ConcurrentLinkedDeque8<GridNearLockMapping> mappings =
-                new ConcurrentLinkedDeque8<>();
+            boolean clientNode = cctx.kernalContext().clientNode();
+
+            assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
+
+            ConcurrentLinkedDeque8<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
 
             // Assign keys to primary nodes.
             GridNearLockMapping map = null;
@@ -795,6 +809,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
             if (log.isDebugEnabled())
                 log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
 
+            boolean first = true;
+
             // Create mini futures.
             for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
                 GridNearLockMapping mapping = iter.next();
@@ -872,6 +888,14 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
 
                                 if (!cand.reentry()) {
                                     if (req == null) {
+                                        boolean clientFirst = false;
+
+                                        if (first) {
+                                            clientFirst = clientNode && (tx == null || !tx.hasRemoteLocks());
+
+                                            first = false;
+                                        }
+
                                         req = new GridNearLockRequest(
                                             cctx.cacheId(),
                                             topVer,
@@ -893,7 +917,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                                             inTx() ? tx.subjectId() : null,
                                             inTx() ? tx.taskNameHash() : 0,
                                             read ? accessTtl : -1L,
-                                            skipStore);
+                                            skipStore,
+                                            clientFirst);
 
                                         mapping.request(req);
                                     }
@@ -1197,7 +1222,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
     /**
      * @return DHT cache.
      */
-    private GridDhtTransactionalCacheAdapter<K, V> dht() {
+    private GridDhtTransactionalCacheAdapter<?, ?> dht() {
         return cctx.nearTx().dht();
     }
 
@@ -1356,110 +1381,146 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
                     return;
                 }
 
-                int i = 0;
+                if (res.clientRemapVersion() != null) {
+                    assert cctx.kernalContext().clientNode();
 
-                AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
+                    IgniteInternalFuture<?> affFut =
+                        cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
 
-                for (KeyCacheObject k : keys) {
-                    while (true) {
-                        GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
+                    if (affFut != null && !affFut.isDone()) {
+                        affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                            @Override public void apply(IgniteInternalFuture<?> fut) {
+                                remap();
+                            }
+                        });
+                    }
+                    else
+                        remap();
+                }
+                else {
+                    int i = 0;
 
-                        try {
-                            if (res.dhtVersion(i) == null) {
-                                onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
-                                    "(will fail the lock): " + res));
+                    AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
 
-                                return;
-                            }
+                    for (KeyCacheObject k : keys) {
+                        while (true) {
+                            GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
 
-                            IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
+                            try {
+                                if (res.dhtVersion(i) == null) {
+                                    onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+                                        "(will fail the lock): " + res));
 
-                            CacheObject oldVal = entry.rawGet();
-                            boolean hasOldVal = false;
-                            CacheObject newVal = res.value(i);
+                                    return;
+                                }
 
-                            boolean readRecordable = false;
+                                IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
 
-                            if (retval) {
-                                readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+                                CacheObject oldVal = entry.rawGet();
+                                boolean hasOldVal = false;
+                                CacheObject newVal = res.value(i);
 
-                                if (readRecordable)
-                                    hasOldVal = entry.hasValue();
-                            }
+                                boolean readRecordable = false;
 
-                            GridCacheVersion dhtVer = res.dhtVersion(i);
-                            GridCacheVersion mappedVer = res.mappedVersion(i);
+                                if (retval) {
+                                    readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
+
+                                    if (readRecordable)
+                                        hasOldVal = entry.hasValue();
+                                }
 
-                            if (newVal == null) {
-                                if (oldValTup != null) {
-                                    if (oldValTup.get1().equals(dhtVer))
-                                        newVal = oldValTup.get2();
+                                GridCacheVersion dhtVer = res.dhtVersion(i);
+                                GridCacheVersion mappedVer = res.mappedVersion(i);
 
-                                    oldVal = oldValTup.get2();
+                                if (newVal == null) {
+                                    if (oldValTup != null) {
+                                        if (oldValTup.get1().equals(dhtVer))
+                                            newVal = oldValTup.get2();
+
+                                        oldVal = oldValTup.get2();
+                                    }
                                 }
-                            }
 
-                            // Lock is held at this point, so we can set the
-                            // returned value if any.
-                            entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
+                                // Lock is held at this point, so we can set the
+                                // returned value if any.
+                                entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
 
-                            if (inTx() && implicitTx() && tx.onePhaseCommit()) {
-                                boolean pass = res.filterResult(i);
+                                if (inTx()) {
+                                    tx.hasRemoteLocks(true);
 
-                                tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
-                            }
+                                    if (implicitTx() && tx.onePhaseCommit()) {
+                                        boolean pass = res.filterResult(i);
 
-                            entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(),
-                                res.pending());
-
-                            if (retval) {
-                                if (readRecordable)
-                                    cctx.events().addEvent(
-                                        entry.partition(),
-                                        entry.key(),
-                                        tx,
-                                        null,
-                                        EVT_CACHE_OBJECT_READ,
-                                        newVal,
-                                        newVal != null,
-                                        oldVal,
-                                        hasOldVal,
-                                        CU.subjectId(tx, cctx.shared()),
-                                        null,
-                                        inTx() ? tx.resolveTaskName() : null);
-
-                                if (cctx.cache().configuration().isStatisticsEnabled())
-                                    cctx.cache().metrics0().onRead(false);
-                            }
+                                        tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
+                                    }
+                                }
 
-                            if (log.isDebugEnabled())
-                                log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
+                                entry.readyNearLock(lockVer,
+                                    mappedVer,
+                                    res.committedVersions(),
+                                    res.rolledbackVersions(),
+                                    res.pending());
+
+                                if (retval) {
+                                    if (readRecordable)
+                                        cctx.events().addEvent(
+                                            entry.partition(),
+                                            entry.key(),
+                                            tx,
+                                            null,
+                                            EVT_CACHE_OBJECT_READ,
+                                            newVal,
+                                            newVal != null,
+                                            oldVal,
+                                            hasOldVal,
+                                            CU.subjectId(tx, cctx.shared()),
+                                            null,
+                                            inTx() ? tx.resolveTaskName() : null);
+
+                                    if (cctx.cache().configuration().isStatisticsEnabled())
+                                        cctx.cache().metrics0().onRead(false);
+                                }
 
-                            break; // Inner while loop.
-                        }
-                        catch (GridCacheEntryRemovedException ignored) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to add candidates because entry was removed (will renew).");
+                                if (log.isDebugEnabled())
+                                    log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
 
-                            // Replace old entry with new one.
-                            entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+                                break; // Inner while loop.
+                            }
+                            catch (GridCacheEntryRemovedException ignored) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to add candidates because entry was removed (will renew).");
+
+                                // Replace old entry with new one.
+                                entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
+                            }
                         }
+
+                        i++;
                     }
 
-                    i++;
-                }
+                    try {
+                        proceedMapping(mappings);
+                    }
+                    catch (IgniteCheckedException e) {
+                        onDone(e);
+                    }
 
-                try {
-                    proceedMapping(mappings);
-                }
-                catch (IgniteCheckedException e) {
-                    onDone(e);
+                    onDone(true);
                 }
-
-                onDone(true);
             }
         }
 
+        /**
+         *
+         */
+        private void remap() {
+            undoLocks(false, false);
+
+            mapOnTopology(true);
+
+            onDone(true);
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index e71dd65..81184a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -80,6 +80,9 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
     /** Flag indicating whether cache operation requires a previous value. */
     private boolean retVal;
 
+    /** {@code True} if first lock request for lock operation sent from client node. */
+    private boolean firstClientReq;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -98,6 +101,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
      * @param implicitTx Flag to indicate that transaction is implicit.
      * @param implicitSingleTx Implicit-transaction-with-one-key flag.
      * @param isRead Indicates whether implicit lock is for read or write operation.
+     * @param retVal Return value flag.
      * @param isolation Transaction isolation.
      * @param isInvalidate Invalidation flag.
      * @param timeout Lock timeout.
@@ -108,6 +112,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
      * @param taskNameHash Task name hash code.
      * @param accessTtl TTL for read operation.
      * @param skipStore Skip store flag.
+     * @param firstClientReq {@code True} if first lock request for lock operation sent from client node.
      */
     public GridNearLockRequest(
         int cacheId,
@@ -130,7 +135,8 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
         @Nullable UUID subjId,
         int taskNameHash,
         long accessTtl,
-        boolean skipStore
+        boolean skipStore,
+        boolean firstClientReq
     ) {
         super(
             cacheId,
@@ -158,11 +164,19 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
         this.taskNameHash = taskNameHash;
         this.accessTtl = accessTtl;
         this.retVal = retVal;
+        this.firstClientReq = firstClientReq;
 
         dhtVers = new GridCacheVersion[keyCnt];
     }
 
     /**
+     * @return {@code True} if first lock request for lock operation sent from client node.
+     */
+    public boolean firstClientRequest() {
+        return firstClientReq;
+    }
+
+    /**
      * @return Topology version.
      */
     @Override public AffinityTopologyVersion topologyVersion() {
@@ -368,60 +382,66 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeBoolean("hasTransforms", hasTransforms))
+                if (!writer.writeBoolean("firstClientReq", firstClientReq))
                     return false;
 
                 writer.incrementState();
 
             case 25:
-                if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx))
+                if (!writer.writeBoolean("hasTransforms", hasTransforms))
                     return false;
 
                 writer.incrementState();
 
             case 26:
-                if (!writer.writeBoolean("implicitTx", implicitTx))
+                if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx))
                     return false;
 
                 writer.incrementState();
 
             case 27:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeBoolean("implicitTx", implicitTx))
                     return false;
 
                 writer.incrementState();
 
             case 28:
-                if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
+                if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
             case 29:
-                if (!writer.writeBoolean("retVal", retVal))
+                if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
                     return false;
 
                 writer.incrementState();
 
             case 30:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeBoolean("retVal", retVal))
                     return false;
 
                 writer.incrementState();
 
             case 31:
-                if (!writer.writeBoolean("syncCommit", syncCommit))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 32:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeBoolean("syncCommit", syncCommit))
                     return false;
 
                 writer.incrementState();
 
             case 33:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 34:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -468,7 +488,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 24:
-                hasTransforms = reader.readBoolean("hasTransforms");
+                firstClientReq = reader.readBoolean("firstClientReq");
 
                 if (!reader.isLastRead())
                     return false;
@@ -476,7 +496,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 25:
-                implicitSingleTx = reader.readBoolean("implicitSingleTx");
+                hasTransforms = reader.readBoolean("hasTransforms");
 
                 if (!reader.isLastRead())
                     return false;
@@ -484,7 +504,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 26:
-                implicitTx = reader.readBoolean("implicitTx");
+                implicitSingleTx = reader.readBoolean("implicitSingleTx");
 
                 if (!reader.isLastRead())
                     return false;
@@ -492,7 +512,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 27:
-                miniId = reader.readIgniteUuid("miniId");
+                implicitTx = reader.readBoolean("implicitTx");
 
                 if (!reader.isLastRead())
                     return false;
@@ -500,7 +520,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 28:
-                onePhaseCommit = reader.readBoolean("onePhaseCommit");
+                miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -508,7 +528,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 29:
-                retVal = reader.readBoolean("retVal");
+                onePhaseCommit = reader.readBoolean("onePhaseCommit");
 
                 if (!reader.isLastRead())
                     return false;
@@ -516,7 +536,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 30:
-                subjId = reader.readUuid("subjId");
+                retVal = reader.readBoolean("retVal");
 
                 if (!reader.isLastRead())
                     return false;
@@ -524,7 +544,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 31:
-                syncCommit = reader.readBoolean("syncCommit");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -532,7 +552,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 32:
-                taskNameHash = reader.readInt("taskNameHash");
+                syncCommit = reader.readBoolean("syncCommit");
 
                 if (!reader.isLastRead())
                     return false;
@@ -540,6 +560,14 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
                 reader.incrementState();
 
             case 33:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 34:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -559,7 +587,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 34;
+        return 35;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
index 20928de..f324198 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -58,6 +59,9 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
     /** Filter evaluation results for fast-commit transactions. */
     private boolean[] filterRes;
 
+    /** {@code True} if client node should remap lock request. */
+    private AffinityTopologyVersion clientRemapVer;
+
     /**
      * Empty constructor (required by {@link Externalizable}).
      */
@@ -73,6 +77,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
      * @param filterRes {@code True} if need to allocate array for filter evaluation results.
      * @param cnt Count.
      * @param err Error.
+     * @param clientRemapVer {@code True} if client node should remap lock request.
      */
     public GridNearLockResponse(
         int cacheId,
@@ -81,13 +86,15 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
         IgniteUuid miniId,
         boolean filterRes,
         int cnt,
-        Throwable err
+        Throwable err,
+        AffinityTopologyVersion clientRemapVer
     ) {
         super(cacheId, lockVer, futId, cnt, err);
 
         assert miniId != null;
 
         this.miniId = miniId;
+        this.clientRemapVer = clientRemapVer;
 
         dhtVers = new GridCacheVersion[cnt];
         mappedVers = new GridCacheVersion[cnt];
@@ -97,6 +104,13 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
     }
 
     /**
+     * @return {@code True} if client node should remap lock request.
+     */
+    @Nullable public AffinityTopologyVersion clientRemapVersion() {
+        return clientRemapVer;
+    }
+
+    /**
      * Gets pending versions that are less than {@link #version()}.
      *
      * @return Pending versions.
@@ -192,30 +206,36 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
 
         switch (writer.state()) {
             case 11:
-                if (!writer.writeObjectArray("dhtVers", dhtVers, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("clientRemapVer", clientRemapVer))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeBooleanArray("filterRes", filterRes))
+                if (!writer.writeObjectArray("dhtVers", dhtVers, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeObjectArray("mappedVers", mappedVers, MessageCollectionItemType.MSG))
+                if (!writer.writeBooleanArray("filterRes", filterRes))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeObjectArray("mappedVers", mappedVers, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 15:
+                if (!writer.writeIgniteUuid("miniId", miniId))
+                    return false;
+
+                writer.incrementState();
+
+            case 16:
                 if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
                     return false;
 
@@ -238,7 +258,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
 
         switch (reader.state()) {
             case 11:
-                dhtVers = reader.readObjectArray("dhtVers", MessageCollectionItemType.MSG, GridCacheVersion.class);
+                clientRemapVer = reader.readMessage("clientRemapVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -246,7 +266,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
                 reader.incrementState();
 
             case 12:
-                filterRes = reader.readBooleanArray("filterRes");
+                dhtVers = reader.readObjectArray("dhtVers", MessageCollectionItemType.MSG, GridCacheVersion.class);
 
                 if (!reader.isLastRead())
                     return false;
@@ -254,7 +274,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
                 reader.incrementState();
 
             case 13:
-                mappedVers = reader.readObjectArray("mappedVers", MessageCollectionItemType.MSG, GridCacheVersion.class);
+                filterRes = reader.readBooleanArray("filterRes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -262,7 +282,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
                 reader.incrementState();
 
             case 14:
-                miniId = reader.readIgniteUuid("miniId");
+                mappedVers = reader.readObjectArray("mappedVers", MessageCollectionItemType.MSG, GridCacheVersion.class);
 
                 if (!reader.isLastRead())
                     return false;
@@ -270,6 +290,14 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
                 reader.incrementState();
 
             case 15:
+                miniId = reader.readIgniteUuid("miniId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 16:
                 pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -289,7 +317,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 16;
+        return 17;
     }
 
     /** {@inheritDoc} */


Mime
View raw message